最近在写安全方面的程序,有需求,就做了这些TCP加密数据传输类。
utils.safeUtils的内容详见:

  • SafeObj:Python 高安全性加密数据容器类-CSDN博客
  • SafeKey:Python 高安全性加密密码容器类-CSDN博客

如有任何问题或漏洞欢迎批评指正!不胜感激!

# 代码

1. 服务端

import sys
import tracebackimport os
import socket
from threading import Thread
from typing import Callable, Any
import secrets
import struct
from datetime import datetime as dt,timedelta as tdel
from utils.safeUtils import SafeKey,combine_keys,SafeObj,secure_eraseclass Server:def __init__(self, *,sendCallback: Callable[[str, int, bytes], bytes],host: str = "localhost",port: int = 0,errCallback: Callable[[Exception], Any] = lambda err: print(traceback.format_exc())):self.srv = socket.socket()self._host = hostself._port = portself._sendCallback = sendCallbackself._errCallback = errCallbackself._close = Falseself.srv.bind((host, port))self.srv.listen(16)self.srv.settimeout(1)self._mainThread = Thread(None, self._main, "TCP-Server_Main", daemon=True)self._mainThread.start()@propertydef host(self):return self._host@propertydef port(self):return self._portdef _main(self):while not self._close:try:conn, addr = self.srv.accept()except socket.timeout:continuetry:data = b""while block := conn.recv(1024):data += blockcontent = self._sendCallback(addr[0],addr[1],data)conn.send(content)except socket.error as err:self._errCallback(err)finally:conn.close()def close(self):self.srv.close()self._close = Trueself._mainThread.join()class CryptServer(Server):def __init__(self,*args,**kwargs):self.nonce = []super().__init__(*args,**kwargs)print(self.srv.getsockname())def _main(self):def ATTACK_ALERT(o = None,ts = None,n = None,c = None,r = None,s = True,t = "REPLAY or MAN-IN-THE-MIDDLE ATTACK"):L, R = CF.LIGHTWHITE_EX + CS.BRIGHT, CS.RESET_ALLerr = f'''{CB.BLUE}{L}!!! [SECURITY ALERT] !!!: {t}{R}\n{CB.RED}{L}{"▰" *22} ATTACK {"▰" * 22}{R}\n{"TIMESTAMP(Now/Response): " if s else "ATTACK Count: "}{("/".join(x.strftime("%Y-%m-%dT%H:%M:%S") for x in (o, ts))) if s else str(c)}\n{"NONCE: " if s else "RAM Tamper: "}{n.hex() if s else str(r)}\n{CB.RED}{L}{"▰" * 20} SECURITY ALERT {"▰" * 20}{R}'''.replace(" " * 4, "")os.popen("msg * SECURITY ALERT: ************ has been ATTACKED");raise RuntimeError(err)last_nonce = len(self.nonce)while not self._close:try:conn, addr = self.srv.accept()print(addr)except socket.timeout:continuekey, safeData, cliKey, mainKey, content, data = (None for _ in range(6)) # 初始化(finally要用)try:key = SafeKey(secrets.token_bytes(128))                       # 每次会话的唯一公钥key.use_password(conn.sendall,t = bytes)                                   # 向客户端发送公钥cliKey = conn.recv(128)                                       # 读取客户端私钥ts = conn.recv(20).decode()                                   # [安全校验] 读取时间戳(%Y%m%d%H%M%S%f)ts = dt.strptime(ts,"%Y%m%d%H%M%S%f")                 # [安全校验] 格式化时间戳n = conn.recv(12)                                             # [安全校验] 读取会话唯一 nonce# [安全校验] 检测是否可能为重放if abs((o:=dt.now())-ts).total_seconds()>1or n in self.nonce:ATTACK_ALERT(o = o,ts = ts,n = n)else:self.nonce.append(n)mainKey = combine_keys(key.get_password(bytes),cliKey)            # 使用公钥和私钥派生主密钥secure_erase(cliKey)                                              # 擦除客户端私钥del key,cliKeylength = struct.unpack("!I",conn.recv(4))[0]              # 读取数据长度data = conn.recv(length)safeData = SafeObj.from_encrypted_package(data,mainKey)           # 获取SafeObj对象content = self._sendCallback(addr[0],addr[1],safeData.get_data()) # 从处理函数获取数据encrypted_package = SafeObj(content,mainKey).get_encrypted_package()length = len(encrypted_package)conn.sendall(struct.pack("!I",length))conn.sendall(encrypted_package)                                                 # 发送加密数据secure_erase(bytearray(encrypted_package))                                   # 安全擦除所有中间数据secure_erase(bytearray(data))secure_erase(bytearray(content))print(mainKey.get_password(bytes),"\n",length,"\n",data,"\n",encrypted_package)except Exception as err:self._errCallback(err)finally:conn.close()try:del keyexcept NameError:passtry:del cliKeyexcept NameError:passdel safeData, mainKey, content, datad = len(self.nonce) - last_nonceif d < 0 or d > 10:ATTACK_ALERT(c = d,r = d < 0,t = "EXPLOSIVE ATTACK or RAM TAMPERING")last_nonce = len(self.nonce)conn.close()print("close:",addr)if __name__ == '__main__':def send(host,port,content):return secrets.token_bytes(256)srv = CryptServer(sendCallback = send)srv._mainThread.join()

2. 客户端

import socket
import sys
import timeimport secrets
from datetime import datetime
import struct
from utils.safeUtils import SafeKey, combine_keys, SafeObj, secure_eraseclass Client:def __init__(self):self.cli = socket.socket()self.cli.settimeout(10)def connect(self,host:str,port:int):self.cli.connect((host,port))def request(self,data:bytes) -> bytes:self.cli.send(data)return self.cli.recv(1024)def close(self):self.cli.close()class CryptClient(Client):def __init__(self):super().__init__()def request(self, data: bytes) -> bytes:"""安全加密请求流程:1. 接收服务端公钥2. 生成并发送客户端私钥3. 发送时间戳和随机nonce4. 派生主密钥5. 加密并发送请求数据6. 接收并解密响应参数:data: 要发送的原始数据(字节类型)返回:bytes: 解密后的响应数据"""# 生成客户端私钥(128字节安全随机令牌)cli_key = secrets.token_bytes(128)# 1. 接收服务端公钥(128字节)srv_key = self.cli.recv(128)if len(srv_key) != 128:raise ConnectionError("无效的公钥长度")# 2. 发送客户端私钥self.cli.send(cli_key)# 3. 生成并发送安全参数# 时间戳(精确到微秒,20字节字符串)timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")[:20]# 随机nonce(12字节)nonce = secrets.token_bytes(12)self.cli.send(timestamp.encode('utf-8'))  # 发送时间戳self.cli.send(nonce)  # 发送随机noncetry:# 4. 派生主密钥(组合服务端公钥和客户端私钥)main_key = combine_keys(srv_key, cli_key)# 5. 加密请求数据# 创建安全对象加密数据safe_request = SafeObj(data, main_key)# 获取完整加密包(包含盐/nonce/标签/加密数据)encrypted_package = safe_request.get_encrypted_package()# 发送长度length = len(encrypted_package)self.cli.sendall(struct.pack("!I",length))# 发送加密数据包self.cli.sendall(encrypted_package)# 6. 接收服务端响应# 接收数据长度length = struct.unpack("!I",self.cli.recv(4))[0]response = self.cli.recv(length)# 解密响应数据safe_response = SafeObj.from_encrypted_package(response, main_key)return safe_response.get_data()finally:# 安全清除所有敏感数据secure_erase(srv_key)secure_erase(cli_key)secure_erase(nonce)if 'main_key' in locals():del main_keyif 'safe_request' in locals():del safe_requestif 'safe_response' in locals():del safe_responseif __name__ == '__main__':data = b"hello world!" * 100cli = CryptClient()cli.connect("localhost", YOU_SERVER_PORT)print(cli.request(data))cli.close()

3. utils.safeUtils

import ctypes
import mmap
import os
import pickle
import random
import secrets
import sys
import hashlib
import hmac
import time
from typing import Any, Optional, Callable, Union
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.kdf.hkdf import HKDFfrom utils.Infoget import infogetclass PasswordError(Exception): passclass SafeKey:passclass CryptConfig:erase_count = 7PBKDF2HMAC_iterations = 1000  # 建议1000-100000def secure_erase(buffer) -> None:"""安全清除内存内容(支持多种类型)"""if buffer is None:returntry:if isinstance(buffer, (bytes, bytearray)):# 多次覆盖内存mutable = bytearray(buffer)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)# 最后填充零for i in range(len(mutable)):mutable[i] = 0# 防止编译器优化if len(mutable) > 0:ctypes.memset(ctypes.addressof(ctypes.c_char.from_buffer(mutable)), 0, len(mutable))elif isinstance(buffer, mmap.mmap):# 处理内存映射对象buffer.seek(0)data = buffer.read()mutable = bytearray(data)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)for i in range(len(mutable)):mutable[i] = 0buffer.seek(0)buffer.write(mutable)elif isinstance(buffer, str):# 字符串清除 - 创建可变副本mutable = bytearray(buffer.encode('utf-8'))for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)for i in range(len(mutable)):mutable[i] = 0except:passdef combine_keys(key1: bytes, key2: bytes) -> SafeKey:"""安全混合两个字节密钥派生主密钥并返回SafeKey对象参数:key1: 第一个输入密钥 (字节类型)key2: 第二个输入密钥 (字节类型)返回:SafeKey: 包含派生主密钥的安全对象步骤:1. 验证输入密钥有效性2. 使用HKDF混合派生密钥3. 创建SafeKey对象4. 安全清除中间密钥"""# 1. 验证输入密钥if not key1 or len(key1) == 0:raise ValueError("key1 不能为空")if not key2 or len(key2) == 0:raise ValueError("key2 不能为空")try:# 2. 创建可变密钥副本用于安全处理mutable_key1 = bytearray(key1)mutable_key2 = bytearray(key2)# 3. 拼接密钥作为HKDF输入combined_key = mutable_key1 + mutable_key2# 4. 使用HKDF派生主密钥hkdf = HKDF(algorithm=hashes.SHA3_512(),length=32,  # 输出32字节密钥(AES-256兼容)salt=None,  # 不使用盐(输入密钥已足够随机)info=b"SafeKey-Derivation",  # 上下文信息backend=default_backend())# 派生密钥derived_key = hkdf.derive(combined_key)# 5. 创建SafeKey对象safekey = SafeKey(derived_key)return safekeyfinally:# 6. 安全清除所有中间密钥(确保即使异常也执行)if 'mutable_key1' in locals():secure_erase(mutable_key1)  # 使用SafeKey中的安全擦除方法if 'mutable_key2' in locals():secure_erase(mutable_key2)if 'combined_key' in locals():secure_erase(combined_key)if 'derived_key' in locals():secure_erase(derived_key)class SafeKey:def __init__(self, password: Union[str,bytes,bytearray]):"""加固版绝对安全的密钥存储 - 单次使用"""# 1. 将密码转换为可变字节数组if isinstance(password,str):password_bytes = bytearray(password.encode('utf-8'))elif isinstance(password,bytes):password_bytes = bytearray(password)elif isinstance(password,bytearray):password_bytes = passwordelse:password_type = type(password)del passwordraise TypeError(f"unsupported password type: '{password_type}'")# 2. 生成随机内部主密钥 (32字节)self._internal_master_key = secrets.token_bytes(32)# 3. 生成随机noncenonce = secrets.token_bytes(12)# 4. 使用AES-GCM加密密码cipher = Cipher(algorithms.AES(self._internal_master_key),modes.GCM(nonce),backend=default_backend())encryptor = cipher.encryptor()encrypted = encryptor.update(password_bytes) + encryptor.finalize()# 5. 创建加密数据包并添加HMAC校验encrypted_package = nonce + encryptor.tag + encryptedhmac_digest = hmac.new(self._internal_master_key,encrypted_package,hashlib.sha3_256).digest()self._encrypted_data = hmac_digest + encrypted_package# 6. 安全清除原始密码secure_erase(password_bytes)del password_bytesdel password  # 删除引用# 7. 锁定内部主密钥和加密数据在内存中self._locked_master_key = self._lock_in_memory(self._internal_master_key)self._locked_encrypted_data = self._lock_in_memory(self._encrypted_data)# 8. 清除原始密钥引用secure_erase(self._internal_master_key)self._internal_master_key = Nonesecure_erase(self._encrypted_data)self._encrypted_data = None# 9. 标记对象状态self._is_active = Trueself._password_ref = None# 10. 反调试保护self._check_security_environment()def _lock_in_memory(self, data: bytes) -> mmap.mmap:"""将数据安全锁定在内存中,防止交换到磁盘"""size = len(data)# 创建共享内存区域if sys.platform == 'win32':shm = mmap.mmap(-1, size, access=mmap.ACCESS_WRITE)else:# 类Unix系统内存保护PROT_READ = 1PROT_WRITE = 2PROT_READWRITE = PROT_READ | PROT_WRITEshm = mmap.mmap(-1, size, prot=PROT_READWRITE)# 复制数据到锁定内存shm.write(data)# 锁定内存防止交换if not self._mlock_memory(shm, size):# 锁定失败时立即清除并终止secure_erase(shm)shm.close()self._terminate_with_error("内存锁定失败")# 安全清除原始数据secure_erase(data)return shmdef _mlock_memory(self, shm: mmap.mmap, size: int) -> bool:"""锁定内存防止交换到磁盘(跨平台实现)"""try:if sys.platform == 'win32':# Windows内存锁定return ctypes.windll.kernel32.VirtualLock(ctypes.c_void_p(ctypes.addressof(ctypes.c_char.from_buffer(shm))),size) != 0else:# Unix系统内存锁定libc = ctypes.CDLL(None)return libc.mlock(ctypes.c_void_p(ctypes.addressof(ctypes.c_char.from_buffer(shm))),size) == 0except:return Falsedef _check_security_environment(self):"""检查安全环境,防御调试和内存转储"""# 1. 检测调试器if self._is_debugger_present():self._terminate_with_error("检测到调试器 - 安全终止")# 2. 检测虚拟机(可选)if self._is_running_in_vm():self._terminate_with_error("虚拟机环境不安全 - 安全终止")def _is_debugger_present(self) -> bool:"""检测调试器存在"""try:# Windows调试器检测if sys.platform == 'win32':kernel32 = ctypes.windll.kernel32return kernel32.IsDebuggerPresent() != 0# Linux调试器检测elif sys.platform.startswith('linux'):try:with open('/proc/self/status', 'r') as status_file:for line in status_file:if line.startswith('TracerPid:'):tracer_pid = int(line.split(':')[1].strip())return tracer_pid != 0except:pass# 检查LD_PRELOAD劫持if 'LD_PRELOAD' in os.environ:return Trueexcept:passreturn Falsedef _is_running_in_vm(self) -> bool:"""检测是否在虚拟机中运行"""try:# 简单虚拟机检测if sys.platform == 'win32':import winregwith winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,r"SYSTEM\CurrentControlSet\Control\SystemInformation") as key:system_manufacturer = winreg.QueryValueEx(key, "SystemManufacturer")[0]system_product = winreg.QueryValueEx(key, "SystemProductName")[0]vm_indicators = ["VMware", "Virtual", "Xen", "QEMU", "KVM"]return any(indicator in system_manufacturer or indicator in system_product for indicator in vm_indicators)elif sys.platform.startswith('linux'):# 检查DMI信息dmi_files = ['/sys/class/dmi/id/product_name','/sys/class/dmi/id/sys_vendor']for dmi_file in dmi_files:if os.path.exists(dmi_file):with open(dmi_file, 'r') as f:content = f.read().lower()if 'vmware' in content or 'virtual' in content or 'qemu' in content:return Trueexcept:passreturn Falsedef _terminate_with_error(self, message: str):"""安全终止程序并显示错误消息"""infoget(f"安全错误: {message}",type = "err")# 尝试安全清除内存if hasattr(self, '_locked_master_key'):secure_erase(self._locked_master_key)if hasattr(self, '_locked_encrypted_data'):secure_erase(self._locked_encrypted_data)# 立即终止程序os._exit(1)def __enter__(self):"""进入上下文时获取密码访问对象"""if not self._is_active:raise RuntimeError("SafeKey 已销毁")# 返回自身而不是直接解密密码return selfdef get_password(self,t:type = str) -> Union[str,bytes]:"""安全获取密码(使用后立即清除)"""if not self._is_active:raise RuntimeError("SafeKey 已销毁")# 1. 从锁定内存中获取加密数据self._locked_encrypted_data.seek(0)full_package = self._locked_encrypted_data.read()# 验证数据包长度if len(full_package) < 32 + 12 + 16:  # HMAC(32) + nonce(12) + tag(16)self._terminate_with_error("加密数据包损坏")# 2. 分离HMAC和加密数据hmac_digest = full_package[:32]encrypted_package = full_package[32:]# 3. 从锁定内存中获取主密钥self._locked_master_key.seek(0)master_key = self._locked_master_key.read()# 4. 验证HMACexpected_hmac = hmac.new(master_key,encrypted_package,hashlib.sha3_256).digest()if not hmac.compare_digest(hmac_digest, expected_hmac):self._terminate_with_error("加密数据完整性校验失败")# 5. 解包加密数据nonce = encrypted_package[:12]tag = encrypted_package[12:28]ciphertext = encrypted_package[28:]# 6. 使用AES-GCM解密cipher = Cipher(algorithms.AES(master_key),modes.GCM(nonce, tag),backend=default_backend())decryptor = cipher.decryptor()decrypted = decryptor.update(ciphertext) + decryptor.finalize()# 7. 创建可变密码副本password = decrypted.decode("utf-8") if t == str else decrypted# 8. 安全清除临时数据secure_erase(full_package)secure_erase(master_key)secure_erase(decrypted)return passworddef use_password(self, callback: callable,t = str) -> None:"""安全使用密码(自动清除)"""password = self.get_password(t)try:callback(password)finally:# 确保密码被清除if 'password' in locals():secure_erase(bytearray(password))def __exit__(self, exc_type, exc_value, traceback):"""退出上下文时销毁所有敏感数据"""# 安全清除并销毁密码引用if self._password_ref:secure_erase(self._password_ref)try:self._password_ref.close()except:passself._password_ref = None# 安全清除并销毁主密钥if self._locked_master_key:secure_erase(self._locked_master_key)try:self._locked_master_key.close()except:passself._locked_master_key = None# 安全清除并销毁加密数据if self._locked_encrypted_data:secure_erase(self._locked_encrypted_data)try:self._locked_encrypted_data.close()except:passself._locked_encrypted_data = None# 标记对象为已销毁self._is_active = Falsedef __del__(self):"""析构时确保内存安全清除"""if hasattr(self, '_is_active') and self._is_active:self.__exit__(None, None, None)class SafeObj:def __init__(self, data: Any, key: Union[str, 'SafeKey']):"""安全数据保护对象:param data: 需要保护的任意类型数据:param key: 用于加密的密钥(字符串或SafeKey对象)"""# 初始化安全属性self._key = keyself._encrypted_data: bytes = b''self._salt = os.urandom(16)self._nonce = os.urandom(12)self._tag: bytes = b''self._is_active = True# 加密数据self._encrypt(data)def _secure_memzero(self, buffer) -> None:"""安全清除内存内容"""if buffer is None:returntry:if isinstance(buffer, (bytes, bytearray)):# 多次覆盖内存mutable = bytearray(buffer)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)# 最后填充零for i in range(len(mutable)):mutable[i] = 0# 防止编译器优化if len(mutable) > 0:ctypes.memset(ctypes.addressof(ctypes.c_char.from_buffer(mutable)), 0, len(mutable))elif isinstance(buffer, mmap.mmap):# 处理内存映射对象buffer.seek(0)data = buffer.read()mutable = bytearray(data)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)for i in range(len(mutable)):mutable[i] = 0buffer.seek(0)buffer.write(mutable)except (ValueError, TypeError, OSError):passdef _get_key_bytes(self) -> bytes:"""从密钥源获取字节形式密钥"""if isinstance(self._key, str):# 字符串密钥直接使用return self._key.encode()elif hasattr(self._key, 'get_password'):# SafeKey对象 - 安全获取密码return self._key.get_password(bytes)else:raise TypeError("不支持的密钥类型")def _derive_key(self, key_length: int = 32) -> bytes:"""派生加密密钥"""# 获取原始密钥字节key_bytes = self._get_key_bytes()# 派生加密密钥kdf = PBKDF2HMAC(algorithm=hashes.SHA3_512(),length=key_length,salt=self._salt,iterations=CryptConfig.PBKDF2HMAC_iterations,backend=default_backend())derived_key = kdf.derive(key_bytes)# 安全清除中间数据self._secure_memzero(key_bytes)return derived_keydef _encrypt(self, data: Any) -> None:"""加密数据并存储在内存中"""# 序列化数据serialized = pickle.dumps(data)try:# 派生加密密钥derived_key = self._derive_key()# 加密数据cipher = Cipher(algorithms.AES(derived_key),modes.GCM(self._nonce),backend=default_backend())encryptor = cipher.encryptor()self._encrypted_data = encryptor.update(serialized) + encryptor.finalize()self._tag = encryptor.tagfinally:# 安全清除中间数据self._secure_memzero(serialized)if "derived_key" in locals():self._secure_memzero(derived_key)def get_data(self) -> Any:"""安全获取解密后的原始数据- 返回原始数据类型- 调用者负责安全处理返回的数据"""if not self._is_active:raise RuntimeError("SafeObj 已销毁")# 派生解密密钥derived_key = self._derive_key()try:# 解密数据cipher = Cipher(algorithms.AES(derived_key),modes.GCM(self._nonce, self._tag),backend=default_backend())decryptor = cipher.decryptor()decrypted = decryptor.update(self._encrypted_data) + decryptor.finalize()# 反序列化数据data = pickle.loads(decrypted)return datafinally:# 安全清除中间数据self._secure_memzero(derived_key)self._secure_memzero(decrypted)def use_data(self, callback: Callable[[Any], None]) -> None:"""安全使用数据(自动清除):param callback: 回调函数,接受解密后的数据作为参数"""data = Nonetry:data = self.get_data()callback(data)finally:# 安全清除数据(如果可能)if data is not None:self._secure_erase_recursive(data)def _secure_erase_recursive(self, obj: Any) -> None:"""递归安全清除数据结构中的敏感内容"""try:if isinstance(obj, (bytes, bytearray)):self._secure_memzero(obj)elif isinstance(obj, str):# 字符串不可变,创建可变副本后清除mutable = bytearray(obj.encode('utf-8'))self._secure_memzero(mutable)elif isinstance(obj, dict):for key, value in obj.items():self._secure_erase_recursive(value)self._secure_erase_recursive(key)elif isinstance(obj, (list, tuple, set)):for item in obj:self._secure_erase_recursive(item)elif hasattr(obj, '__dict__'):# 处理自定义对象for attr in vars(obj):self._secure_erase_recursive(getattr(obj, attr))except:pass@propertydef encrypted_data(self) -> bytes:"""获取加密后的数据(不含盐和nonce)"""return self._encrypted_datadef get_encrypted_package(self) -> bytes:"""获取完整加密包(包含盐、nonce、标签和加密数据)"""return self._salt + self._nonce + self._tag + self._encrypted_data@classmethoddef from_encrypted_package(cls, encrypted_package: bytes, key: Union[str, 'SafeKey']) -> 'SafeObj':"""从加密包创建SafeObj实例"""if len(encrypted_package) < 16 + 12 + 16:raise ValueError("无效的加密包")# 解包数据salt = encrypted_package[:16]nonce = encrypted_package[16:28]tag = encrypted_package[28:44]encrypted_data = encrypted_package[44:]# 创建虚拟实例instance = cls.__new__(cls)instance._key = keyinstance._salt = saltinstance._nonce = nonceinstance._tag = taginstance._encrypted_data = encrypted_datainstance._is_active = Truereturn instancedef __del__(self):"""析构时确保内存安全清除"""if self._is_active:self._is_active = False# 安全清除所有敏感数据self._secure_memzero(self._salt)self._secure_memzero(self._nonce)self._secure_memzero(self._tag)self._secure_memzero(self._encrypted_data)

# 分析

1. 连接步骤流程

初始化阶段:
        服务端启动时生成随机公钥(128字节),绑定端口监听连接。
        客户端连接服务端后,服务端立即发送公钥。
密钥交换阶段:
        客户端:
                生成随机私钥(128字节)并发送给服务端。
                发送精确到微秒的当前时间戳(20字节)和随机 nonce(12字节)。
        服务端:
                校验时间戳(与当前时间差≤1秒)和 nonce(是否重复),防止重放攻击
                组合服务端公钥 + 客户端私钥,派生主密钥(HKDF-SHA3-512)
数据传输阶段:
        客户端:
                用主密钥加密数据(AES-GCM + PBKDF2HMAC-SHA3-512)。
                发送加密数据包(盐 + nonce + 认证标签 + 密文)。
        服务端:
                用主密钥解密数据,处理业务逻辑。
                加密响应数据并返回相同结构的加密包。
连接终止:
        双方安全擦除会话密钥、临时密钥等敏感数据
        关闭连接。


2. 加密方式

密钥派生:
        主密钥派生:
                服务端公钥(128B) + 客户端私钥(128B) → HKDF-SHA3-512 → 32字节主密钥。
        数据加密密钥派生:
                主密钥 + 随机盐(16B) → PBKDF2HMAC-SHA3-512 → 32字节数据密钥。
数据加密:
        算法:
                AES-256-GCM(认证加密)。
        数据包结构:
                16B盐 + 12B nonce + 16B认证标签 + 变长密文。
完整性保护:
        时间戳 + nonce 防重放。
        GCM模式提供数据完整性和来源认证。


3. 安全设计

前向安全性:
        每次会话使用临时密钥(公钥/私钥对),会话结束立即擦除。
抗重放攻击:
        时间戳校验(1秒窗口) + nonce唯一性检测(服务端维护nonce列表)。
内存安全:
        敏感数据擦除:密钥、中间值用随机数据覆盖7次(secure_erase)。
        内存锁定:密钥存储在不可交换的内存区域(mlock)。
环境安全检测:
        反调试:检查调试器附加(IsDebuggerPresent/TracerPid)。
        反Hook:验证关键函数地址完整性。
        反虚拟机:检测VM特征(注册表/DMI信息)。
密钥管理:
        SafeKey对象:密钥全程加密存储,仅在使用时短暂解密。
        单次使用:密钥解密后立即擦除,不可复用。


4. 不足

长连接支持(暂未实现):
        当前设计每次请求新建连接,可扩展为k复用连接的会话模式。

速度(本机测试最大速度仅2Mbps = 256KB/s):

        加密操作大量耗时,CPU密集型操作。

适用性:
        仅支持单次、慢速通信(进程间加密数据传输),不能用于远程通信

可能的漏洞:

        中间人 (MITM) 攻击(当前无身份验证)
        内存耗尽攻击(恶意请求使服务器nonce列表无限增长)
        时间窗口攻击(攻击者可快速重放合法请求)
        内存安全残留风险(Python内部对象清除延时)
        拒绝服务 (DoS) 漏洞(Nonce 洪水、密钥派生消耗、连接耗尽等)
        侧信道攻击(时间、内存、物理监听等)


制作不易,感谢大家的支持!如有任何问题或建议欢迎评论!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/92024.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/92024.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/92024.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows批量修改文件属性方法

标题使用icacls命令&#xff08;推荐批量操作&#xff09;打开管理员权限的命令提示符&#xff08;CMD&#xff09;执行以下命令&#xff1a;cmd icacls "文件夹路径" /grant 用户名:(OI)(CI)F /T /C 参数说明&#xff1a;(OI)&#xff1a;对象继承 - 适用于文件夹(C…

Entity Component System架构

ECS架构 1 简介 在当今快速发展的软件开发领域&#xff0c;游戏开发、实时模拟等场景对系统的性能、灵活性和可扩展性提出了极高的要求。传统的面向对象架构在面对复杂且动态变化的实体时&#xff0c;往往会出现代码耦合度高、扩展性差等问题。​ ECS&#xff08;Entity - Com…

.vscode 扩展配置

一、vue快捷键配置 在项目.vscode下新建vue3.0.code-snippets 每当输入vue3.0后自动生成代码片段 {"Vue3.0快速生成模板": {"scope": "vue","prefix": "Vue3.0","body": ["<template>"," &…

一个基于阿里云的C端Java服务的整体项目架构

1.背景介绍 总结一下工作使用到的基于通常的公有云的项目整体架构&#xff0c;如何基于公有云建设安全可靠的服务&#xff0c;以阿里云为例的整体架构&#xff1b;1. 全局流量治理层&#xff08;用户请求入口&#xff09;1.1 域名与 DNS 解析域名注册与备案&#xff1a;通过阿里…

《剥开洋葱看中间件:Node.js请求处理效率与错误控制的深层逻辑》

在Node.js的运行时环境中&#xff0c;中间件如同一系列精密咬合的齿轮&#xff0c;驱动着请求从进入到响应的完整旅程&#xff0c;而洋葱模型则是这组齿轮的传动系统。它以一种看似矛盾的方式融合了顺序与逆序、分离与协作——让每个处理环节既能独立工作&#xff0c;又能感知全…

GaussDB union 的用法

1 union 的作用union 运算符用于组合两个或更多 select 语句的结果集。2 union 使用前提union 中的每个 select 语句必须具有相同的列数这些列也必须具有相似的数据类型每个 select 语句中的列也必须以相同的顺序排列3 union 语法select column_name(s) from table1 union sele…

构建足球实时比分APP:REST API与WebSocket接入方案详解

在开发足球实时比分应用时&#xff0c;数据接入方式的选择直接影响用户体验和系统性能。本文将客观分析REST API和WebSocket两种主流接入方案的技术特点、适用场景和实现策略&#xff0c;帮助开发者做出合理选择。一、REST API&#xff1a;灵活的数据获取方案核心优势标准化接口…

Linux文件系统三要素:块划分、分区管理与inode结构解析

理解文件系统 我们知道文件可以分为磁盘文件和内存文件&#xff0c;内存文件前面我们已经谈过了&#xff0c;下面我们来谈谈磁盘文件。 目录 一、引入"块"概念 解析 stat demo.c 命令输出 基本信息 设备信息 索引节点信息 权限信息 时间戳 二、引入"分区…

基于paddleDetect的半监督目标检测实战

基于paddleDetect的半监督目标检测实战前言相关介绍前提条件实验环境安装环境项目地址使用paddleDetect的半监督方法训练自己的数据集准备数据分割数据集配置参数文件PaddleDetection-2.7.0/configs/semi_det/denseteacher/denseteacher_ppyoloe_plus_crn_l_coco_semi010.ymlPa…

计算机网络:(十)虚拟专用网 VPN 和网络地址转换 NAT

计算机网络&#xff1a;&#xff08;十&#xff09;虚拟专用网 VPN 和网络地址转换 NAT前言一、虚拟专用网 VPN1. 基础概念与作用2. 工作原理3. 常见类型4. 协议对比二、NAT&#xff1a;网络地址转换1. 基础概念与作用2. 工作原理与类型3. 优缺点与问题4. 进阶类型三、VPN 与 N…

数位 dp

数位dp 特点 问题大多是指“在 [l,r][l,r][l,r] 的区间内&#xff0c;满足……的数字的个数、种类&#xff0c;等等。” 但是显然&#xff0c;出题人想要卡你&#xff0c;rrr 肯定是非常大的&#xff0c;暴力枚举一定超时。 于是就有了数位 dp。 基本思路 数位 dp 说白了…

Selector的用法

Selector的用法 Selector是基于lxml构建的支持XPath选择器、CSS选择器&#xff0c;以及正则表达式&#xff0c;功能全面&#xff0c;解析速度和准确度非常高 from scrapy import Selectorbody <html><head><title>HelloWorld</title></head>&…

Netty封装Websocket并实现动态路由

引言 关于Netty和Websocket的介绍我就不多讲了,网上一搜一大片。现如今AI的趋势发展很热门,长连接对话也是会经常接触到的,使用Websocket实现长连接,那么很多人为了快速开发快速集成就会使用spring-boot-starter-websocket依赖快速实现,但是注意该实现是基于tomcat的,有…

行为型设计模式:解释器模式

解释器模式 解释器模式介绍 解释器模式使用频率不算高&#xff0c;通常用来描述如何构建一个简单“语言”的语法解释器。它只在一些非常特定的领域被用到&#xff0c;比如编译器、规则引擎、正则表达式、SQL 解析等。不过&#xff0c;了解它的实现原理同样很重要&#xff0c;能…

SaTokenException: 未能获取对应StpLogic 问题解决

&#x1f4dd; Sa-Token 异常处&#xff1a;未能获取对应StpLogic&#xff0c;typeuser&#x1f9e8; 异常信息 cn.dev33.satoken.exception.SaTokenException: 未能获取对应StpLogic&#xff0c;typeuser抛出位置&#xff1a; throw new SaTokenException("未能获取对应S…

Web前端性能优化原理与方法

一、概述 1.1 性能对业务的影响 大部分网站的作用是&#xff1a;产品信息载体、用户交互工具或商品流通渠道。这就要求网站与更多用户建立联系&#xff0c;同时还要保持良好的用户黏性&#xff0c;所以网站就不能只关注自我表达&#xff0c;而不顾及用户是否喜欢。看看网站性…

第十八节:第六部分:java高级:注解、自定义注解、元注解

认识注解自定义注解注解的原理元注解常用的两个元注解代码&#xff1a; MyTest1&#xff08;注解类&#xff09; package com.itheima.day10_annotation;import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.Retent…

北京科技企业在软文推广发稿平台发布文章,如何精准触达客户?

大家好&#xff01;我是你们的老朋友&#xff0c;今天咱们聊聊北京科技企业如何通过软文推广发稿平台精准触达目标客户这个话题。作为企业营销的老司机&#xff0c;我深知在这个信息爆炸的时代&#xff0c;如何让你的品牌声音被目标客户听到是多么重要。下面就让我来分享一些实…

UE蒙太奇和动画序列有什么区别?

在 UE5 中&#xff0c;Animation Sequence&#xff08;动画序列&#xff09;和 Animation Montage&#xff08;动画蒙太奇&#xff09;虽然都能播放骨骼动画&#xff0c;但它们的定位、功能和使用场景有较大区别&#xff1a;1. 概念定位Animation Sequence&#xff08;动画序列…

Nordic打印RTT[屏蔽打印中的<info> app]

屏蔽打印中的 app Nordic原装的程序答应是这样的,这个有" app"打印,因为习惯问题,有时候也不想打印太多造成RTT VIEW显示被冲点,所以要把" app"去掉:这里把prefix_process函数调用屏蔽到,主要涉及到nrf_log_hexdump_entry_process和nrf_log_std_entry_proc…