向DeepSeek的提问

帮我设计以下两个python class
Span
属性:
hash值(在init函数中通过时间初始化)
创建时间:时间(在init函数中通过时间初始化)
结束时间:时间(可选,默认为空)
子Span:数组对象类型为Span
parent:父Span
状态:打开,关闭
内容:str对象,若对象为非str类型string化。

函数
结束:如果当前Span的所有子Span都结束了,则可以结束(要求这个函数线程安全)
展示:按照当前Span和子Span的创建和时间顺序发挥Span

Span_Mgr
函数
查询最近所有的父Span,按照时间顺序返回
创建一个新的Span(输入内容),返回Span对象

你可以使用ExpiringDictStorage作为数据持久化的实现方式,以下为ExpiringDictStorage的定义

import json
import os
import threading
import timefrom filelock import FileLock
from openai.types.chat import ChatCompletionclass ExpiringDictStorage:def __init__(self, filename="data_store.json", expiry_days=7):self.filename = filenameself.expiry_days = expiry_daysself.lock = threading.Lock()self.file_lock = FileLock(filename + ".lock")self.data = self._load_data()def _load_data(self):"""加载数据并清理过期项"""with self.file_lock:if not os.path.exists(self.filename):return {"_metadata": {"last_clean": time.time()}, "data": {}}try:with open(self.filename, "r") as f:data = json.load(f)# 清理过期数据(每7天执行一次)current_time = time.time()if (current_time - data["_metadata"]["last_clean"]> self.expiry_days * 86400):data["data"] = {k: vfor k, v in data["data"].items()if current_time - v["timestamp"] <= self.expiry_days * 86400}data["_metadata"]["last_clean"] = current_timeself._save_data(data)return dataexcept (json.JSONDecodeError, KeyError):# 文件损坏时创建新文件return {"_metadata": {"last_clean": time.time()}, "data": {}}def _save_data(self, data):"""保存数据到文件"""with open(self.filename, "w") as f:json.dump(data, f, indent=2)def get(self, key, update_timestamp=True):"""获取值:param key: 要获取的键:param update_timestamp: 是否更新访问时间戳,默认为True"""with self.lock:if key not in self.data["data"]:return Noneitem = self.data["data"][key]value = item["value"]# 如果配置为更新时间戳,则更新if update_timestamp:item["timestamp"] = time.time()self._save_data(self.data)if item.get("_type") == "ChatCompletion":return ChatCompletion.model_validate_json(value)else:return valuedef set(self, key, value):"""设置值"""with self.lock:if isinstance(value, ChatCompletion):storage_item = {"value": value.model_dump_json(),"timestamp": time.time(),"_type": "ChatCompletion",}else:storage_item = {"value": value,"timestamp": time.time(),"_type": "string",}self.data["data"][key] = storage_itemself._save_data(self.data)def clean_expired(self):"""手动清理过期数据"""with self.lock:current_time = time.time()self.data["data"] = {k: vfor k, v in self.data["data"].items()if current_time - v["timestamp"] <= self.expiry_days * 86400}self.data["_metadata"]["last_clean"] = current_timeself._save_data(self.data)def __contains__(self, key):with self.lock:return key in self.data["data"]def __getitem__(self, key):return self.get(key)def __setitem__(self, key, value):self.set(key, value)# 使用示例
if __name__ == "__main__":# 安装依赖: pip install filelockstorage = ExpiringDictStorage(expiry_days=7)# 设置值storage["user1"] = {"name": "Alice", "age": 30}storage["user2"] = {"name": "Bob", "age": 25}# 获取值print("当前存储内容:")print(storage.data)print("user1的值:", storage["user1"])  # 输出: {'name': 'Alice', 'age': 30}# 检查键是否存在print("user2是否存在:", "user2" in storage)  # 输出: True# 模拟7天前的数据print("\n模拟7天前的数据...")old_time = time.time() - 8 * 86400  # 8天前(超过7天有效期)old_data = {"_metadata": {"last_clean": old_time},"data": {"user1": {"value": {"name": "Alice", "age": 30}, "timestamp": old_time},"user2": {"value": {"name": "Bob", "age": 25}, "timestamp": old_time},},}with open(storage.filename, "w") as f:json.dump(old_data, f)# 重新加载会自动清理过期数据print("\n重新加载存储(应自动清理过期数据)...")storage = ExpiringDictStorage(expiry_days=7)print("清理后的存储内容:")print(storage.data)  # 现在应该看到空字典,因为所有数据都过期了# 添加新数据验证功能正常print("\n添加新数据验证功能正常...")storage["user3"] = "afda"print("添加后的存储内容:")print(storage.data)

DeepSeek的回答

import time
import threading
import json
from typing import List, Optional, Dict, Any
from datetime import datetimeclass Span:def __init__(self, content: Any, parent: Optional['Span'] = None):"""初始化Span对象Args:content: 内容,如果不是字符串会被转换为字符串parent: 父Span对象,默认为None表示根Span"""self.hash = hash(time.time_ns())  # 使用纳秒时间戳生成hashself.create_time = time.time_ns()  # 创建时间(纳秒)self.end_time: Optional[int] = None  # 结束时间self.children: List[Span] = []  # 子Span列表self.parent = parent  # 父Spanself.status = "open"  # 状态:open或closedself.content = str(content)  # 内容,转换为字符串# 如果有父Span,将自己添加到父Span的子Span列表中if parent:parent.add_child(self)# 线程安全锁self._lock = threading.RLock()def add_child(self, child_span: 'Span') -> None:"""添加子Span(线程安全)"""with self._lock:self.children.append(child_span)def end(self) -> bool:"""结束当前Span(线程安全)Returns:bool: 是否成功结束Span"""with self._lock:# 检查所有子Span是否都已结束for child in self.children:if child.status != "closed":return False# 所有子Span都已结束,可以结束当前Spanself.end_time = time.time_ns()self.status = "closed"return Truedef get_duration(self) -> Optional[int]:"""获取Span持续时间(纳秒),如果未结束则返回None"""if self.end_time is None:return Nonereturn self.end_time - self.create_timedef display(self, level: int = 0) -> str:"""按照层级结构展示Span及其子SpanArgs:level: 当前层级(用于缩进)Returns:str: 格式化后的Span信息"""indent = "  " * levelcreate_time_str = datetime.fromtimestamp(self.create_time / 1e9).strftime('%Y-%m-%d %H:%M:%S.%f')if self.end_time:end_time_str = datetime.fromtimestamp(self.end_time / 1e9).strftime('%Y-%m-%d %H:%M:%S.%f')duration = f", Duration: {(self.end_time - self.create_time) / 1e6:.2f}ms"else:end_time_str = "Not ended"duration = ""result = f"{indent}Span(hash={self.hash}, Status={self.status}\n"result += f"{indent}  Create: {create_time_str}, End: {end_time_str}{duration}\n"result += f"{indent}  Content: {self.content}\n"# 递归显示子Spanfor child in sorted(self.children, key=lambda x: x.create_time):result += child.display(level + 1)return resultdef to_dict(self) -> Dict[str, Any]:"""将Span转换为字典(用于序列化)"""return {"hash": self.hash,"create_time": self.create_time,"end_time": self.end_time,"children": [child.to_dict() for child in self.children],"parent_hash": self.parent.hash if self.parent else None,"status": self.status,"content": self.content}@classmethoddef from_dict(cls, data: Dict[str, Any], span_mgr: 'Span_Mgr') -> 'Span':"""从字典创建Span对象"""span = cls.__new__(cls)span.hash = data["hash"]span.create_time = data["create_time"]span.end_time = data["end_time"]span.status = data["status"]span.content = data["content"]span._lock = threading.RLock()# 重建父子关系span.parent = span_mgr.get_span_by_hash(data["parent_hash"]) if data["parent_hash"] else None# 递归重建子Spanspan.children = []for child_data in data["children"]:child_span = cls.from_dict(child_data, span_mgr)child_span.parent = spanspan.children.append(child_span)return spandef __str__(self) -> str:return self.display()class Span_Mgr:def __init__(self, storage_filename: str = "span_data.json", expiry_days: int = 7):"""初始化Span管理器Args:storage_filename: 存储文件名expiry_days: 数据过期天数"""self.storage = ExpiringDictStorage(filename=storage_filename, expiry_days=expiry_days)self.root_spans: List[Span] = []  # 所有根Span(没有父Span的Span)self.all_spans: Dict[int, Span] = {}  # 所有Span的哈希映射self._lock = threading.RLock()# 从存储加载现有数据self._load_from_storage()def _load_from_storage(self) -> None:"""从存储加载Span数据"""with self._lock:stored_data = self.storage.get("span_data", update_timestamp=False)if stored_data:try:# 重建所有Span对象span_dicts = stored_data.get("spans", {})temp_spans = {}# 先创建所有Span对象(不设置父子关系)for hash_str, span_data in span_dicts.items():span_hash = int(hash_str)span = Span.__new__(Span)span.hash = span_hashspan.create_time = span_data["create_time"]span.end_time = span_data["end_time"]span.status = span_data["status"]span.content = span_data["content"]span._lock = threading.RLock()span.children = []  # 暂时为空temp_spans[span_hash] = span# 设置父子关系for span_hash, span_data in span_dicts.items():span = temp_spans[int(span_hash)]parent_hash = span_data["parent_hash"]if parent_hash:span.parent = temp_spans.get(parent_hash)# 重建子Span列表for child_hash in span_data["children_hashes"]:child_span = temp_spans.get(child_hash)if child_span:span.children.append(child_span)# 重建根Span列表和所有Span映射self.root_spans = [span for span in temp_spans.values() if span.parent is None]self.all_spans = temp_spansexcept Exception as e:print(f"Error loading span data: {e}")self.root_spans = []self.all_spans = {}def _save_to_storage(self) -> None:"""保存Span数据到存储"""with self._lock:spans_data = {}for span_hash, span in self.all_spans.items():spans_data[str(span_hash)] = {"create_time": span.create_time,"end_time": span.end_time,"parent_hash": span.parent.hash if span.parent else None,"children_hashes": [child.hash for child in span.children],"status": span.status,"content": span.content}self.storage.set("span_data", {"spans": spans_data})def create_span(self, content: Any, parent_hash: Optional[int] = None) -> Span:"""创建新的SpanArgs:content: Span内容parent_hash: 父Span的hash值,如果为None则创建根SpanReturns:Span: 新创建的Span对象"""with self._lock:parent = self.all_spans.get(parent_hash) if parent_hash else Nonenew_span = Span(content, parent)# 如果没有父Span,添加到根Span列表if parent is None:self.root_spans.append(new_span)# 添加到所有Span映射self.all_spans[new_span.hash] = new_span# 保存到存储self._save_to_storage()return new_spandef get_recent_parent_spans(self) -> List[Span]:"""查询最近所有的父Span(根Span),按照创建时间顺序返回Returns:List[Span]: 按创建时间排序的父Span列表"""with self._lock:return sorted(self.root_spans, key=lambda x: x.create_time, reverse=True)def get_span_by_hash(self, span_hash: int) -> Optional[Span]:"""根据hash值获取SpanArgs:span_hash: Span的hash值Returns:Optional[Span]: 找到的Span对象,如果不存在则返回None"""with self._lock:return self.all_spans.get(span_hash)def end_span(self, span_hash: int) -> bool:"""结束指定的SpanArgs:span_hash: 要结束的Span的hash值Returns:bool: 是否成功结束Span"""with self._lock:span = self.all_spans.get(span_hash)if span and span.end():self._save_to_storage()return Truereturn Falsedef display_all_spans(self) -> str:"""显示所有Span的层级结构"""with self._lock:result = ""for span in sorted(self.root_spans, key=lambda x: x.create_time):result += span.display()return result# 使用示例
if __name__ == "__main__":# 创建Span管理器span_mgr = Span_Mgr()# 创建一些Spanroot_span = span_mgr.create_span("Root operation")child1 = span_mgr.create_span("Child operation 1", root_span.hash)child2 = span_mgr.create_span("Child operation 2", root_span.hash)grandchild = span_mgr.create_span("Grandchild operation", child1.hash)# 显示所有Spanprint("All spans:")print(span_mgr.display_all_spans())# 结束子Span(应该失败,因为孙子Span还没结束)print(f"Ending child1: {span_mgr.end_span(child1.hash)}")# 结束孙子Spanprint(f"Ending grandchild: {span_mgr.end_span(grandchild.hash)}")# 现在可以结束child1print(f"Ending child1: {span_mgr.end_span(child1.hash)}")# 结束child2print(f"Ending child2: {span_mgr.end_span(child2.hash)}")# 现在可以结束根Spanprint(f"Ending root: {span_mgr.end_span(root_span.hash)}")# 显示最终状态print("\nFinal state:")print(span_mgr.display_all_spans())# 查询最近的父Spanprint("\nRecent parent spans:")for span in span_mgr.get_recent_parent_spans():create_time = datetime.fromtimestamp(span.create_time / 1e9).strftime('%Y-%m-%d %H:%M:%S')print(f"  Span(hash={span.hash}, created={create_time}, content={span.content})")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/93855.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/93855.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/93855.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Hi3516CV610-00S 海思SOC芯片 可申请开发资料

1.1 概述Hi3516CV610 是一颗应用在安防市场的 IPC SoC。在开放操作系统、新一代视频编解码标准、网络安全和隐私保护、人工智能方面引领行业发展&#xff0c;主要面向室内外场景下的枪机、球机、半球机、海螺机、枪球一体机、双目长短焦机等产品形态&#xff0c;打造极具竞争力…

算法题Day4

目录 13. 练习13 : 整数十位 14. 练习14 : 时间转换 15. 练习15 : 小雨的游泳时间 13. 练习13 : 整数十位 解题方法: #include <iostream> using namespace std; int a; int main() {cin >> a;cout << a % 100 / 10 << endl;return 0; } 14. 练习…

加速你的故障排查:使用 Elasticsearch 构建家电手册的 RAG 应用

作者&#xff1a;来自 Elastic Alessandro Brofferio 学习如何使用 Elasticsearch 构建 RAG 应用&#xff0c;轻松排查你的家电问题。 想要获得 Elastic 认证吗&#xff1f;来看看下一次 Elasticsearch 工程师培训什么时候开始吧&#xff01; Elasticsearch 拥有大量新功能&am…

6.Shell脚本修炼手册---grep命令使用指南

grep 命令&#xff1a;从文本中精准筛选信息的实用指南 文章目录grep 命令&#xff1a;从文本中精准筛选信息的实用指南一、什么是 grep&#xff1f;为什么要用它&#xff1f;二、grep 基本语法三、常用选项详解&#xff08;附实例&#xff09;&#xff08;一&#xff09;模式选…

Python day51

浙大疏锦行 Python day51 复习日&#xff0c;DDPM class DenoiseDiffusion():def __init__(self, eps_model: nn.Module, n_steps: int, device: torch.device):super().__init__()self.eps_model eps_modelself.n_steps n_stepsself.device deviceself.beta torch.linsp…

数据结构:生成 (Generating) 一棵 AVL 树

目录 搭建“创世”的舞台 注入序列&#xff0c;观察演化 注入 10 注入 20 注入 30 注入 40 注入 50 注入 25 再次审视 上一讲&#xff0c;我们已经从最根本的逻辑出发&#xff0c;推导出了 AVL 树失衡时所必需的修复操作——旋转 (Rotation)。 现在&#xff0c;我们将…

github 上传代码步骤

登录GitHub → 点击右上角 ​​ → New Repository​​。填写仓库名称&#xff08;建议与本地项目同名&#xff09;&#xff0c;选择 ​​Public/Private​​。​​关键&#xff1a;不要勾选​​ “Initialize with README”&#xff08;避免与本地仓库冲突&#xff09;。点击 …

陪诊小程序系统开发:开启智慧就医新时代

在数字化浪潮的推动下&#xff0c;智慧医疗正逐渐成为现实。陪诊小程序系统的开发&#xff0c;作为智慧医疗领域的一次重要创新&#xff0c;正以其独特的魅力与优势&#xff0c;引领着就医新时代的到来。它不仅改变了传统就医模式&#xff0c;更以科技的力量&#xff0c;让医疗…

朝花夕拾(七)--------从混淆矩阵到分类报告全面解析​

目录 ​​机器学习模型评估指南&#xff1a;从混淆矩阵到分类报告全面解析​​ ​​1. 引言​​ ​​2. 混淆矩阵&#xff1a;模型评估的基石​​ ​​2.1 什么是混淆矩阵&#xff1f;​​ 2.2二分类问题的混淆矩阵 ​​二分类场景下的具体案例​ ​分析案例: 1.​​案例…

Python读取和设置PNG图片的像素值

在Python中&#xff0c;可以使用Pillow库或OpenCV库来读取和写入PNG图片的像素值。以下是两种方法的详细说明&#xff1a;1. 使用Pillow库Pillow是Python中常用的图像处理库&#xff0c;支持多种图像格式&#xff0c;包括PNG。读取像素值from PIL import Imageimg Image.open(…

SkyWalking + Elasticsearch8 容器化部署指南:国内镜像加速与生产级调优

SkyWalking Elasticsearch8 Docker 部署文档本文提供在 Ubuntu 服务器上&#xff0c;使用 Docker Compose 部署 SkyWalking&#xff08;OAPUI&#xff09;与 Elasticsearch 8 的完整步骤&#xff0c;数据/日志落地到 /media/disk2 前置条件 Ubuntu&#xff0c;已具备 sudo 权限…

有符号和无符号的区别

有符号&#xff08;Signed&#xff09;和无符号&#xff08;Unsigned&#xff09;是计算机编程中用来描述整数数据类型能否表示负数的两个概念。它们的主要区别在于能否表示负数以及数值的表示范围。以下是它们的核心区别&#xff1a;1. 能否表示负数有符号&#xff08;Signed&…

8月21日作业

1、Makefile中头文件发生过修改的解决&#xff1a; 处插入*.h依赖&#xff0c;对.h文件打的时间戳进行检查2、头删和输出//五、头删 void delete_head(seq_p s) {empty(s);for(int i1;i<s->len;i){s->data[i-1]s->data[i];}s->len--; }//六、输出 void output(s…

Lucene 8.5.0 的 `.pos` 文件**逻辑结构**

Lucene 8.5.0 的 .pos 文件**逻辑结构**&#xff08;按真实实现重新整理&#xff09; .pos 文件 ├─ Header (CodecHeader) ├─ TermPositions TermCount ← 每个 term 一段&#xff0c;顺序由词典隐式决定 │ ├─ PackedPosDeltaBlock N ← 仅当 **无 payl…

基于Matlab多技术融合的红外图像增强方法研究

红外图像在低照度、强干扰和复杂环境下具有较强的成像能力&#xff0c;但受传感器噪声、成像条件及大气衰减等因素影响&#xff0c;原始红外图像往往存在对比度低、细节模糊及光照不均等问题。本文针对红外图像质量退化的特点&#xff0c;提出了一种基于多算法融合的红外图像增…

【时时三省】集成测试 简介

山不在高,有仙则名。水不在深,有龙则灵。 ----CSDN 时时三省 目录 1,集成测试含义 2,集成测试 验证方法 3,集成测试 用例设计方法 4,集成测试输出物 5,集成测试注意点 1,集成测试含义 单元测试在以V模型的流程中,对应的是架构设计阶段。在 单元测试 和 架构设计…

leetcode 76 最小覆盖子串

一、题目描述二、解题思路整体思路&#xff1a;模拟寻找最小覆盖子集的过程&#xff0c;由于可借助同向双指针且可以做到指针不回退&#xff0c;所以可以用滑动窗口的思想来解决这个问题。具体思路&#xff1a;(1)数组hash1用于统计t中每一个字符出现的频次&#xff0c;变量kin…

阿里云ECS服务器的公网IP地址

文章目录环境背景查询公网IP地址阿里云控制台阿里云客户端工具&#xff08;图形界面&#xff09;阿里云CLI工具&#xff08;命令行&#xff09;其它方法元数据服务器ipinfo.io参考注&#xff1a;本文介绍了如何获取阿里云ECS服务器的公网IP地址&#xff0c;可以顺便了解一下和阿…

IPSec 与 IKE 核心知识点总结

一、IPSec 安全基础IPSec 是保障 IP 数据传输安全的核心协议&#xff0c;其核心围绕密钥管理和安全策略约定展开&#xff0c;具体包括以下关键内容&#xff1a;1. 对称密钥的作用与要求对称密钥是 IPSec 实现加密、验证的基础&#xff0c;主要用于三个场景&#xff1a;加密 / 解…

C2ComponentStore

1. C2ComponentStore这是 Codec 2.0 HAL 的抽象接口&#xff08;frameworks/av/media/codec2/core/include/C2ComponentStore.h&#xff09;。代表一个「组件工厂」&#xff0c;负责&#xff1a;枚举当前可用的 Codec2 组件&#xff08;解码器、编码器&#xff09;。创建组件&a…