#作者:程宏斌

文章目录

    • disconnect
    • reconnect

接上篇:https://blog.csdn.net/qq_40477248/article/details/150957571?spm=1001.2014.3001.5501

disconnect

断开连接的情况主要是两种:
连接或传输过程中有错误发生
超时, 比如空闲时间超时

**
* Close and destroy a transport handle
*/
void rd_kafka_transport_close(rd_kafka_transport_t *rktrans) {
...// 清除接收缓冲区if (rktrans->rktrans_recv_buf)rd_kafka_buf_destroy(rktrans->rktrans_recv_buf);
...if (rktrans->rktrans_s != -1) // 自定义close或者socket.close()rd_kafka_transport_close0(rktrans->rktrans_rkb->rkb_rk,rktrans->rktrans_s);
​rd_free(rktrans);
}
/**
* @brief Failure propagation to application.
*
* Will tear down connection to broker and trigger a reconnect.
*
* \p level is the log level, <=LOG_INFO will be logged while =LOG_DEBUG will
* be debug-logged.
*
* @locality broker thread
*/
void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,int level,rd_kafka_resp_err_t err,const char *fmt,...) {
...if (rkb->rkb_transport) {// close socketrd_kafka_transport_close(rkb->rkb_transport);rkb->rkb_transport = NULL;
...// 设置状态rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_DOWN);
}
​
/**
* @brief Check if connections.max.idle.ms has been exceeded and if so
*        close the connection.
* 空闲时间探查
* @remark Must only be called if connections.max.idle.ms > 0 and
*         the current broker state is UP (or UPDATE).
*
* @locality broker thread
*/
static RD_INLINE void rd_kafka_broker_idle_check(rd_kafka_broker_t *rkb) {
…// 连接空闲时间 是否超过 服务端最大空闲时间, 默认10分钟if (likely(idle_ms < rkb->rkb_rk->rk_conf.connections_max_idle_ms))return;// 超过, 服务端会断开连接; client保险起见, 强制关闭连接rd_kafka_broker_fail(rkb, LOG_DEBUG, RD_KAFKA_RESP_ERR__TRANSPORT,"Connection max idle time exceeded ""(%dms since last activity)",idle_ms);

reconnect

连接失败时, 系统自动发起重连. 重连不会终止, 直到连接成功或者系统退出.
nodename更改时, 会尝试断开重连

/**
* @brief Update the reconnect backoff.
*        Should be called when a connection is made, or all addresses
*        a broker resolves to has been exhausted without successful connect.
* 设置更新重试时间
* @locality broker thread
* @locks none
*/
static void
rd_kafka_broker_update_reconnect_backoff(rd_kafka_broker_t *rkb,const rd_kafka_conf_t *conf,rd_ts_t now) {
…/* 重试时间(间隔)已超过最大限制时间reconnect.backoff.max.ms* 重置下次的重试时间. */if (rkb->rkb_ts_reconnect + (conf->reconnect_backoff_max_ms * 1000) < now)rkb->rkb_reconnect_backoff_ms = conf->reconnect_backoff_ms;
​/* 在区间[-25%, +50%]内随机取一个重试时间*/backoff = rd_jitter((int)((float)rkb->rkb_reconnect_backoff_ms * 0.75),(int)((float)rkb->rkb_reconnect_backoff_ms * 1.5));
​/* 不能超过reconnect.backoff.max.ms. */backoff = RD_MIN(backoff, conf->reconnect_backoff_max_ms);
​/* Set time of next reconnect */rkb->rkb_ts_reconnect         = now + (backoff * 1000);rkb->rkb_reconnect_backoff_ms = RD_MIN(rkb->rkb_reconnect_backoff_ms * 2, conf->reconnect_backoff_max_ms);
}
​
/**
* @brief Calculate time until next reconnect attempt.
*
* @returns the number of milliseconds to the next connection attempt, or 0
*          if immediate.
* @locality broker thread
* @locks none
*/
// 计算距离下次重试的时间间隔
static RD_INLINE int
rd_kafka_broker_reconnect_backoff(const rd_kafka_broker_t *rkb, rd_ts_t now) {
…remains = rkb->rkb_ts_reconnect - now;
…
}
​
static int rd_kafka_broker_thread_main(void *arg) {
...switch (rkb->rkb_state) {
...case RD_KAFKA_BROKER_STATE_TRY_CONNECT:
.../* Throttle & jitter reconnects to avoid* thundering horde of reconnecting clients after* a broker / network outage. Issue #403 */backoff =rd_kafka_broker_reconnect_backoff(rkb, rd_clock());if (backoff > 0) {rd_rkb_dbg(rkb, BROKER, "RECONNECT","Delaying next reconnect by %dms",backoff);rd_kafka_broker_serve(rkb, (int)backoff);continue;}
...case RD_KAFKA_BROKER_STATE_CONNECT:case RD_KAFKA_BROKER_STATE_SSL_HANDSHAKE:case RD_KAFKA_BROKER_STATE_AUTH_LEGACY:case RD_KAFKA_BROKER_STATE_AUTH_REQ:case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE:case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY:/* Asynchronous connect in progress. */rd_kafka_broker_serve(rkb, rd_kafka_max_block_ms);
​/* Connect failure.* Try the next resolve result until we've* tried them all, in which case we back off the next* connection attempt to avoid busy looping. */if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_DOWN &&rd_kafka_broker_addresses_exhausted(rkb))rd_kafka_broker_update_reconnect_backoff(rkb, &rkb->rkb_rk->rk_conf, rd_clock());/* If we haven't made progress from the last state, and* if we have exceeded* socket_connection_setup_timeout_ms, then error out.* Don't error out in case this is a reauth, for which* socket_connection_setup_timeout_ms is not* applicable. */else if (rkb->rkb_state == orig_state &&!rkb->rkb_reauth_in_progress &&rd_clock() >=(rkb->rkb_ts_connect +(rd_ts_t)rk->rk_conf.socket_connection_setup_timeout_ms *1000))rd_kafka_broker_fail(rkb, LOG_WARNING,RD_KAFKA_RESP_ERR__TRANSPORT,"Connection setup timed out in state %s",rd_kafka_broker_state_names[rkb->rkb_state]);
​break;
…
}
​
/**
* @brief Update the nodename (address) of broker \p rkb
*        with the nodename from broker \p from_rkb (may be NULL).
*
*        If \p rkb is connected, the connection will be torn down.
*        A new connection may be attempted to the new address
*        if a persistent connection is needed (standard connection rules).
*
*        The broker's logname is also updated to include \p from_rkb's
*        broker id.
*
* @param from_rkb Use the nodename from this broker. If NULL, clear
*                 the \p rkb nodename.
*
* @remark Must only be called for logical brokers.
*
* @locks none
*/
void rd_kafka_broker_set_nodename(rd_kafka_broker_t *rkb,rd_kafka_broker_t *from_rkb) {
…// nodename已更改过, 需要触发断线和重连/* Trigger a disconnect & reconnect */rd_kafka_broker_schedule_connection(rkb);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/97609.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/97609.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/97609.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

React 第七十一节 Router中generatePath的使用详解及注意事项

前言 generatePath 是 React Router 的一个实用工具函数&#xff0c;用于根据路径模式和参数对象生成实际的 URL 路径。它在需要动态构建链接的场景中非常有用&#xff0c;比如生成导航链接或重定向路径。 1、基本用法和注意事项 import { generatePath } from react-router-do…

Python 爬虫案例:爬取豆瓣电影 Top250 数据

一、案例背景与目标 豆瓣电影 Top250 是国内权威的电影评分榜单之一&#xff0c;包含电影名称、评分、评价人数、导演、主演、上映年份、国家 / 地区、类型等关键信息。本案例将使用 Python 编写爬虫&#xff0c;实现以下目标&#xff1a; 自动请求豆瓣电影 Top250 的 10 个分…

SPA安全警示:OAuth2.0致命漏洞

OAuth2.0在SPA应用中的安全陷阱SPA&#xff08;单页应用&#xff09;通常采用隐式授权&#xff08;Implicit Flow&#xff09;或PKCE&#xff08;Proof Key for Code Exchange&#xff09;授权模式&#xff0c;但存在以下安全隐患&#xff1a;隐式授权模式的漏洞访问令牌直接暴…

table表格字段明细展示

文章目录1、字段渲染2、异步请求展示明细3、hover展示问题3.1 基本逻辑3.2 hover时长判断3.3 renderhover表格字段明细展示&#xff0c;属于比较小的需求&#xff0c;但是也有一定交互细节&#xff0c;本文选取部分场景。 1、字段渲染 render和渲染组件是有区别的。render常见为…

主网上线后生态极速扩张的 Berachain 生态,有哪些值得关注的项目?

Berachain 是典型的将 DeFi 思维嵌入到共识机制中的 Layer1&#xff0c;其核心是 PoL&#xff08;Proof of Liquidity&#xff09;共识。PoL 要求验证者在获得区块奖励前&#xff0c;必须将流动性导入白名单协议&#xff0c;并由市场决定资金流向。这样&#xff0c;验证者的权重…

claude-code对比GitHub-Copilot

Claude Code 文档日期&#xff1a;2025 年 08 月 20 日 定位 项目级开发助手&#xff0c;专注于全局视野和复杂任务的处理。 特点 超长上下文支持&#xff1a;支持 200k 超长上下文&#xff0c;适合处理复杂项目。丰富的自定义命令&#xff1a;提供灵活的命令配置&#xff0c;满…

Roo Code自定义Mode(模式)

什么是自定义模式&#xff1f; 简单来说&#xff0c;自定义模式就像是给Roo Code穿上不同的"职业装"。你可以创建针对特定任务或工作流程量身定制的模式&#xff0c;让Roo在不同场景下表现出专业的行为。 这些模式分为两种类型&#xff1a;全局模式&#xff08;在所有…

Next.js渲染模式:SSR、SSG与ISR揭秘

Next.js 核心渲染模式深度解析&#xff1a;SSR、SSG 与 ISR 在构建现代 Web 应用时&#xff0c;性能和用户体验是至关重要的考量。Next.js 作为 React 生态中一个备受推崇的框架&#xff0c;其强大的服务端渲染&#xff08;SSR&#xff09;、静态站点生成&#xff08;SSG&#…

Veo Videos Generation API 对接说明

本文介绍了如何对接 Veo Videos Generation API&#xff0c;通过输入自定义参数生成Veo官方视频。 下面将详细阐述 Veo Videos Generation API 的对接流程。 申请流程 使用 API 前&#xff0c;需前往 Veo Videos Generation API 页面申请服务。进入页面后&#xff0c;点击「…

YOLO 目标检测:YOLOv3网络结构、特征输出、FPN、多尺度预测

文章目录一、YOLOV31、网络结构1.1 整体结构1.2 主干网络1.3 特征输出1.4 特征融合FPN&#xff08;Feature Pyramid Networks&#xff09;FPN 融合上采样融合2、多尺度预测3、损失函数4、性能对比一、YOLOV3 YOLOv3&#xff08;You Only Look Once v3&#xff09;是YOLO系列中…

【GIS图像处理】有哪些SOTA方法可以用于将1.5米分辨率遥感图像超分辨率至0.8米精度的?

针对将1.5米分辨率遥感图像超分辨率至0.8米的需求,当前主流方法可分为以下几类,结合最新研究进展和实际应用场景,具体技术方案及SOTA方法如下: 一、基于Transformer的高效建模 1. Top-k标记选择Transformer(TTST) 核心机制:通过动态选择前k个关键标记(token),消除冗…

【电力电子】逆变器控制策略:PQ Droop下垂控制、电压电流双环控制与SPWM调制

逆变器中的 PQ Droop 控制。 1. PQ Droop 控制的定义 PQ Droop(有时也称为功率下垂控制,Power Droop Control)是微电网、并联系统或逆变器并网运行中常用的一种分布式功率控制方法。 P-Droop(有功下垂):通过调节逆变器输出频率与有功功率之间的关系实现功率分配。 Q-Dro…

【LeetCode 热题 100】5. 最长回文子串——中心扩散法

Problem: 5. 最长回文子串 文章目录整体思路完整代码时空复杂度时间复杂度&#xff1a;O(N^2)空间复杂度&#xff1a;O(1)整体思路 这段代码旨在解决经典的 “最长回文子串” (Longest Palindromic Substring) 问题。问题要求在一个给定的字符串 S 中&#xff0c;找到一个最长…

六、练习3:Gitee平台操作

练习3&#xff1a;Gitee平台操作 练习目标 掌握Gitee平台的基本操作&#xff0c;包括创建仓库、推送代码、团队协作等。 练习步骤 步骤1&#xff1a;Gitee账号准备 访问 gitee.com注册账号&#xff08;如果还没有&#xff09;登录Gitee 步骤2&#xff1a;配置SSH密钥 # …

Git软件版本控制

软件版本控制作用&#xff1a;软件源码版本管理、多人协作开发、版本多分支开发、代码回滚&#xff08;回退&#xff09;等功能。集中式版本控制&#xff1a;将代码仓库放在一台服务器上&#xff0c;开发时要依赖这台服务器。优点&#xff1a;简单、方便管理、适合中小型项目缺…

生产环境Spark Structured Streaming实时数据处理应用实践分享

生产环境Spark Structured Streaming实时数据处理应用实践分享 一、业务场景描述 我们所在的电商平台需要实时监控用户行为数据&#xff08;如点击、下单、支付等&#xff09;&#xff0c;基于事件级别的流式数据进行实时统计、会话聚合、漏斗分析&#xff0c;并将结果推送到Da…

海康相机开发---HCNetSDK

HCNetSDK&#xff08;Hikvision Network Software Development Kit&#xff09;是海康威视专为旗下安防监控设备打造的二次开发工具包&#xff0c;是连接上层应用与海康设备的核心桥梁。其封装了设备底层通信协议&#xff08;包括私有协议与部分标准协议&#xff09;&#xff0…

构建无广告私人图书馆Reader与cpolar让电子书库随身携带

文章目录前言&#xff1a;告别书荒&#xff0c;拯救灵魂的“摸鱼神器”1、关于Reader&#xff1a;小而美的开源在线阅读器2、Docker部署3、简单使用reader和添加书源4.群晖安装Cpolar工具5.创建reader阅读器的公网地址6.配置固定公网地址前言&#xff1a;告别书荒&#xff0c;拯…

amd cpu是x86架构吗

是的&#xff0c;AMD CPU属于x86架构‌&#xff0c;其64位扩展&#xff08;x86-64&#xff09;最初由AMD设计并成为行业标准。‌ ‌AMD与x86架构的关系‌ ‌技术渊源‌&#xff1a;AMD自1976年起通过技术授权成为x86架构的合法制造商&#xff0c;与英特尔共同主导x86市场。2003…

vercel上线资源无法加载

背景&#xff1a;在本地跑开发服务器没问题&#xff0c;但是部署到 vercel 上就有问题上一次出现类似问题是在更新游戏引擎方法后本地可以跑但是上线没有成功&#xff0c;当时是因为 runner.html 是在部署时通过脚本从远端仓库拉取的&#xff0c;所以解决方案&#xff1a;1.更新…