Python 操作 Redis 的客户端 - Redis Stream

  • 1. Redis Stream
  • 2. Redis Commands
    • 2.1. `CoreCommands.xadd()` (生产端)
    • 2.2. `CoreCommands.xlen()` (生产端)
    • 2.3. `CoreCommands.xdel()` (生产端)
    • 2.4. `CoreCommands.xrange()` (生产端)
    • 2.5. `RedisClusterCommands.delete()`
  • 3. Redis Stream Examples
  • References

redis-py - Python Redis 客户端
https://redis.io/docs/latest/develop/clients/redis-py/
https://redis-py.pythonlang.cn/en/stable/index.html

redis-py (Redis Python client)
https://github.com/redis/redis-py

The Python interface to the Redis key-value store.

redis-py is the Python client for Redis. redis-py requires a running Redis server.

The sections below explain how to install redis-py and connect your Python application to a Redis database.

Redis Commands
https://redis.readthedocs.io/en/stable/commands.html

Redis 是一个开源的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。

1. Redis Stream

https://redis.com.cn/redis-stream.html

Redis Stream 主要用于消息队列 (Message Queue, MQ),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

Redis Stream 是一种数据结构,用于处理大规模的时间序列数据。Redis Stream 的每个条目都有一个唯一的 id 和关联的数据字段,适合用于日志、事件收集或实时数据流等场景。

2. Redis Commands

2.1. CoreCommands.xadd() (生产端)

https://redis.readthedocs.io/en/stable/commands.html
https://redis.io/docs/latest/commands/xadd/

xadd(name, fields, id='*', maxlen=None, approximate=True, nomkstream=False, minid=None, limit=None, ref_policy=None)

Add to a stream.

name: name of the stream (name 是队列名称,如果队列 name 不存在就创建。)
fields: dict of field/value pairs to insert into the stream (添加到流的字段和值。)
id: location to insert this record. By default it is appended. (消息 id,使用 * 表示由 Redis 自动生成。可以自定义,但是要自己保证递增性。)
maxlen: truncate old stream members beyond this size. Can’t be specified with minid. (maxlen 是可选参数,设置流最大长度。)
approximate: actual stream length may be slightly more than maxlen
nomkstream: When set to true, do not make a stream
minid: the minimum id in the stream to query. Can’t be specified with maxlen. (可选参数,只有 id 大于给定 minid 的条目才会被加入流。)
limit: specifies the maximum number of entries to retrieve
ref_policy: optional reference policy for consumer groups when trimming:

  • KEEPREF (default): When trimming, preserves references in consumer groups’ PEL
  • DELREF: When trimming, removes all references from consumer groups’ PEL
  • ACKED: When trimming, only removes entries acknowledged by all consumer groups

Parameters

  • name (Union[bytes, str, memoryview])
  • fields (Dict[Union[bytes, bytearray, memoryview, str, int, float], Union[bytes, bytearray, memoryview, str, int, float]])
  • id (Union[int, bytes, str, memoryview])
  • maxlen (Optional[int])
  • approximate (bool)
  • nomkstream (bool)
  • minid (Optional[Union[int, bytes, str, memoryview]])
  • limit (Optional[int])
  • ref_policy (Optional[Literal[‘KEEPREF’, ‘DELREF’, ‘ACKED’]])

Return type

  • Union[Awaitable[Any], Any]

2.2. CoreCommands.xlen() (生产端)

https://redis.io/docs/latest/commands/xlen/

xlen(name)

Returns the number of elements in a given stream.

Parameters

  • name (Union[bytes, str, memoryview])

Return type

  • Union[Awaitable[Any], Any]

2.3. CoreCommands.xdel() (生产端)

https://redis.io/docs/latest/commands/xdel/

xdel(name, *ids)

Deletes one or more messages from a stream.

Parameters

  • name (Union[bytes, str, memoryview]) - name of the stream.
  • *ids (Union[int, bytes, str, memoryview]) - message ids to delete.

Return type
Union[Awaitable[Any], Any]

2.4. CoreCommands.xrange() (生产端)

https://redis.io/docs/latest/commands/xrange/

xrange(name, min='-', max='+', count=None)

Read stream values within an interval.

name: name of the stream.
start: first stream ID. defaults to ‘-‘, meaning the earliest available.
finish: last stream ID. defaults to ‘+’, meaning the latest available.
count: if set, only return this many items, beginning with the
earliest available.

Parameters

  • name (Union[bytes, str, memoryview])
  • min (Union[int, bytes, str, memoryview])
  • max (Union[int, bytes, str, memoryview])
  • count (Optional[int])

Return type
Union[Awaitable[Any], Any]

(base) yongqiang@yongqiang:~$ sudo service redis-server start
[sudo] password for yongqiang:
Starting redis-server: redis-server.
(base) yongqiang@yongqiang:~$
(base) yongqiang@yongqiang:~$ sudo service redis-server status* redis-server is running
(base) yongqiang@yongqiang:~$
#!/usr/bin/env python
# coding=utf-8import redis# 连接到 Redis 服务器
r = redis.Redis(host='localhost', port=6379, db=0)# 定义流的名称
stream_key = "forever_stream"# 使用 xadd 将事件添加到流
r.xadd(stream_key, {"event": "signup", "user_id": 123})
r.xadd(stream_key, {"event": "login", "user_id": 456})
r.xadd(stream_key, {"event": "logout", "user_id": 789})# 打印当前流中的所有条目
entries = r.xrange(stream_key)
for entry in entries:print(entry)
/home/yongqiang/miniconda3/bin/python /home/yongqiang/stable_diffusion_work/stable_diffusion_diffusers/yongqiang.py 
(b'1756565556646-0', {b'event': b'signup', b'user_id': b'123'})
(b'1756565556646-1', {b'event': b'login', b'user_id': b'456'})
(b'1756565556646-2', {b'event': b'logout', b'user_id': b'789'})Process finished with exit code 0

2.5. RedisClusterCommands.delete()

  • delete(*keys)

Deletes the given keys in the cluster. The keys are first split up into slots and then an DEL command is sent for every slot

Non-existent keys are ignored. Returns the number of keys that were deleted.

Parameters
keys (Union[bytes, str, memoryview]) –

Return type
Union[Awaitable[Any], Any]

#!/usr/bin/env python
# coding=utf-8import redis# 连接到 Redis 服务器
redis_client = redis.Redis(host='localhost', port=6379, db=0)# 定义流的名称
stream_key = "forever_stream"# 使用 xadd 将事件添加到流
redis_client.xadd(stream_key, {"event": "signup", "user_id": 123})
redis_client.xadd(stream_key, {"event": "login", "user_id": 456})
redis_client.xadd(stream_key, {"event": "logout", "user_id": 789})# 打印当前流中的所有条目
entries = redis_client.xrange(stream_key)
print(f"length: {len(entries)}")
for entry in entries:print(entry)redis_client.delete(stream_key)entries = redis_client.xrange(stream_key)
print(f"length: {len(entries)}")
/home/yongqiang/miniconda3/bin/python /home/yongqiang/stable_diffusion_work/stable_diffusion_diffusers/yongqiang.py 
length: 3
(b'1756567113334-0', {b'event': b'signup', b'user_id': b'123'})
(b'1756567113334-1', {b'event': b'login', b'user_id': b'456'})
(b'1756567113334-2', {b'event': b'logout', b'user_id': b'789'})
length: 0Process finished with exit code 0
  • DEL

https://redis.io/docs/latest/commands/del/

Syntax

DEL key [key ...]

Removes the specified keys. A key is ignored if it does not exist.
删除整个 Redis Stream。

(base) yongqiang@yongqiang:~$ redis-cli
127.0.0.1:6379> del "forever_stream"
(integer) 1
127.0.0.1:6379>
127.0.0.1:6379> del "forever_stream"
(integer) 0
127.0.0.1:6379> exit
(base) yongqiang@yongqiang:~$

3. Redis Stream Examples

https://redis.readthedocs.io/en/stable/examples/redis-stream-example.html
https://redis-py.pythonlang.cn/en/stable/examples/redis-stream-example.html

References

[1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/
[2] What is Redis? https://www.ibm.com/think/topics/redis
[3] Redis 教程, https://redis.com.cn/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/94848.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/94848.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/94848.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Qt开发】按钮类控件(一)-> QPushButton

目录 1 -> 什么是 PushButton? 2 -> 相关属性 3 -> 代码示例 3.1 -> 带有图标的按钮 3.2 -> 带有快捷键的按钮 4 -> 总结 1 -> 什么是 PushButton? 在 Qt 框架中,QPushButton 是最基础且最常用的按钮控件之一&am…

Citrix 零日漏洞自五月起遭积极利用

安全研究员 Kevin Beaumont 披露了有关 CVE-2025-6543 的惊人细节,这是一个严重的 Citrix NetScaler 漏洞,在该公司发布补丁之前的几个月里,该漏洞被积极利用作为零日攻击。 Citrix 最初将其轻描淡写为简单的“拒绝服务”漏洞,但…

【系列08】端侧AI:构建与部署高效的本地化AI模型 第7章:架构设计与高效算子

第7章:架构设计与高效算子 要将AI模型成功部署到端侧,除了对现有模型进行压缩和优化,更根本的方法是在设计之初就考虑其在资源受限环境下的运行效率。本章将深入探讨如何设计高效的网络架构,以及如何理解并优化常用的核心算子。高…

42-Ansible-Inventory

文章目录Ansible基本概述手动运维时代(原始社会)自动化运维时代自动化运维工具的优势Ansible的功能及优点Ansible的架构Ansible的执行流程安装AnsibleAnsible配置文件生效顺序Ansible inventory主机清单Ansible基于免秘钥方式管理客户端小结Ansible-Adho…

Go语言runtime/trace工具全面解析

基本概念与功能 Go语言的runtime/trace是Go标准库中内置的性能分析工具,主要用于追踪和可视化Go程序的运行时行为。它能够记录程序执行期间的各种事件,包括goroutine调度、系统调用、垃圾回收(GC)、网络I/O、锁等待等关键信息。 trace工具的核心功能包括: goroutine生命周期…

Docker(自写)

Docker程序是跑在操作系统上的,而操作系统上又装了各种不同版本的依赖库和配置程序依赖环境,环境不同,程序就可能跑不起来,如果我们能将环境和程序一起打包docker就是可以将程序和环境一起打包并运行的工具软件基础镜像DockerFile…

深度拆解 OpenHarmony 位置服务子系统:从 GNSS 到分布式协同定位的全链路实战

1. 系统概述 OpenHarmony 的“定位子系统”就是硬件服务子系统集里的 “位置服务子系统”(Location SubSystem)。它向下对接 GNSS/GPS、基站、Wi-Fi 等定位模组,向上以 标准位置 API 形式为应用提供 实时位置、轨迹、地理围栏 等能力,并可与分布式软总线联动,实现 跨设备…

React Native基本用法

1,index调用registerComponent,把appName注入到React Native的根节点。 2,package.json是全局大管家,package-lock.json锁定版本,不会手动编辑,通过install安装 3, bebal.config.json bebal.config.json是翻…

LoraConfig target modules加入embed_tokens(64)

LoraConfig target modules加入embed_tokens 更好且成本更低的方法 嵌入层(embedding layer)的 lora_embedding_A 和 lora_embedding_B 头部(head)是否需加入目标模块列表 用户警告 解除权重绑定 解绑以后是随机权重,怎么办 更好且成本更低的方法 “有没有一种更好且成本…

笔记共享平台|基于Java+vue的读书笔记共享平台系统(源码+数据库+文档)

笔记共享平台|读书笔记共享平台系统 目录 基于Javavue的读书笔记共享平台系统 一、前言 二、系统设计 三、系统功能设计 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取 博主介绍:✌️大厂码农|毕设布道师&#xff…

【VSCode】VSCode为Java C/S项目添加图形用户界面

为Java C/S项目添加图形用户界面 现在我们来为它添加图形用户界面(GUI)。我将使用Java Swing库创建一个简单的GUI,因为它内置于Java标准库中,无需额外依赖。 客户端GUI实现 首先,我们将修改客户端代码,添加一个Swing GUI界面&…

【云原生】Docker 搭建Kafka服务两种方式实战操作详解

目录 一、前言 二、Docker 搭建kafka介绍 2.1 Docker 命令部署 2.2 使用Docker Compose 部署 2.3 使用 Docker Swarm 2.4 使用 Kubernetes 2.5 部署建议 三、Docker 搭建kafka操作方式一 3.1 前置准备 3.2 完整操作过程 3.2.1 创建docker网络 3.2.2 启动zookeeper容…

DBeaver中禁用PostgreSQL SSL的配置指南

在DBeaver中为PostgreSQL连接禁用SSL是一个常见的配置,特别是当你的数据库服务器未启用SSL或遇到连接问题时。我来为你详细讲解操作步骤和注意事项。 🛠️ DBeaver中禁用PostgreSQL SSL的配置指南 详细步骤 打开驱动设置:在DBeaver中创建新的…

数组去重【JavaScript】

数组去重,并且key和val相同的对象视为相同的,需要去重。主函数:/*** 数组去重* 两个属性相同的对象也认为是相同的* param {Array} arr* return {Array} */ function uniqueArray(arr) {const result []// outer: 标签,标记外层循…

基于单片机设计的智能停车系统_271

文章目录 一、前言 1.1 项目介绍 【1】项目开发背景 【2】设计实现的功能 【3】项目硬件模块组成 【4】设计意义 【5】国内外研究现状 【6】摘要 1.2 设计思路 1.3 系统功能总结 1.4 开发工具的选择 【1】设备端开发 【2】上位机开发 1.5 参考文献 1.6 系统框架图 1.7 系统原理…

for in+逻辑表达式 生成迭代对象,最后转化为列表 ——注意list是生成器转化为列表,但[生成器]得到的就是一个列表,其中包含一个生成器元素

(int(digit) ** 2 for digit in str(n))这个不是 数组(list),而是一个 生成器表达式 (generator expression)。它的作用是:str(n) 把数字 n 转成字符串,例如 n 82 → "82"。for digit in str(n) 遍历字符串中的每个字符 → "…

通信算法之321:verilog中generate if 用法-综合掉无用分支

文章目录 一.示例代码 二.优缺分析 三. generate - case 一.示例代码 提示:参考 // 根据添加/补偿频偏的标志,确定使用的频偏wire signed [WIDTH-1 : 0] freq;generateif(FREQ_FLAG == 1b1) beg

Shell 入门

目录 一、Shell 是什么 二、 .sh 脚本调用 .py 脚本 Python 核心逻辑脚本(data_processor.py) Shell 脚本(pipeline.sh) 三、常见命令 四、.sh脚本 1. 简单例子 2. 进阶例子 3. 猜数字游戏 一、Shell 是什么 Shell 的本…

UNet改进(36):融合FSATFusion的医学图像分割

1. 注意力机制的理论基础 1.1 空间注意力机制 空间注意力机制模拟人类视觉系统,能够关注图像中的显著区域。其核心思想是根据特征图的空间位置生成权重图,突出重要区域并抑制无关信息。常见的实现方式是通过沿通道维度的池化操作获取空间统计信息,然后通过卷积层生成空间注…

docker安装kafka、zookeeper详细步骤

Kafka 简介 Kafka 是一个分布式流处理平台,由 LinkedIn 开发并开源,主要用于高吞吐量的实时数据管道和流处理。 核心特性 高吞吐量:支持每秒百万级消息处理,适合大数据场景。 持久化存储:消息可持久化到磁盘,并支持多副本备份。 分布式架构:支持水平扩展,通过分区(P…