向DeepSeek的提问
帮我设计以下两个python class
Span
属性:
hash值(在init函数中通过时间初始化)
创建时间:时间(在init函数中通过时间初始化)
结束时间:时间(可选,默认为空)
子Span:数组对象类型为Span
parent:父Span
状态:打开,关闭
内容:str对象,若对象为非str类型string化。
函数
结束:如果当前Span的所有子Span都结束了,则可以结束(要求这个函数线程安全)
展示:按照当前Span和子Span的创建和时间顺序发挥Span
Span_Mgr
函数
查询最近所有的父Span,按照时间顺序返回
创建一个新的Span(输入内容),返回Span对象
你可以使用ExpiringDictStorage作为数据持久化的实现方式,以下为ExpiringDictStorage的定义
import json
import os
import threading
import timefrom filelock import FileLock
from openai.types.chat import ChatCompletionclass ExpiringDictStorage:def __init__(self, filename="data_store.json", expiry_days=7):self.filename = filenameself.expiry_days = expiry_daysself.lock = threading.Lock()self.file_lock = FileLock(filename + ".lock")self.data = self._load_data()def _load_data(self):"""加载数据并清理过期项"""with self.file_lock:if not os.path.exists(self.filename):return {"_metadata": {"last_clean": time.time()}, "data": {}}try:with open(self.filename, "r") as f:data = json.load(f)# 清理过期数据(每7天执行一次)current_time = time.time()if (current_time - data["_metadata"]["last_clean"]> self.expiry_days * 86400):data["data"] = {k: vfor k, v in data["data"].items()if current_time - v["timestamp"] <= self.expiry_days * 86400}data["_metadata"]["last_clean"] = current_timeself._save_data(data)return dataexcept (json.JSONDecodeError, KeyError):# 文件损坏时创建新文件return {"_metadata": {"last_clean": time.time()}, "data": {}}def _save_data(self, data):"""保存数据到文件"""with open(self.filename, "w") as f:json.dump(data, f, indent=2)def get(self, key, update_timestamp=True):"""获取值:param key: 要获取的键:param update_timestamp: 是否更新访问时间戳,默认为True"""with self.lock:if key not in self.data["data"]:return Noneitem = self.data["data"][key]value = item["value"]# 如果配置为更新时间戳,则更新if update_timestamp:item["timestamp"] = time.time()self._save_data(self.data)if item.get("_type") == "ChatCompletion":return ChatCompletion.model_validate_json(value)else:return valuedef set(self, key, value):"""设置值"""with self.lock:if isinstance(value, ChatCompletion):storage_item = {"value": value.model_dump_json(),"timestamp": time.time(),"_type": "ChatCompletion",}else:storage_item = {"value": value,"timestamp": time.time(),"_type": "string",}self.data["data"][key] = storage_itemself._save_data(self.data)def clean_expired(self):"""手动清理过期数据"""with self.lock:current_time = time.time()self.data["data"] = {k: vfor k, v in self.data["data"].items()if current_time - v["timestamp"] <= self.expiry_days * 86400}self.data["_metadata"]["last_clean"] = current_timeself._save_data(self.data)def __contains__(self, key):with self.lock:return key in self.data["data"]def __getitem__(self, key):return self.get(key)def __setitem__(self, key, value):self.set(key, value)# 使用示例
if __name__ == "__main__":# 安装依赖: pip install filelockstorage = ExpiringDictStorage(expiry_days=7)# 设置值storage["user1"] = {"name": "Alice", "age": 30}storage["user2"] = {"name": "Bob", "age": 25}# 获取值print("当前存储内容:")print(storage.data)print("user1的值:", storage["user1"]) # 输出: {'name': 'Alice', 'age': 30}# 检查键是否存在print("user2是否存在:", "user2" in storage) # 输出: True# 模拟7天前的数据print("\n模拟7天前的数据...")old_time = time.time() - 8 * 86400 # 8天前(超过7天有效期)old_data = {"_metadata": {"last_clean": old_time},"data": {"user1": {"value": {"name": "Alice", "age": 30}, "timestamp": old_time},"user2": {"value": {"name": "Bob", "age": 25}, "timestamp": old_time},},}with open(storage.filename, "w") as f:json.dump(old_data, f)# 重新加载会自动清理过期数据print("\n重新加载存储(应自动清理过期数据)...")storage = ExpiringDictStorage(expiry_days=7)print("清理后的存储内容:")print(storage.data) # 现在应该看到空字典,因为所有数据都过期了# 添加新数据验证功能正常print("\n添加新数据验证功能正常...")storage["user3"] = "afda"print("添加后的存储内容:")print(storage.data)
DeepSeek的回答
import time
import threading
import json
from typing import List, Optional, Dict, Any
from datetime import datetimeclass Span:def __init__(self, content: Any, parent: Optional['Span'] = None):"""初始化Span对象Args:content: 内容,如果不是字符串会被转换为字符串parent: 父Span对象,默认为None表示根Span"""self.hash = hash(time.time_ns()) # 使用纳秒时间戳生成hashself.create_time = time.time_ns() # 创建时间(纳秒)self.end_time: Optional[int] = None # 结束时间self.children: List[Span] = [] # 子Span列表self.parent = parent # 父Spanself.status = "open" # 状态:open或closedself.content = str(content) # 内容,转换为字符串# 如果有父Span,将自己添加到父Span的子Span列表中if parent:parent.add_child(self)# 线程安全锁self._lock = threading.RLock()def add_child(self, child_span: 'Span') -> None:"""添加子Span(线程安全)"""with self._lock:self.children.append(child_span)def end(self) -> bool:"""结束当前Span(线程安全)Returns:bool: 是否成功结束Span"""with self._lock:# 检查所有子Span是否都已结束for child in self.children:if child.status != "closed":return False# 所有子Span都已结束,可以结束当前Spanself.end_time = time.time_ns()self.status = "closed"return Truedef get_duration(self) -> Optional[int]:"""获取Span持续时间(纳秒),如果未结束则返回None"""if self.end_time is None:return Nonereturn self.end_time - self.create_timedef display(self, level: int = 0) -> str:"""按照层级结构展示Span及其子SpanArgs:level: 当前层级(用于缩进)Returns:str: 格式化后的Span信息"""indent = " " * levelcreate_time_str = datetime.fromtimestamp(self.create_time / 1e9).strftime('%Y-%m-%d %H:%M:%S.%f')if self.end_time:end_time_str = datetime.fromtimestamp(self.end_time / 1e9).strftime('%Y-%m-%d %H:%M:%S.%f')duration = f", Duration: {(self.end_time - self.create_time) / 1e6:.2f}ms"else:end_time_str = "Not ended"duration = ""result = f"{indent}Span(hash={self.hash}, Status={self.status}\n"result += f"{indent} Create: {create_time_str}, End: {end_time_str}{duration}\n"result += f"{indent} Content: {self.content}\n"# 递归显示子Spanfor child in sorted(self.children, key=lambda x: x.create_time):result += child.display(level + 1)return resultdef to_dict(self) -> Dict[str, Any]:"""将Span转换为字典(用于序列化)"""return {"hash": self.hash,"create_time": self.create_time,"end_time": self.end_time,"children": [child.to_dict() for child in self.children],"parent_hash": self.parent.hash if self.parent else None,"status": self.status,"content": self.content}@classmethoddef from_dict(cls, data: Dict[str, Any], span_mgr: 'Span_Mgr') -> 'Span':"""从字典创建Span对象"""span = cls.__new__(cls)span.hash = data["hash"]span.create_time = data["create_time"]span.end_time = data["end_time"]span.status = data["status"]span.content = data["content"]span._lock = threading.RLock()# 重建父子关系span.parent = span_mgr.get_span_by_hash(data["parent_hash"]) if data["parent_hash"] else None# 递归重建子Spanspan.children = []for child_data in data["children"]:child_span = cls.from_dict(child_data, span_mgr)child_span.parent = spanspan.children.append(child_span)return spandef __str__(self) -> str:return self.display()class Span_Mgr:def __init__(self, storage_filename: str = "span_data.json", expiry_days: int = 7):"""初始化Span管理器Args:storage_filename: 存储文件名expiry_days: 数据过期天数"""self.storage = ExpiringDictStorage(filename=storage_filename, expiry_days=expiry_days)self.root_spans: List[Span] = [] # 所有根Span(没有父Span的Span)self.all_spans: Dict[int, Span] = {} # 所有Span的哈希映射self._lock = threading.RLock()# 从存储加载现有数据self._load_from_storage()def _load_from_storage(self) -> None:"""从存储加载Span数据"""with self._lock:stored_data = self.storage.get("span_data", update_timestamp=False)if stored_data:try:# 重建所有Span对象span_dicts = stored_data.get("spans", {})temp_spans = {}# 先创建所有Span对象(不设置父子关系)for hash_str, span_data in span_dicts.items():span_hash = int(hash_str)span = Span.__new__(Span)span.hash = span_hashspan.create_time = span_data["create_time"]span.end_time = span_data["end_time"]span.status = span_data["status"]span.content = span_data["content"]span._lock = threading.RLock()span.children = [] # 暂时为空temp_spans[span_hash] = span# 设置父子关系for span_hash, span_data in span_dicts.items():span = temp_spans[int(span_hash)]parent_hash = span_data["parent_hash"]if parent_hash:span.parent = temp_spans.get(parent_hash)# 重建子Span列表for child_hash in span_data["children_hashes"]:child_span = temp_spans.get(child_hash)if child_span:span.children.append(child_span)# 重建根Span列表和所有Span映射self.root_spans = [span for span in temp_spans.values() if span.parent is None]self.all_spans = temp_spansexcept Exception as e:print(f"Error loading span data: {e}")self.root_spans = []self.all_spans = {}def _save_to_storage(self) -> None:"""保存Span数据到存储"""with self._lock:spans_data = {}for span_hash, span in self.all_spans.items():spans_data[str(span_hash)] = {"create_time": span.create_time,"end_time": span.end_time,"parent_hash": span.parent.hash if span.parent else None,"children_hashes": [child.hash for child in span.children],"status": span.status,"content": span.content}self.storage.set("span_data", {"spans": spans_data})def create_span(self, content: Any, parent_hash: Optional[int] = None) -> Span:"""创建新的SpanArgs:content: Span内容parent_hash: 父Span的hash值,如果为None则创建根SpanReturns:Span: 新创建的Span对象"""with self._lock:parent = self.all_spans.get(parent_hash) if parent_hash else Nonenew_span = Span(content, parent)# 如果没有父Span,添加到根Span列表if parent is None:self.root_spans.append(new_span)# 添加到所有Span映射self.all_spans[new_span.hash] = new_span# 保存到存储self._save_to_storage()return new_spandef get_recent_parent_spans(self) -> List[Span]:"""查询最近所有的父Span(根Span),按照创建时间顺序返回Returns:List[Span]: 按创建时间排序的父Span列表"""with self._lock:return sorted(self.root_spans, key=lambda x: x.create_time, reverse=True)def get_span_by_hash(self, span_hash: int) -> Optional[Span]:"""根据hash值获取SpanArgs:span_hash: Span的hash值Returns:Optional[Span]: 找到的Span对象,如果不存在则返回None"""with self._lock:return self.all_spans.get(span_hash)def end_span(self, span_hash: int) -> bool:"""结束指定的SpanArgs:span_hash: 要结束的Span的hash值Returns:bool: 是否成功结束Span"""with self._lock:span = self.all_spans.get(span_hash)if span and span.end():self._save_to_storage()return Truereturn Falsedef display_all_spans(self) -> str:"""显示所有Span的层级结构"""with self._lock:result = ""for span in sorted(self.root_spans, key=lambda x: x.create_time):result += span.display()return result# 使用示例
if __name__ == "__main__":# 创建Span管理器span_mgr = Span_Mgr()# 创建一些Spanroot_span = span_mgr.create_span("Root operation")child1 = span_mgr.create_span("Child operation 1", root_span.hash)child2 = span_mgr.create_span("Child operation 2", root_span.hash)grandchild = span_mgr.create_span("Grandchild operation", child1.hash)# 显示所有Spanprint("All spans:")print(span_mgr.display_all_spans())# 结束子Span(应该失败,因为孙子Span还没结束)print(f"Ending child1: {span_mgr.end_span(child1.hash)}")# 结束孙子Spanprint(f"Ending grandchild: {span_mgr.end_span(grandchild.hash)}")# 现在可以结束child1print(f"Ending child1: {span_mgr.end_span(child1.hash)}")# 结束child2print(f"Ending child2: {span_mgr.end_span(child2.hash)}")# 现在可以结束根Spanprint(f"Ending root: {span_mgr.end_span(root_span.hash)}")# 显示最终状态print("\nFinal state:")print(span_mgr.display_all_spans())# 查询最近的父Spanprint("\nRecent parent spans:")for span in span_mgr.get_recent_parent_spans():create_time = datetime.fromtimestamp(span.create_time / 1e9).strftime('%Y-%m-%d %H:%M:%S')print(f" Span(hash={span.hash}, created={create_time}, content={span.content})")