在 Reactor 中,ConnectableFlux 是一种用于处理响应式流的机制,它允许你控制何时开始订阅和数据生成。通常情况下,订阅者(subscriber)在订阅时会立即开始接收数据,但有时你可能希望多个订阅者“会面”(rendezvous)之后再触发订阅和数据生成。这就是 ConnectableFlux 的用途。

1. ConnectableFlux 的主要模式

Flux API 提供了两种主要的模式来返回 ConnectableFluxpublishreplay

  • publish
    publish 会动态地尝试满足各个订阅者的需求(即背压),并通过将这些请求转发到源来实现。如果任何订阅者的挂起需求为 0,publish 会暂停对源的请求。
    例如,你可以使用 publish() 方法将一个冷发布者(cold publisher)转换为热发布者(hot publisher),从而允许多个订阅者共享同一个数据源。

  • replay
    replay 会缓存第一次订阅看到的数据,并在达到可配置的限制(如时间和缓冲区大小)后,将数据重放给后续的订阅者。
    例如,你可以使用 replay(2) 来缓存最近的 2 个数据点,并在新订阅者到来时重放这些数据。

2. ConnectableFlux 的管理方法

ConnectableFlux 提供了多种方法来管理订阅和源的连接:

  • connect()
    你可以手动调用 connect() 方法,当达到足够的订阅数时,触发对上游源的订阅。例如:

    ConnectableFlux<String> connectableFlux = Flux.just("A", "B", "C").publish();
    connectableFlux.connect();
    

    在调用 connect() 之前,connectableFlux 不会开始发送数据。

  • autoConnect(n)
    autoConnect(n) 可以自动执行与 connect() 类似的操作,当有 n 个订阅者订阅时,自动触发对源的订阅。例如:

    Flux<String> flux = Flux.just("A", "B", "C");
    ConnectableFlux<String> autoConnectFlux = flux.publish().autoConnect(2);
    

    这意味着当有 2 个或更多订阅者订阅时,autoConnectFlux 会自动开始发送数据。

  • refCount(n)
    refCount(n) 不仅可以自动跟踪传入的订阅,还可以检测订阅是否被取消。如果订阅者数量不足,refCount 会断开与源的连接,直到有新的订阅者出现。例如:

    Flux<String> flux = Flux.just("A", "B", "C");
    ConnectableFlux<String> refCountFlux = flux.publish().refCount(2);
    

    这意味着当有 2 个订阅者订阅时,refCountFlux 会自动开始发送数据;当所有订阅者取消订阅后,refCountFlux 会断开连接。

  • refCount(int, Duration)
    refCount(int, Duration) 增加了一个“宽限期”(grace period),即在订阅者数量低于阈值时,等待指定的时间后再断开连接。例如:

    Flux<String> flux = Flux.just("A", "B", "C");
    ConnectableFlux<String> refCountWithGrace = flux.publish().refCount(2, Duration.ofSeconds(10));
    

    这意味着在订阅者数量低于 2 时,refCountWithGrace 会等待 10 秒,看看是否有新的订阅者出现。

3. 应用场景

ConnectableFlux 适用于需要多个订阅者“会面”后再触发订阅和数据生成的场景。例如:

  • 实时数据推送:在实时数据推送中,你可能希望多个客户端在连接到服务器后才开始接收数据。使用 ConnectableFlux 可以确保所有客户端都准备好后再开始发送数据。
  • 分布式系统:在分布式系统中,你可能希望多个节点在协调一致后再触发数据生成。使用 ConnectableFlux 可以确保所有节点都准备好后再开始处理数据。
  • IoT 数据可视化:在 IoT 数据可视化中,你可能希望多个设备在连接到服务器后才开始发送数据。使用 ConnectableFlux 可以确保所有设备都准备好后再开始处理数据。

4. 相关案例

展示如何使用 ConnectableFlux 实现流的多订阅和延迟连接

package org.example;import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;/*** Main0011 类演示了如何使用 Reactor 创建和操作 Flux 流* 该类展示了如何使用 ConnectableFlux 实现流的多订阅和延迟连接*/
public class Main0011 {/*** 主函数展示了如何创建一个 Flux 流并使用 ConnectableFlux 进行订阅和连接操作** @param args 命令行参数* @throws InterruptedException 线程睡眠时可能抛出的异常*/public static void main(String[] args) throws InterruptedException {// 创建一个 Flux 源,产生 1 到 3 的整数序列,并在订阅时打印消息Flux<Integer> source = Flux.range(1, 3).doOnSubscribe(s -> System.out.println("subscribed to source"));// 将 Flux 源转换为 ConnectableFlux,以便进行延迟连接和多订阅ConnectableFlux<Integer> co = source.publish();// 订阅 ConnectableFlux,此时不会开始产生数据co.subscribe(System.out::println, e -> {}, () -> {});// 再次订阅,演示多个订阅者co.subscribe(System.out::println, e -> {}, () -> {});// 打印消息,表明订阅已完成,但数据流尚未开始System.out.println("done subscribing");// 线程睡眠,模拟在连接前的准备或其他操作Thread.sleep(500);// 打印消息,表明即将连接数据流System.out.println("will now connect");// 连接数据流,使数据开始流动到所有已订阅的消费者co.connect();}
}

演示Reactor库中Flux的自动连接(autoConnect)功能

package org.example;import reactor.core.publisher.Flux;/*** 该类用于演示Reactor库中Flux的自动连接(autoConnect)功能* 它展示了如何使用autoConnect方法在多个订阅者之间共享一个数据流,* 并在达到指定的订阅者数量后自动开始数据流的发布*/
public class Main0012 {/*** 主函数,用于演示autoConnect的使用** @param args 命令行参数* @throws InterruptedException 当线程因中断策略被中断时抛出此异常*/public static void main(String[] args) throws InterruptedException {// 创建一个Flux数据源,范围从1到3,同时在订阅时打印消息Flux<Integer> source = Flux.range(1, 3).doOnSubscribe(s -> System.out.println("subscribed to source"));// 使用autoConnect方法使数据源在有两个订阅者时自动连接Flux<Integer> autoCo = source.publish().autoConnect(2);// 第一个订阅者订阅数据流,并在接收到数据时打印出来autoCo.subscribe(System.out::println, e -> {}, () -> {});System.out.println("subscribed first");// 暂停500毫秒以模拟时间流逝Thread.sleep(500);System.out.println("subscribing second");// 第二个订阅者订阅数据流,此时达到autoConnect设定的条件,数据流开始发布autoCo.subscribe(System.out::println, e -> {}, () -> {});}
}

5. 总结

ConnectableFlux 是 Reactor 中用于处理响应式流的机制,它允许你控制何时开始订阅和数据生成。通过 publishreplay 模式,你可以实现多个订阅者“会面”后再触发订阅和数据生成。通过 connect()autoConnect(n)refCount(n)refCount(int, Duration) 方法,你可以灵活地管理订阅和源的连接。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/87108.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/87108.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/87108.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vite + vue 项目下使用 tailwindcss

版本 node: > 18.0.0 vue: 3.5.13 vite: 6.3.1 tailwindcss: 4.1.6 tailwindcss/vite: 4.1.6 tailwindcss ✅ 细粒度类库 提供数千个原子级CSS类&#xff08;如 text-center、bg-blue-500、p-4&#xff09;&#x1f9e9; 组合式开发 通过类名组合构建完全自定义的UI&#x…

Hibernate中save与saveOrUpdate的差异解析

在Hibernate中&#xff0c;save()和saveOrUpdate()都是用于持久化对象的方法&#xff0c;但它们的适用场景和行为有显著差异&#xff1a; 1. save()方法 核心行为&#xff1a; 仅适用于瞬时态&#xff08;Transient&#xff09;对象&#xff08;即新创建、未与Session关联的对象…

香橙派3B学习笔记14:deb 打包程序_解包前后脚本运行

本文学习如何用deb打包的方式打包自己需要调用系统库的程序。 然后实现deb解包前后的脚本运行。 目录 承接上文&#xff1a; 删除上文遗留的.so文件&#xff1a; 终止ledlight进程&#xff1a; 目标解释&#xff1a; 创建项目结构&#xff1a; 创建control文件&#xff1a; 创…

nanoGPT复现——prepare拆解(自己构建词表 VS tiktoken)

在nanoGPT的data文件夹有两个很相似的文件夹结构&#xff1a;shakespeare和shakespeare-char&#xff0c;这两种都是对shakespeare数据集的处理&#xff0c;但是shakespeare使用的是tiktoken对文字进行编码&#xff0c;另一个则是使用自己构建的词表 一、shakespeare-char&…

macos 安装 xcode

在 macOS 上安装 Xcode&#xff08;或者 Xcode Command Line Tools&#xff09;的方法如下&#xff1a; 1. 安装 Xcode Command Line Tools&#xff08;轻量级&#xff0c;满足大部分编译需求&#xff09; 终端命令&#xff1a; xcode-select --install会弹出安装提示&#x…

大学专业科普 | 云计算、大数据

大数据专业是近年来随着信息技术发展而兴起的热门学科&#xff0c;专注于从海量、多样化的数据中提取有价值信息&#xff0c;为各行业提供数据驱动的决策支持。 专业定义 大数据专业旨在培养掌握大数据采集、存储、管理、分析和应用等核心技术的人才。该专业融合了计算机科学…

本地文件自动提交到仓库

背景 将本地目录做一个存储仓库&#xff0c;将归档的文件放入其中。自动同步到远程仓库。 仓库配置 省略 配置密钥 用户可以 git pull \ git push \ git commit 自动 拉取、更新 脚本 文件名&#xff1a;autosave.sh #!/bin/zsh# 设置变量 LOCAL_DIR$1# 进入工作目录 cd "…

Ubuntu中控制用户存储空间配置步骤

目的&#xff0c;限制用户磁盘空间占用&#xff0c;例如给用户限制100-150G容量 1.安装磁盘配额工具 sudo apt-get install -y quota 2.备份并修改/etc/fstab文件&#xff0c;使能支持quota sudo cp /etc/fstab /etc/fstab.bak vim /etc/fstab #写入如下,usrjquotaaquota.u…

【网络】Linux 内核优化实战 - net.ipv4.tcp_rmem 和 net.core.rmem_default 关系

net.ipv4.tcp_rmem 和 net.core.rmem_default 都是 Linux 内核中控制网络接收缓冲区的参数,但它们的作用范围、优先级和使用场景存在明显区别。以下是详细对比: 核心区别 参数net.ipv4.tcp_rmemnet.core.rmem_default作用协议仅针对 TCP 协议针对 所有网络协议(TCP、UDP 等…

设计模式精讲 Day 14:命令模式(Command Pattern)

【设计模式精讲 Day 14】命令模式&#xff08;Command Pattern&#xff09; 文章内容 在“设计模式精讲”系列的第14天&#xff0c;我们来学习命令模式&#xff08;Command Pattern&#xff09;。命令模式是一种行为型设计模式&#xff0c;它将请求封装为对象&#xff0c;从而…

手机射频功放测试学习(二)——手机线性功放的静态电流和小信号(S-Parameter)测试

目录 一、概要 二、LPA的电流测试 1、LPA的泄漏电流测试 手动测试步骤如下: 自动化测试: 2、LPA的静态电流测试 手动测试步骤如下: 自动化测试: 三、LPA的S-Parameter测试 1、矢量网络分析仪校准 2、LPA的S参数手动测试步骤: 3、LPA的S参数自动测试步骤: 四…

基础算法合集-图论

本文将介绍数据结构图论部分中常见的算法 单源最短路径问题(用来计算一个点到其他所有顶点的最短路径) Dijkstra(n*n) 1. 初始化: 先找出从源点V0到各终点Vk的直达路径(V0,Vk), 即通过一条弧到达的路径 2. 选择: 从这些路径中找出一条长度最短的路径(V0,u) 3. 更新: 然后对其余…

vue-i18n 插件打包解析失效问题记录

vue-i18n 插件打包解析失效问题记录 开发环境中没有问题的&#xff0c;但打包发布之后就不行了&#xff0c;显示的就是模板字符串 // An highlighted block const messages {en: {step: {stepDesc1: Scan,stepDesc2: Analyze,stepDesc3: Result}},zh: {step: {stepDesc1: 扫描…

数据可视化 - 单子图

一、认识单子图 import matplotlib.pyplot as plt import numpy as np import pandas as pdplt.figure(num单子图, figsize(12, 8), facecolorw) # 中文字体 plt.rcParams[font.sans-serif] KaiTi # 负号显示 plt.rcParams[axes.unicode_minus] False# 2行&#xff0c;1列&a…

服务器上设置了代理之后,服务器可以访问外网,但是不能访问服务器本地。如何解决

你在服务器上设置了代理后&#xff0c;发现&#xff1a; 可以访问外网不能访问服务器本地地址&#xff08;如 localhost、127.0.0.1、内网IP&#xff09; 这是代理设置中常见的问题&#xff0c;尤其是当你设置了全局 HTTP/HTTPS 代理时。本地访问也会被强制走代理&#xff0c…

mysql启动报错:Can‘t connect to local MySQL server through socket

文章目录 一、报错内容二、解决方法 一、报错内容 在linux上启动mysql时报错 [rootlocalhost bin]# ./mysql -u root -p Enter password: ERROR 2002 (HY000): Cant connect to local MySQL server through socket /tmp/mysql.sock (2)执行以上命令后报错&#xff0c;并且也…

C# Avalonia 绑定模式 Mode 的区别,它们的应用场景

C# Avalonia 绑定模式 Mode 的区别&#xff0c;它们的应用场景 文章目录 1. **Default&#xff08;默认模式&#xff09;**2. **OneTime&#xff08;一次性绑定&#xff09;**3. **OneWay&#xff08;单向绑定&#xff09;**4. **TwoWay&#xff08;双向绑定&#xff09;**5. *…

【OpenGL学习】(七)纹理单元

【OpenGL学习】&#xff08;七&#xff09;纹理单元 OpenGL的纹理单元&#xff08;Texture Unit&#xff09;是GPU中用于管理和组织纹理资源的逻辑单元&#xff0c;它允许开发者在渲染过程中同时使用多个纹理&#xff0c;并通过采样器&#xff08;Sampler&#xff09;在着色器…

Ubuntu 下降 Linux Kernel 的版本备忘

此处以 ubuntu 22.04 为示例系统&#xff0c;来降低其 Linux kernel 的版本。 1. 降低 Linux kernel 版本 在 Ubuntu 22.04 上降低 Linux 内核版本的步骤如下所示。 步骤 1&#xff1a;检查当前内核版本 uname -r 确认当前运行的内核版本。 步骤 2&#xff1a;查看已安装的…

Python 数据分析与机器学习入门 (八):用 Scikit-Learn 跑通第一个机器学习模型

引言&#xff1a;初识 Scikit-Learn Scikit-learn 是 Python 机器学习领域的黄金标准库。它构建在 NumPy, SciPy 和 Matplotlib 之上&#xff0c;提供了大量用于分类、回归、聚类和降维等任务的算法。Scikit-learn 广受欢迎的原因在于其三大核心优势&#xff1a; 一致的 API 设…