目录

1. Lars-reportV0.1 report模块介绍

2.Lars-reporterV0.1 reporter项目目录构建

3.Lars-ReporterV0.1 数据表和proto协议环境搭建

4.Lars-ReporterV0.1上报请求业务处理

5.Lars-ReporterV0.1上报请求模块的测试

6.Lars-ReporterV0.2开辟存储线程池-网络存储分离


1. Lars-reportV0.1 report模块介绍

 5) 存储线程池及消息队列

​        我们现在的reporter_service的io入库操作,完全是在消息的callback中进行的,那么实际上,这回占用我们server的工作线程的阻塞时间,从而浪费cpu。所以我们应该将io的入库操作,交给一个专门做入库的消息队列线程池来做,这样我们的callback就会立刻返回该业务,从而可以继续处理下一个conn链接的消息事件业务。

​        所以我们就要在此给reporter_service设计一个存储数据的线程池及配套的消息队列。当然这里面我们还是直接用写好的`lars_reactor`框架里的接口即可。

> lars_reporter/src/reporter_service.cpp

```c
#include "lars_reactor.h"
#include "lars.pb.h"
#include "store_report.h"
#include <string>

thread_queue<lars::ReportStatusRequest> **reportQueues = NULL;
int thread_cnt = 0;

void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    lars::ReportStatusRequest req;

    req.ParseFromArray(data, len);

    //将上报数据存储到db 
    StoreReport sr;
    sr.store(req);

    //轮询将消息平均发送到每个线程的消息队列中
    static int index = 0;
    //将消息发送给某个线程消息队列
    reportQueues[index]->send(req);
    index ++;
    index = index % thread_cnt;
}

void create_reportdb_threads()
{
    thread_cnt = config_file::instance()->GetNumber("reporter", "db_thread_cnt", 3);
    
    //开线程池的消息队列
    reportQueues = new thread_queue<lars::ReportStatusRequest>*[thread_cnt];

    if (reportQueues == NULL) {
        fprintf(stderr, "create thread_queue<lars::ReportStatusRequest>*[%d], error", thread_cnt) ;
        exit(1);
    }

    for (int i = 0; i < thread_cnt; i++) {
        //给当前线程创建一个消息队列queue
        reportQueues[i] = new thread_queue<lars::ReportStatusRequest>();
        if (reportQueues == NULL) {
            fprintf(stderr, "create thread_queue error\n");
            exit(1);
        }

        pthread_t tid;
        int ret = pthread_create(&tid, NULL, store_main, reportQueues[i]);
        if (ret == -1)  {
            perror("pthread_create");
            exit(1);
        }

        pthread_detach(tid);
    }
}

2.Lars-reporterV0.1 reporter项目目录构建

int main(int argc, char **argv)
{
    event_loop loop;

    //加载配置文件
    config_file::setPath("./conf/lars_reporter.conf");
    std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
    short port = config_file::instance()->GetNumber("reactor", "port", 7779);


    //创建tcp server
    tcp_server server(&loop, ip.c_str(), port);

    //添加数据上报请求处理的消息分发处理业务
    server.add_msg_router(lars::ID_ReportStatusRequest, get_report_status);

    //为了防止在业务中出现io阻塞,那么需要启动一个线程池对IO进行操作的,接受业务的请求存储消息
    create_reportdb_threads();
  
    //启动事件监听
    loop.event_process(); 

    return 0;
}
```

​        这里主线程启动了线程池,根据配置文件的`db_thread_cnt`数量来开辟。每个线程都会执行`store_main`方法,我们来看一下实现

> lars_reporter/src/store_thread.cpp

```c
#include "lars.pb.h"
#include "lars_reactor.h"
#include "store_report.h"

struct Args 
{
    thread_queue<lars::ReportStatusRequest>* first;
    StoreReport *second;
};

//typedef void io_callback(event_loop *loop, int fd, void *args);
void thread_report(event_loop *loop, int fd, void *args)
{
    //1. 从queue里面取出需要report的数据(需要thread_queue)
    thread_queue<lars::ReportStatusRequest>* queue = ((Args*)args)->first;
    StoreReport *sr = ((Args*)args)->second;

    std::queue<lars::ReportStatusRequest> report_msgs;

    //1.1 从消息队列中取出全部的消息元素集合
    queue->recv(report_msgs);
    while ( !report_msgs.empty() ) {
        lars::ReportStatusRequest msg = report_msgs.front();
        report_msgs.pop();

        //2. 将数据存储到DB中(需要StoreReport)
        sr->store(msg);
    }
}

3.Lars-ReporterV0.1 数据表和proto协议环境搭建

void *store_main(void *args)
{
    //得到对应的thread_queue
    thread_queue<lars::ReportStatusRequest> *queue = (thread_queue<lars::ReportStatusRequest>*)args;

    //定义事件触发机制
    event_loop loop;

    //定义一个存储对象
    StoreReport sr; 

    Args callback_args;
    callback_args.first = queue;
    callback_args.second = &sr;

    queue->set_loop(&loop);
    queue->set_callback(thread_report, &callback_args);


    //启动事件监听
    loop.event_process();

    return NULL;
}
```

​        每个线程都会绑定一个`thread_queue<lars::ReportStatusRequest>`,然后一个线程里面有一个loop,来监控消息队列是否有消息事件过来,如果有消息实现过来,针对每个消息会触发`thread_report()`方法, 在`thread_report()`中,我们就直接将`lars::ReportStatusRequest`消息存储到db中。

​        那么,由谁来给每个线程的`thread_queue`发送消息呢,就是agent/客户端发送的请求,我们在处理`lars::ID_ReportStatusRequest` 消息分发业务的时候调用`get_report_status()`来触发。

> lars_reporter/src/reporter_service.cpp

4.Lars-ReporterV0.1上报请求业务处理

```c
void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    lars::ReportStatusRequest req;

    req.ParseFromArray(data, len);

    //将上报数据存储到db 
    StoreReport sr;
    sr.store(req);

    //轮询将消息平均发送到每个线程的消息队列中
    static int index = 0;
    //将消息发送给某个线程消息队列
    reportQueues[index]->send(req);
    index ++;
    index = index % thread_cnt;
}
```

​        这里的分发机制,是采用最轮询的方式,是每个线程依次分配,去调用`thread_queue`的`send()`方法,将消息发送给消息队列。



​        最后我们进行测试,效果跟之前的效果是一样的。我们现在已经集成进来了存储线程池,现在就不用担心在处理业务的时候,因为DB等的io阻塞,使cpu得不到充分利用了。





### 

5.Lars-ReporterV0.1上报请求模块的测试

# 六、Lars-Load Balance Agent负载代理



## 1) 简介



​        一个服务称为一个模块,一个模块由modid+cmdid来标识
modid+cmdid的组合表示一个远程服务,这个远程服务一般部署在多个节点上

LB Agent以UDP方式为业务方提供:1、节点获取服务;2、节点调用结果上报服务

### 1.1 业务1-节点获取服务:

​        业务方每次要向远程服务发送消息时,先利用modid+cmdid去向LB Agent获取一个可用节点,然后向该节点发送消息,完成一次远程调用;具体获取modid+cmdid下的哪个节点是由LB Agent负责的



### 1.2 业务2-节点调用结果上报服务

​        对LB Agent节点的一次远程调用后,调用结果会汇报给LB Agent,以便LB Agent根据自身的LB算法来感知远程服务节点的状态是空闲还是过载,进而控制节点获取时的节点调度.

![4-Lars-agent](./pictures/4-Lars-agent.png)

6.Lars-ReporterV0.2开辟存储线程池-网络存储分离

LB Agent拥有5个线程,一个LB算法:

- UDP Server服务,并运行LB算法,对业务提供节点获取和节点调用结果上报服务;为了增大系统吞吐量,使用3个UDP Server服务互相独立运行LB算法:`modid+cmdid % 3 = i`的那些模块的服务与调度,由第`i+1`个UDP Server线程负责
- Dns Service Client:是dnsserver的客户端线程,负责根据需要,向dnsserver获取一个模块的节点集合(或称为获取路由);UDP Server会按需向此线程的MQ写入获取路由请求,DSS Client将MQ到来的请求转发到dnsserver,之后将dnsserver返回的路由信息更新到对应的UDP Server线程维护的路由信息中
- Report Service Client:是reporter的客户端线程,负责将每个模块下所有节点在一段时间内的调用结果、过载情况上报到reporter Service端,便于观察情况、做报警;本身消费MQ数据,UDP Server会按需向MQ写入上报状态请求

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/78518.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/78518.shtml
英文地址,请注明出处:http://en.pswp.cn/web/78518.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LabVIEW三轴电机控制

在工业自动化迅猛发展的当下&#xff0c;多轴伺服电机控制系统在制造业、3D 打印等众多领域的需求与日俱增。它不仅要实现高精度的单轴运动控制&#xff0c;还需保障多轴协同作业的精准度&#xff0c;对响应速度也有严格要求。LabVIEW 开发多轴伺服电机控制系统&#xff0c;有效…

驱动开发硬核特训 · Day 27(下篇):深入掌握 Common Clock Framework 架构与实战开发

节。 在本篇内容中&#xff0c;我们将围绕 Linux 内核中的时钟子系统核心架构 —— Common Clock Framework&#xff08;简称 CCF&#xff09;展开深入讲解&#xff0c;目标是帮助你全面理解其设计理念、主要数据结构、注册流程、驱动实现方式&#xff0c;以及如何基于 NXP i.M…

数据库基础:数据库类型与MySQL特点详解

一、数据库的主要类型 1. 关系型数据库(RDBMS) 特点:基于关系模型,数据以表格形式存储 代表产品:MySQL、Oracle、SQL Server、PostgreSQL 优势:ACID事务支持、强一致性、成熟的SQL标准 适用场景:需要复杂查询和事务支持的场景 2. 非关系型数据库(NoSQL) 文档型数据库:Mo…

49认知干货:产品的生命周期及类型汇总

49章:产品的生命周期与类型划分 宇宙是运动的而非静止的,任何事物亦是如此。只要是存在的事物,便必然存在周期性变化,就像四季更替中的冬日枯树、春日新芽、夏日繁茂与秋日凋零。 这也意味着:事物的发展,离不开周期的更迭与演化,死亡并非终点,而是一种新的循环转变。 …

【2025最新】为什么用ElasticSearch?和传统数据库MySQL与什么区别?

Elasticsearch 深度解析&#xff1a;从原理到实践 一、为什么选择 Elasticsearch&#xff1f; 数据模型 Elasticsearch 是基于文档的搜索引擎&#xff0c;它使用 JSON 文档来存储数据。在 Elasticsearch 中&#xff0c;相关的数据通常存储在同一个文档中&#xff0c;而不是分散…

Docker安装Gitblit(图文教程)

本章教程,使用Docker安装部署Gitblit。 一、Gitblit简介 Gitblit 是一个基于 Java 的 Git 仓库管理工具,主要用于在局域网或小型团队环境中搭建私有 Git 服务器。它提供了一个简单易用的 Web 界面,用于浏览代码、管理仓库和用户权限等。 二、拉取镜像 sudo docker pull git…

nDCG(归一化折损累计增益) 是衡量排序质量的指标,常用于搜索引擎或推荐系统

nDCG&#xff08;归一化折损累计增益&#xff09; 是衡量排序质量的指标&#xff0c;常用于搜索引擎或推荐系统。核心思想是&#xff1a;排名越靠前的高质量结果&#xff0c;对整体评分的贡献越大&#xff0c;但后续结果的贡献会逐渐“打折”。最终通过对比实际排序与理想排序的…

《从线性到二维:CSS Grid与Flex的布局范式革命与差异解析》

在前端开发的广袤宇宙中&#xff0c;CSS布局技术宛如闪耀的星辰&#xff0c;不断革新与演进&#xff0c;为构建绚丽多彩的网页世界提供了坚实的支撑。其中&#xff0c;CSS Grid布局与Flex布局作为两颗璀璨的明星&#xff0c;以其独特的魅力和强大的功能&#xff0c;深受开发者们…

2025年01月03日美蜥(杭州普瑞兼职)一面

目录 vue2 vue3 的区别react 性能优化react 组件传值v-for 和 v-if 的优先级react 中多个接口请求的数据&#xff0c;需要渲染到一个列表上怎么处理百万条数据怎么渲染vue2、vue3 的响应式原理微前端了解吗git 版本控制git mearge 和 git rebase 的区别垂直水平居中react 中实…

【聚类分析】基于copula的风光联合场景生成与缩减

目录 1 主要内容 风光出力场景生成方法 2 部分程序 3 程序结果 4 下载链接 1 主要内容 该程序方法复现《融合风光出力场景生成的多能互补微网系统优化配置》风光出力场景生成部分&#xff0c;目前大多数研究的是不计风光出力之间的相关性影响&#xff0c;但是地理位置相近…

LeetCode 1128 等价多米诺骨牌对的数量 题解

今天的每日一题&#xff0c;我的思路还是硬做&#xff0c;不如评论区通过状压写的简单&#xff0c;但是答题思路加算法实现是没有问题的&#xff0c;且时间复杂度也是可以通过的&#xff0c;毕竟全是o(n) 那么我就来说一下我的思路&#xff0c;根据dominoes[i] [a, b] 与 domi…

技术部测试规范

简短测试流程&#xff1a; 开发完成 -> 本地自测 -> 测试环境自测 -> 通知测试同事复测 -> 确认无误后上生产 -> 生产环境自测 -> 再次通知测试同事复测 -> 提交产品验收。 当然可以&#xff01;以下是进一步优化后的测试流程规范&#xff0c;特别强调了开…

算法每日一题 | 入门-顺序结构-大象喝水

大象喝水 题目描述 一只大象口渴了&#xff0c;要喝 20 升水才能解渴&#xff0c;但现在只有一个深 h 厘米&#xff0c;底面半径为 r 厘米的小圆桶 &#xff08;h 和 r 都是整数&#xff09;。问大象至少要喝多少桶水才会解渴。 这里我们近似地取圆周率 π 3.14 \pi3.14 π…

Qt中实现工厂模式

在Qt中实现工厂模式可以通过多种方式&#xff0c;具体选择取决于需求和场景。以下是几种常见的实现方法&#xff1a; 1. 简单工厂模式通过一个工厂类根据参数创建不同对象。cppclass Shape {public: virtual void draw() 0; virtual ~Shape() default;};class Circle : publ…

【前端】ES6一本通_划重点_补充面试题

近两天更新完基本内容&#xff0c;后续长期更新&#xff0c;建议关注收藏点赞。 ES6&#xff08;ECMAScript 2015&#xff09;是现代 JavaScript 的基础&#xff0c;在前端面试中非常常见。 本文已汇总的本站笔记 ES6最重要10特性 对象新增 数组新增 异步、生成器 Promise 模块…

初识 iOS 开发中的证书固定

引言 在移动应用安全领域&#xff0c;HTTPS/TLS 是数据传输的第一道防线&#xff0c;但仅依赖系统默认的证书验证仍有被中间人&#xff08;MITM&#xff09;攻击的风险。Certificate Pinning&#xff08;证书固定&#xff09;通过将客户端信任“钉”在指定的服务器证书或公钥上…

单片机的各个种类及其详细介绍

一、按架构分类的深度解析 1. ARM Cortex-M系列 核心优势&#xff1a; 统一架构&#xff1a;ARM生态完善&#xff0c;工具链&#xff08;Keil、IAR、GCC&#xff09;通用。 性能分层&#xff1a;M0&#xff08;低功耗&#xff09;、M3&#xff08;平衡&#xff09;、M4/M7&am…

5.7/Q1,GBD数据库最新文章解读

文章题目&#xff1a;Global, regional, and national burden and trends of rheumatoid arthritis among the elderly population: an analysis based on the 2021 Global Burden of Disease study DOI&#xff1a;10.3389/fimmu.2025.1547763 中文标题&#xff1a;全球、区域…

从微服务到AI服务:Nacos 3.0如何重构下一代动态治理体系?

在现代微服务架构的浪潮中&#xff0c;Nacos早已成为开发者手中的“瑞士军刀”。作为阿里巴巴开源的核心中间件&#xff0c;它通过动态服务发现、统一配置管理和服务治理能力&#xff0c;为云原生应用提供了坚实的基石。从初创公司到全球500强企业&#xff0c;Nacos凭借其开箱即…

Unity与Unreal Engine(UE)的深度解析及高级用法

以下是Unity与Unreal Engine(UE)的深度解析及高级用法对比,结合技术特性、行业应用与未来发展进行综合阐述: 一、核心差异与适用场景对比 1. 技术架构与编程模式 Unity 语言与脚本:主要使用C#,语法简洁且易于学习,适合快速原型开发和中小型项目。支持可视化脚本工具(如…