1. 了解 Kafka

Apache Kafka 是一个分布式流处理平台,核心功能包括:

  • 发布/订阅消息系统:解耦生产者和消费者

  • 分布式存储:持久化、容错的消息存储

  • 流处理:实时处理数据流

核心概念

概念说明
BrokerKafka 集群中的单个服务器节点
Topic消息的逻辑分类(如:user_activity
PartitionTopic 的分区(并行处理单位),消息按顺序存储
Producer向 Topic 发布消息的客户端
Consumer订阅 Topic 并处理消息的客户端
Consumer Group多个消费者协同消费同一 Topic(每个分区只被组内一个消费者消费)
Offset消息在分区中的唯一位置标识

2. 了解 rdkafka

rdkafka 是 Kafka 的 C/C++ 客户端库,提供:

  • 高性能生产/消费 API(支持 C/C++/Python 等)

  • 特性:

    • 异步/同步发送模式

    • 自动负载均衡

    • 消息压缩(gzip, snappy, lz4)

    • SASL 认证

    • 精确一次语义(EOS)

  • 开源地址:edenhill/librdkafka

3. 代码实现

以下是使用 librdkafka 的 C++ 接口操作 Kafka 的生产者和消费者完整实现:

生产者代码 (producer.cpp)
#include <iostream>
#include <string>
#include <sstream>
#include <librdkafka/rdkafkacpp.h>class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:void dr_cb(RdKafka::Message &message) {if (message.err()) {std::cerr << "消息发送失败: " << message.errstr() << std::endl;} else {std::cout << "消息发送成功: " << message.topic_name() << " [" << message.partition() << "] @ " << message.offset() << std::endl;}}
};int main() {// 1. 创建配置对象RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);std::string errstr;// 2. 设置配置参数if (conf->set("bootstrap.servers", "localhost:9092", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置错误: " << errstr << std::endl;return 1;}// 设置消息确认模式 (all = 最高可靠性)if (conf->set("acks", "all", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置错误: " << errstr << std::endl;return 1;}// 3. 创建生产者实例ProducerDeliveryReportCb delivery_cb;if (conf->set("dr_cb", &delivery_cb, errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置回调错误: " << errstr << std::endl;return 1;}RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);if (!producer) {std::cerr << "创建生产者失败: " << errstr << std::endl;return 1;}delete conf;// 4. 创建Topic对象RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);RdKafka::Topic *topic = RdKafka::Topic::create(producer,"cpp_test_topic",tconf,errstr);if (!topic) {std::cerr << "创建Topic失败: " << errstr << std::endl;delete tconf;return 1;}delete tconf;// 5. 生产消息for (int i = 0; i < 10; ++i) {std::string key = "key-" + std::to_string(i);std::string payload = "Message #" + std::to_string(i);// 发送消息RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA, // 自动分区分配RdKafka::Producer::RK_MSG_COPY,const_cast<char*>(payload.c_str()), payload.size(),const_cast<char*>(key.c_str()), key.size(),NULL);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "生产消息失败: " << RdKafka::err2str(resp) << std::endl;} else {std::cout << "已发送: " << payload << std::endl;}// 处理事件队列producer->poll(0);}// 6. 等待所有消息完成发送while (producer->outq_len() > 0) {std::cout << "等待发送队列: " << producer->outq_len() << std::endl;producer->poll(100);}// 7. 清理资源delete topic;delete producer;return 0;
}
消费者代码 (consumer.cpp)
#include <iostream>
#include <string>
#include <csignal>
#include <vector>
#include <librdkafka/rdkafkacpp.h>bool running = true;void stop(int sig) {running = false;
}class ConsumerEventCb : public RdKafka::EventCb {
public:void event_cb(RdKafka::Event &event) {switch (event.type()) {case RdKafka::Event::EVENT_ERROR:std::cerr << "错误: " << RdKafka::err2str(event.err()) << std::endl;break;case RdKafka::Event::EVENT_LOG:std::cout << "日志: " << event.str() << std::endl;break;default:std::cout << "事件: " << event.type() << ": " << event.str() << std::endl;break;}}
};int main() {// 注册信号处理signal(SIGINT, stop);signal(SIGTERM, stop);// 1. 创建配置对象RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);std::string errstr;// 2. 设置配置参数if (conf->set("bootstrap.servers", "localhost:9092", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置错误: " << errstr << std::endl;return 1;}// 设置消费组if (conf->set("group.id", "cpp_consumer_group", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置错误: " << errstr << std::endl;return 1;}// 从最早的消息开始消费if (conf->set("auto.offset.reset", "earliest", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置错误: " << errstr << std::endl;return 1;}// 3. 设置事件回调ConsumerEventCb event_cb;if (conf->set("event_cb", &event_cb, errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "设置回调失败: " << errstr << std::endl;return 1;}// 4. 创建消费者实例RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!consumer) {std::cerr << "创建消费者失败: " << errstr << std::endl;return 1;}delete conf;// 5. 订阅Topicstd::vector<std::string> topics;topics.push_back("cpp_test_topic");RdKafka::ErrorCode resp = consumer->subscribe(topics);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "订阅失败: " << RdKafka::err2str(resp) << std::endl;return 1;}// 6. 消费消息while (running) {// 等待消息 (1000ms超时)RdKafka::Message *msg = consumer->consume(1000);switch (msg->err()) {case RdKafka::ERR__TIMED_OUT:break;  // 超时继续case RdKafka::ERR_NO_ERROR:// 成功消费到消息std::cout << "收到消息: "<< "主题: " << msg->topic_name() << " | 分区: [" << msg->partition() << "]"<< " | 偏移量: " << msg->offset() << std::endl;if (msg->key()) {std::cout << "键: " << *msg->key() << " => ";}std::cout << "值: " << static_cast<const char*>(msg->payload()) << std::endl;break;default:std::cerr << "消费错误: " << msg->errstr() << std::endl;break;}// 手动提交偏移量consumer->commitAsync(msg);delete msg;}// 7. 关闭消费者consumer->close();delete consumer;return 0;
}
编译运行

# 编译生产者
g++ -o producer producer.cpp -lrdkafka++ -lrdkafka -lpthread -lz -ldl# 编译消费者
g++ -o consumer consumer.cpp -lrdkafka++ -lrdkafka -lpthread -lz -ldl

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/88461.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/88461.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/88461.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UE5多人MOBA+GAS 13、添加死亡、复活逻辑以及布娃娃含物理资产的修改调整

文章目录使用GE为角色添加定时的Tag控制死亡时间1、添加死亡Tag2、创建死亡GE&#xff0c;并完成相关配置3、在AbilitySystemComponent中监听属性的变化&#xff0c;调用GE来添加Tag到角色上4、在角色中监听ASC传入的Tag以及Tag的层数&#xff0c;来响应不同的函数添加死亡、复…

Jiasou TideFlow重塑AI SEO全链路自动化新标杆

引言 在Google日均处理85亿次搜索请求的数字化浪潮中&#xff0c;传统SEO工作流面临三大致命瓶颈&#xff1a;人工拓词效率低下、跨部门协作成本高企、数据监控链路断裂。因此诸如Jiasou AI SEO这样专门为AI SEO而生的Agent就应运而生了。 背景 Jiasou AIGC不仅仅可以批量生成…

CentOs 7 MySql8.0.23之前的版本主从复制

准备俩台虚拟机并启动俩台虚拟机都开启mysql后查看二进制日志是否开启先登录mysqlmysql -u root -r输入sql命令show variables like %log_bin%;如果log_bin 的value为OFF则是没有开启&#xff0c;跟着下面步骤开启二进制日志退出mysqlexitvim /etc/my.cnf在最底下添加log_binmy…

Leetcode 3607. Power Grid Maintenance

Leetcode 3607. Power Grid Maintenance 1. 解题思路2. 代码实现 题目链接&#xff1a;3607. Power Grid Maintenance 1. 解题思路 这一题思路上首先是一个DSU的思路&#xff0c;将所有的连通网络计算出来&#xff0c;并对每一个网络的节点进行归类。然后我们需要对每一个网…

开源 python 应用 开发(三)python语法介绍

最近有个项目需要做视觉自动化处理的工具&#xff0c;最后选用的软件为python&#xff0c;刚好这个机会进行系统学习。短时间学习&#xff0c;需要快速开发&#xff0c;所以记录要点步骤&#xff0c;防止忘记。 链接&#xff1a; 开源 python 应用 开发&#xff08;一&#xf…

1-Kafka介绍及常见应用场景

Kafka 介绍 Apache Kafka 是一个开源的 分布式流处理平台&#xff0c;最初由 LinkedIn 开发&#xff0c;后捐赠给 Apache 软件基金会。它被设计用于高吞吐量、低延迟、可水平扩展地处理实时数据流。官网地址是&#xff1a;https://kafka.apache.org/ 以下是 Kafka 的核心介绍…

CH9121T电路及配置详解

目录1. CH9121T简介2. 原理图及接口2.1 参考电路2.2 CH9121T评估板2.3 差分端口2.4 网口灯显示2.5 晶振2.6 其他接口3. 使用手册及说明3.1 配置介绍3.2 默认参数3.3 串口波特率3.4 配置指令3.5 应用示例1. CH9121T简介 CH9121 是一款网络串口透传芯片&#xff0c;自带 10/100M…

科研数据可视化核心技术:基于 AI 与 R 语言的热图、火山图及网络图绘制实践指南

在学术研究竞争日趋激烈的背景下&#xff0c;高质量的数据可视化已成为科研成果呈现与学术传播的关键要素。据统计&#xff0c;超过 60% 的学术稿件拒稿原因与图表质量存在直接关联&#xff0c;而传统绘图工具在处理组学数据、复杂关联数据时&#xff0c;普遍存在效率低下、规范…

Windows体验macOS完整指南

一、虚拟机安装macOS专业方案1. 环境准备阶段硬件检测&#xff1a;进入BIOS&#xff08;开机时按Del/F2键&#xff09;确认开启VT-x/AMD-V虚拟化选项建议配置&#xff1a;i5十代以上CPU/16GB内存/256GB SSD软件准备&#xff1a;官网下载VMware Workstation 17 Pro获取Unlocker补…

【普及/提高−】洛谷P1577 ——切绳子

见&#xff1a;P1577 切绳子 - 洛谷 题目描述 有 N 条绳子&#xff0c;它们的长度分别为 Li​。如果从它们中切割出 K 条长度相同的绳子&#xff0c;这 K 条绳子每条最长能有多长&#xff1f;答案保留到小数点后 2 位(直接舍掉 2 位后的小数)。 输入格式 第一行两个整数 N …

imx6ull-裸机学习实验16——I2C 实验

目录 前言 I2C简介 基本特性​​ I2C 协议 起始位 停止位 数据传输 应答信号 I2C 写时序 I2C 读时序 I.MX6U I2C 简介 寄存器 地址寄存器I2Cx_IADR(x1~4) 分频寄存器I2Cx_IFDR 控制寄存器I2Cx_I2CR 状态寄存器I2Cx_I2SR 数据寄存器I2Cx_I2DR AP3216C 简介 …

【TCP/IP】5. IP 协议

5. IP 协议5. IP 协议5.1 概述5.2 IP 数据报格式5.3 无连接数据报传输5.3.1 首部校验5.3.2 数据分片与重组5.4 IP 数据报选项5.4.1 选项格式5.4.2 选项类型5.5 IP 模块的结构本章要点5. IP 协议 5.1 概述 IP 协议是 TCP/IP 协议簇的核心协议&#xff0c;位于网络层&#xff0…

Linux 服务器挖矿病毒深度处理与防护指南

在 Linux 服务器运维中&#xff0c;挖矿病毒是常见且危害较大的安全威胁。此类病毒通常会隐蔽占用大量 CPU 资源进行加密货币挖矿&#xff0c;导致服务器性能骤降、能耗激增&#xff0c;甚至被黑客远程控制。本文将从病毒特征识别、应急处理流程、深度防护措施三个维度&#xf…

MySQL数据表设计 系统的营销功能 优惠券、客户使用优惠券的设计

系统的营销功能营销功能概述&#xff1a;系统的营销功能主要是&#xff1a;市场活动管理、营销自动化、销售线索管理以及数据分析和报告等。‌ToC‌&#xff08;Consumer&#xff09;&#xff1a;面向个人消费者&#xff0c;满足日常消费需求。‌优惠券的种类&#xff1a;ToC的…

让 3 个线程串行的几种方式

1、通过join()的方式 子线程调用join()的时候&#xff0c;主线程等待子线程执行完再执行。如果让多个线程顺序执行的话&#xff0c;那么需要他们按顺序调用start()。/*** - 第一个迭代&#xff08;i0&#xff09;&#xff1a;* 启动线程t1 -> 然后调用t1.join()。* …

在 Vue 项目中关闭 ESLint 规则

在 Vue 2 项目中关闭 ESLint 规则有以下几种方法&#xff0c;根据您的需求选择合适的方式&#xff1a; 1. 完全禁用 ESLint 修改 vue.config.js&#xff08;推荐&#xff09; module.exports {// 关闭 ESLintlintOnSave: false }或修改 package.json {"scripts": {&…

电脑息屏工具,一键黑屏超方便

软件介绍 今天为大家推荐一款实用的PC端屏幕管理工具——CloseDsp。这款"息屏小能手"能一键关闭显示器&#xff0c;解决各种场景下的屏幕管理需求。 核心功能 CloseDsp最突出的特点是能瞬间关闭显示器屏幕。只需点击"关闭显示器"按钮&#xff0c;屏幕…

嵌入式调试LOG日志输出(以STM32为例)

引言在嵌入式系统开发中&#xff0c;调试是贯穿整个生命周期的关键环节。与传统PC端程序不同&#xff0c;嵌入式设备资源受限&#xff08;如内存、存储、处理器性能&#xff09;&#xff0c;且运行环境复杂&#xff08;无显示器、键盘&#xff09;&#xff0c;传统的断点调试或…

Zephyr的设备驱动模型

默认配置默认配置 boards/arm/nucleo_f401re/ ├── nucleo_f401re.dts ← 板卡设备树主入口 ├── nucleo_f401re_defconfig ← 默认 Kconfig 配置 ├── board.cmake ← CMake 构建入口overlay1.新增加驱动需要修改对应板的设备树文件&#xf…

Mysql字段没有索引,通过where x = 3 for update是使用什么级别的锁

没有索引时&#xff0c;FOR UPDATE 会锁住整个表 现在&#xff0c;你正在一本一本地翻看所有书&#xff0c;寻找“维修中”的书&#xff0c;并且你对管理员说&#xff1a;“在我清点和修改完之前&#xff0c;别人不能动这些书&#xff0c;也不能往这个范围里加新书&#xff01;…