1. Pulsar Template

在Pulsar生产者端,Spring Boot自动配置提供了一个用于发布记录的PulsarTemplate。该模板实现了一个名为PulsarOperations的接口,并提供了通过其合约发布记录的方法。

这些send API方法有两类:send和sendAsync。send方法通过Pulsar生成器上的同步发送功能阻止调用。它们返回消息在代理上持久化后发布的消息的MessageId。sendAsync方法调用是非阻塞的异步调用。它们返回一个CompletableFuture,您可以在消息发布后使用它异步接收消息ID。

对于不包含主题参数的API变体,将使用主题解析过程来确定目标主题。

1.1. Simple API

该模板为简单的发送请求提供了一些方法(前缀为“send”)。对于更复杂的发送请求,流畅的API可以让您配置更多选项。

1.2. Fluent API

该模板提供了一个流畅的构建器来处理更复杂的发送请求。

1.3. Message customization

您可以指定一个TypedMessageBuilderCustomizer来配置传出消息。例如,以下代码显示了如何发送键控消息:

template.newMessage(msg).withMessageCustomizer((mb) -> mb.key("foo-msg-key")).send();

1.4. Producer customization

您可以指定一个ProducerBuilderCustomizer来配置底层Pulsar生产者构建器,该生成器最终构建用于发送传出消息的生产者。

请谨慎使用,因为这可以完全访问生产者构建器,调用其某些方法(如create)可能会产生意想不到的副作用。

例如,以下代码显示了如何禁用批处理和启用分块:

template.newMessage(msg).withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false)).send();

另一个示例显示了如何在将记录发布到分区主题时使用自定义路由。在Producer构建器上指定自定义MessageRouter实现,例如:

template.newMessage(msg).withProducerCustomizer((pb) -> pb.messageRouter(messageRouter)).send();

请注意,使用MessageRouter时,spring.pulsar.producter.message-routing-mode的唯一有效设置是自定义。

另一个示例显示了如何添加一个ProducerInterceptor,该拦截器将拦截和修改生产者在发布到代理之前收到的消息:

template.newMessage(msg).withProducerCustomizer((pb) -> pb.intercept(interceptor)).send();

定制程序将仅适用于用于发送操作的生产者。如果要将自定义程序应用于所有生产商,则必须按照全球生产商自定义中的描述将其提供给生产商工厂。

2. Specifying Schema Information

如果您使用Java基元类型,框架会自动为您检测模式,您不需要指定任何模式类型来发布数据。对于非基元类型,如果在PulsarTemplate上调用send操作时没有明确指定Schema,则Spring For Apache Pulsar框架将尝试构建Schema。JSON类型。

目前支持的复杂模式类型有JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES和带内联编码的KEY_VALUE。

2.1. Custom Schema Mapping

作为在PulsarTemplate上调用复杂类型的发送操作时指定模式的替代方法,模式解析器可以配置类型的映射。这消除了在框架使用传出消息类型咨询解析器时指定模式的需要。

2.1.1. Configuration properties

模式映射可以使用spring.pulsar.defaults.type-mappings属性进行配置。以下示例使用application.yml分别使用AVRO和JSON模式为User和Address复杂对象添加映射:

spring:pulsar:defaults:type-mappings:- message-type: com.acme.Userschema-info:schema-type: AVRO- message-type: com.acme.Addressschema-info:schema-type: JSON

消息类型是消息类的完全限定名。

2.1.2. Schema resolver customizer

添加映射的首选方法是通过上述属性。但是,如果需要更多的控制,您可以提供一个模式解析器定制器来添加映射。

以下示例使用模式解析器定制器分别使用AVRO和JSON模式为User和Address复杂对象添加映射:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {return (schemaResolver) -> {schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));}
}
2.1.3. Type mapping annotation

指定用于特定消息类型的默认模式信息的另一种选择是用@PulsarMessage注释标记消息类。可以通过注释上的schemaType属性指定架构信息。

以下示例将系统配置为在生成或使用Foo类型的消息时使用JSON作为默认模式:

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

有了这个配置,就不需要在发送操作上设置或指定模式。

2.2. Producing with AUTO_SCHEMA

如果没有机会提前知道Pulsar主题的模式类型,您可以使用AUTO_PRODUCE模式将原始JSON或Avro有效载荷安全地发布为byte[]。

在这种情况下,生产者会验证出站字节是否与目标主题的模式兼容。

只需指定schema的模式。模板上的AUTO_PRODUCE_BYTES()发送操作如下例所示:

void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}

这仅支持Avro和JSON模式类型。

3. Pulsar Producer Factory

PulsarTemplate依赖于PulsarProducerFactory来实际创建底层生产者。Spring Boot自动配置还提供了这个生产者工厂,您可以通过指定任何Spring.pulser.producer.*应用程序属性来进一步配置它。

如果在直接使用生产者工厂API时没有指定主题信息,则使用PulsarTemplate使用的相同主题解析过程,唯一的例外是省略了“消息类型默认”步骤。

3.1. Global producer customization

该框架提供了ProducerBuilderCustomizer合约,该合约允许您配置用于构建每个生产者的底层构建器。要自定义所有生产者,您可以将自定义者列表传递给PulsarProducerFactory构造函数。使用多个自定义程序时,它们将按照在列表中显示的顺序应用。

如果您使用Spring Boot自动配置,您可以将自定义程序指定为bean,它们将根据其@Order注释自动传递给PulsarProducerFactory。

如果您只想将自定义程序应用于单个生产商,您可以使用Fluent API并在发送时指定自定义程序。

4. Pulsar Producer Caching

每个底层Pulsar生产者都会消耗资源。为了提高性能并避免不断创建生产者,生产者工厂会缓存它创建的生产者。它们以LRU方式缓存,并在配置的时间段内未被使用时被驱逐。缓存键由足够的信息组成,以确保在后续的创建请求中,调用者返回相同的生产者。

此外,您可以通过指定任何spring.pulsinger.producer.cache.*应用程序属性来配置缓存设置。

4.1. Caution on Lambda customizers

任何用户提供的生产者定制器也包含在缓存密钥中。由于缓存键依赖于equals/hashCode的有效实现,因此在使用Lambda自定义程序时必须谨慎。

规则:实现为Lambdas的两个自定义程序将在equals/hashCode上匹配,当且仅当它们使用相同的Lambda实例并且不需要在其闭包外定义任何变量时。

为了澄清上述规则,我们将看几个例子。在下面的示例中,定制器被定义为内联Lambda,这意味着对sendUser的每次调用都使用相同的Lambda实例。此外,它不需要闭包外的变量。因此,它将作为缓存键匹配。

void sendUser() {var user = randomUser();template.newMessage(user).withTopic("user-topic").withProducerCustomizer((b) -> b.producerName("user")).send();
}

在下一种情况下,定制器被定义为内联Lambda,这意味着对sendUser的每次调用都使用相同的Lambda实例。但是,它需要一个闭包外的变量。因此,它将不匹配为缓存键。

void sendUser() {var user = randomUser();var name = randomName();template.newMessage(user).withTopic("user-topic").withProducerCustomizer((b) -> b.producerName(name)).send();
}

在最后一个例子中,定制器被定义为内联Lambda,这意味着对sendUser的每次调用都使用相同的Lambda实例。虽然它确实使用了一个变量名,但它并非源自其闭包之外,因此将作为缓存键进行匹配。这说明变量可以在Lambda闭包中使用,甚至可以调用静态方法。

void sendUser() {var user = randomUser();template.newMessage(user).withTopic("user-topic").withProducerCustomizer((b) -> {var name = SomeHelper.someStaticMethod();b.producerName(name);}).send();
}

规则:如果你的Lambda定制器不是只定义一次(在后续调用中使用相同的实例),或者它需要在闭包之外定义变量,那么你必须提供一个具有有效equals/hashCode实现的定制器实现。

如果不遵守这些规则,那么生产者缓存将始终丢失,您的应用程序性能将受到负面影响。

5. Intercept Messages on the Producer

添加ProducerInterceptor可以让您在生产者接收到的消息发布到代理之前对其进行拦截和修改。为此,您可以将拦截器列表传递给PulsarTemplate构造函数。使用多个拦截器时,应用它们的顺序是它们在列表中出现的顺序。

如果使用Spring Boot自动配置,则可以将拦截器指定为Beans。它们会自动传递给PulsarTemplate。拦截器的排序是通过使用@Order注释实现的,如下所示:

@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {...
}@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {...
}

如果您没有使用启动器,则需要自己配置和注册上述组件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/90388.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/90388.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/90388.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS揭秘:10.平行四边形

前置知识&#xff1a;基本的css变形一、平行四边形 要实现一个平行四边形&#xff0c;可以使用CSS的skew变形属性来倾斜元素。 transform: skewX(-45deg);图-1显示容器和内容都出现了倾斜&#xff0c;该如何解决这个问题&#xff1f; 二、嵌套方案 我们通过将内容嵌套 div 并使…

深度学习 必然用到的 线性代数知识

把标量到张量、点积到范数全串起来&#xff0c;帮你从 0 → 1 搭建 AI 数学底座 &#x1f680; 1 标量&#xff1a;深度学习的最小单元 标量 就是一维空间里的“点”&#xff0c;只有大小没有方向。例如温度 52 F、学习率 0.001。 记号&#xff1a;普通小写 x&#xff1b;域&am…

OpenGL ES 纹理以及纹理的映射

文章目录开启纹理创建纹理绑定纹理生成纹理纹理坐标图像配置线性插值重复效果限制拉伸完整代码在 Android OpenGL ES 中使用纹理&#xff08;Texture&#xff09;可以显著提升图形渲染的质量和效率。以下是使用纹理的主要好处&#xff1a; 增强视觉真实感 纹理可以将复杂的图像…

从金字塔到个性化路径:AI 正在重新定义学习方式

几十年来&#xff0c;我们的教育系统始终遵循着一条熟悉的路线&#xff1a; 从小学、初中、高中&#xff0c;再到大学和研究生。这条标准化的路径&#xff08;K-12 到研究所&#xff09;结构清晰&#xff0c;却也缓慢。但在当今这个信息爆炸、知识快速更新、个性化需求高涨的时…

产品经理岗位职责拆解

以下是产品经理岗位职责的详细分解表&#xff0c;涵盖工作内容、核心动作及输出成果&#xff1a;岗位职责具体工作内容输出成果1. 日常版本迭代管理需求分析及PRD产出协调资源推动产品上线- 收集业务/用户需求&#xff0c;分析可行性及优先级- 撰写PRD文档&#xff0c;明确功能…

后端微服务基础架构Spring Cloud

版本关系 版本发布说明-阿里云Spring Cloud Alibaba官网 选择 创建项目 创建父项目 什么都不动&#xff0c;创建即可 1) 删掉没用的文件 保留 2) pom中加入 打包方式 <packaging>pom</packaging> 3) 删掉src 4) pom.xml中删除没用的 5)更改pom.xml中 spring…

数据分析框架和方法

一、核心分析框架 (The Big Picture Frameworks)​​描述性分析 (What Happened?)​​​​目的&#xff1a;​​ 了解过去发生了什么&#xff0c;描述现状&#xff0c;监控业务健康。​​核心工作&#xff1a;​​ 汇总、聚合、计算基础指标 (KPI)&#xff0c;生成报表和仪表盘…

电路研究9.3.10——合宙Air780EP中的AT开发指南:阿里云应用指南

这个好像也用不到&#xff0c;不过可以先贴出来。简单看了一下也没深入分析&#xff0c;直接扒过来了&#xff0c;感觉涉及到了上位机的学习了。我这下位机的可能用不到&#xff0c;就是贴过来好了。 应用概述 使用 AT 方式连接阿里云分为一机一密和一型一密&#xff0c;其中一…

[Backlog] 核心协调器 | 终端用户界面(TUI)实现 | 多分支任务冲突解决 | 测试验证体系

第8章 核心协调器 欢迎回到Backlog.md&#xff01; 在上一章文件系统操作中&#xff0c;我们深入了解了数据物理存储层面的读写机制。本章将聚焦系统的神经中枢——核心协调器。 核心协调器的本质&#xff08;中央决策引擎&#xff09; 如果将Backlog.md视为项目管理团队&a…

车载以太网-TC8测试-UT(Upper Tester)

目录 一、技术原理:指令体系与协议适配1. **指令格式与传输机制**2. **协议栈交互逻辑**3. **规范遵循与版本演进**二、测试应用:TC8测试场景与案例1. **TCP协议栈深度验证**2. **ARP协议健壮性测试**3. **SOME/IP服务动态管理**三、实现挑战与解决方案1. **实时性要求**2. *…

扣子Coze纯前端部署多Agents

纯前端网页搭建&#xff0c;无需任何后端代码&#xff0c;方便快捷&#xff01; 就像公司前台的多功能控制台&#xff0c;员工可以通过按钮快速呼叫不同的AI助手。具备多设备适配、智能对话等基础能力。 支持添加多个智能体 配置方式 添加智能体信息&#xff0c;data-bot为智…

STM32中I2C协议详解

前言 在嵌入式系统中&#xff0c;设备间的短距离通信协议中&#xff0c;I2C&#xff08;Inter-Integrated Circuit&#xff0c;集成电路互连&#xff09;以其信号线少、布线简单、支持多从机等特点&#xff0c;被广泛应用于传感器、EEPROM、OLED屏等中低速外设的通信场景。与SP…

解锁Spring Boot多项目共享Redis:优雅Key命名结构指南

引言Redis 基础与 Spring Boot 集成Redis 简介Redis&#xff0c;即 Remote Dictionary Server&#xff0c;是一个开源的基于内存的数据结构存储系统&#xff0c;可用作数据库、缓存和消息中间件 。它具备诸多显著特性&#xff0c;使其在现代软件开发中占据重要地位。Redis 的读…

《重构项目》基于Apollo架构设计的项目重构方案(多种地图、多阶段、多任务、状态机管理)

1. 项目结构设计project/ ├── config/ # 配置文件&#xff08;定义 Scenario、Stage、Task 的映射&#xff09; ├── src/ │ ├── base/ # 抽象基类定义 │ │ ├── scenario_base.h/.cpp │ │ ├── stage_base.h/.cpp…

动手学深度学习13.6. 目标检测数据集-笔记练习(PyTorch)

以下内容为结合李沐老师的课程和教材补充的学习笔记&#xff0c;以及对课后练习的一些思考&#xff0c;自留回顾&#xff0c;也供同学之人交流参考。 本节课程地址&#xff1a;数据集_哔哩哔哩_bilibili 本节教材地址&#xff1a;13.6. 目标检测数据集 — 动手学深度学习 2.0…

Unity3D游戏内存优化指南

前言 Unity3D 游戏的内存控制是保证游戏流畅运行&#xff08;尤其在移动端和主机平台&#xff09;和避免崩溃的关键挑战。以下是核心策略和常见问题的解决方案&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&#xff0c;希望大家可以点击进来一起交流一下开发经验…

git学习:首次创建仓库

文章目录前言&#xff1a;1、首次创建仓库并上传数据1.1 创建仓库&#xff0c;1.2 命令上传1.3 首次代码上传至仓库的步骤&#xff1a;2、分支操作2.1 分支的删除2.2 切换分支2.3 查看分支2.4 同步其他分支的修改3、查看电脑的配置文件4、远程仓库命令 git remote5、其他后语前…

C++并行计算:OpenMP与MPI全解析

在高性能计算领域&#xff0c;充分利用硬件资源的并行计算技术已成为刚需。从单节点多核到跨节点集群&#xff0c;开发者需要掌握不同的并行编程模型。本文将系统讲解两种主流并行技术&#xff1a;OpenMP&#xff08;共享内存多核并行&#xff09;与MPI&#xff08;分布式内存集…

TCP 动态选路协议全面研究:OSPF、BGP 与 IS-IS 的比较与应用分析

一、引言&#xff1a;动态选路协议概述 在现代计算机网络中&#xff0c;路由选择是数据传输的核心功能&#xff0c;它决定了数据包从源到目的地的路径选择。随着网络规模的不断扩大和复杂性的增加&#xff0c;静态路由已经无法满足网络动态变化的需求&#xff0c;动态路由协议…

OpenCV 图像哈希类cv::img_hash::AverageHash

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 cv::img_hash::AverageHash是OpenCV中用于图像哈希&#xff08;Image Hashing&#xff09;的一个类&#xff0c;属于opencv_img_hash模块。它实现了…