-
遇见的问题:
测试用例使用thrift资源和redis资源,单独运行case没有问题,但是使用并发pytest-xdist(-n 10 和 --dist=loadscope)运行失败 -
原因:
测试用例间存在共享资源竞争(如 Redis、Thrift 连接)和测试类状态未隔离 -
解决办法:
原来的测试前置是通过传统的setup来实现初始化,会导致资源共享
def setup(self):self.check = common_check.CommonCheck()self.req = common_req.CommonReq()self.driverId = int(''.join(str(random.randint(0, 9)) for _ in range(10)))self.passengerId_one = int(''.join(str(random.randint(0, 9)) for _ in range(10)))self.passengerId_two = int(''.join(str(random.randint(0, 9)) for _ in range(10)))self.passengerId_three = int(''.join(str(random.randint(0, 9)) for _ in range(10)))self.orderId = str(int(''.join(str(random.randint(0, 9)) for _ in range(10))))self.orderId_two = str(int(self.orderId) + 1)self.orderId_three = str(int(self.orderId)+ 2)self.travel_id = int(''.join(str(random.randint(0, 9)) for _ in range(10)))# self.redish = redis.StrictRedis(host=globalVar.g_fusion_ip, port=globalVar.g_fusion_port,password=globalVar.g_fusion_password, username=globalVar.g_fusion_username, db=0)self.redish = redis.StrictRedis(host=globalVar.g_redis_ip, port=globalVar.g_redis_port, db=0)try:transport = TSocket.TSocket(globalVar.crm_ip, globalVar.crm_port)transport.setTimeout(10000)transport = TTransport.TFramedTransport(transport)protocol = TBinaryProtocol.TBinaryProtocol(transport)self.client = Client(protocol)transport.open()self.trans = transportexcept Thrift.TException as tx:print('%s' % (tx.message))except Exception as ex:print('%s' % (ex.message))
现在通过fixture,为每个用例创建独立资源 + 自动清理」,实现了用例间的完全隔离,从根本上避免了并发冲突
@pytest.fixture(autouse=True)def setup_isolated(self):self.test_uuid = str(uuid.uuid4()) # 测试用例唯一IDself.driverId = self._generate_unique_id()self.passengerId_one = self._generate_unique_id()self.passengerId_two = self._generate_unique_id()self.passengerId_three = self._generate_unique_id()self.orderId = str(self._generate_unique_id())self.orderId_two = str(int(self.orderId) + 1)self.orderId_three = str(int(self.orderId) + 2)self.travel_id = self._generate_unique_id()# 2. 初始化工具类(无状态,可安全复用)self.check = common_check.CommonCheck()self.req = common_req.CommonReq()# 3. 初始化 Redis 连接(每个用例独立连接,避免共享)self.redish = redis.StrictRedis(host=globalVar.g_redis_ip,port=globalVar.g_redis_port,db=0,decode_responses=True # 避免 bytes/str 类型混乱)# 4. 初始化 Thrift 客户端(每个用例独立连接,避免共享)self.transport = Noneself.client = Nonetry:self.transport = TSocket.TSocket(globalVar.crm_ip, globalVar.crm_port)self.transport.setTimeout(10000)self.transport = TTransport.TFramedTransport(self.transport)protocol = TBinaryProtocol.TBinaryProtocol(self.transport)self.client = Client(protocol)self.transport.open()except Thrift.TException as tx:pytest.fail("Thrift 连接初始化失败:",tx.message)except Exception as ex:pytest.fail("未知错误: ",ex.message)# 5. 用例执行前的钩子(yield 前为 setup,后为 teardown)yield# 6. 用例结束后清理资源(避免连接泄漏)if self.transport and self.transport.isOpen():self.transport.close()self.redish.close() # 关闭 Redis 连接@contextmanagerdef redis_lock(self, key, timeout=5):"""Redis 分布式锁(解决多进程共享资源竞争)"""lock_key = "lock:{key}"lock_acquired = Falsetry:# 尝试获取锁(NX=不存在才设置,PX=过期时间毫秒)lock_acquired = self.redish.set(lock_key, self.test_uuid, nx=True, px=timeout * 1000)if not lock_acquired:pytest.fail("获取 Redis 锁失败(key: )" + lock_key + ",可能存在并发竞争")yield # 锁内逻辑执行区finally:# 释放锁(仅删除自己持有的锁,避免误删其他进程的锁)if lock_acquired:current_lock_val = self.redish.get(lock_key)if current_lock_val == self.test_uuid:self.redish.delete(lock_key)
生成安全的key:UUID前10位 + 时间戳,避免碰撞
def _generate_unique_id(self):"""生成并发安全的唯一ID(UUID前10位 + 时间戳,避免碰撞)"""timestamp = int(time.time() * 1000) # 毫秒级时间戳(确保时序唯一)uuid_part = int(str(uuid.uuid4()).replace('-', '')[:8], 16) # UUID前8位(16进制转10进制)return int("{}{}".format(timestamp, uuid_part)[:10]) # 截取10位,符合原ID长度
@contextmanager 是 Python 标准库 contextlib 模块中的一个装饰器,用于快速定义上下文管理器(Context Manager)。它的核心作用是简化「资源获取 - 使用 - 释放」的流程,确保资源(如文件、数据库连接、锁等)在使用后被正确释放,即使过程中发生异常
- redis枷锁
@contextmanagerdef redis_lock(self, key, timeout=5):"""Redis 分布式锁(解决多进程共享资源竞争)"""lock_key = "lock:{key}"lock_acquired = Falsetry:# 尝试获取锁(NX=不存在才设置,PX=过期时间毫秒)lock_acquired = self.redish.set(lock_key, self.test_uuid, nx=True, px=timeout * 1000)if not lock_acquired:pytest.fail("获取 Redis 锁失败(key: )" + lock_key + ",可能存在并发竞争")yield # 锁内逻辑执行区finally:# 释放锁(仅删除自己持有的锁,避免误删其他进程的锁)if lock_acquired:current_lock_val = self.redish.get(lock_key)if current_lock_val == self.test_uuid:self.redish.delete(lock_key)
在用到redis非删除操作的地方,先判断redis锁是否释放
- 需要导入的模块:
import uuid
from contextlib import contextmanager