目录

一.消息队列背景

二.需求分析

核心概念:

BrokerServer:

BrokerServer的核心API:

交换机Exchange:

持久化:

网络通信:

消息应答:

三、模块划分

四、创建项目

五、创建核心类

Exchange:

MSGQueue:

Binding:

Message:

六.数据库设计

七.实现DataBaseManager类:

DataBaseManager类:

编写DataBaseManager类的测试用例:

八.消息存储设置

九.创建MessageFileManager类

创建MessageFileManager类:

实现该类的功能:

1.创建数据文件:统计文件的目录,文件

2.实现读写统计文件功能:

3.实现读写数据文件功能:

4.实现读写消息方法:

5.加载文件中的所有有效消息;

6.实现垃圾清除功能:GC:

编写MessageFileManager测试用例:

十.整合数据库和文件数据


一.消息队列背景

我们学习过阻塞队列(BlockingQueue), 阻塞队列最⼤的⽤途,就是⽤来实现 ⽣产者消费 者模型. ⽣产者消费者模型,存在诸多好处,是后端开发的常⽤编程⽅式.

• 解耦合

• 削峰填⾕

在实际的后端开发中,尤其是分布式系统⾥,跨主机之间使⽤⽣产者消费者模型,也是⾮常普遍的需求.因此,我们通常会把阻塞队列,封装成⼀个独⽴的服务器程序,并且赋予其更丰富的功能.这样的程序我们就称为消息队列(Message Queue,MQ).

市面上成熟的消息队列有很多:kafka,RabbitMQ,RocketMQ.....

接下来我们就以RabbitMQ为模版,实现他的核心功能.

二.需求分析

核心概念:

生产者Producter,消费者Consumer,中间人Broker,发布publisher,订阅Subscribe.

⼀个⽣产者,⼀个消费者

多个⽣产者,多个消费者

生产者和消费者存在1对1和多对多等多种关系模式.

生产者发布消息,到达Broker,Broker将消息发送给订阅该消息的消费者.Broker是消息队列的核心,负责存储和转发消息.

BrokerServer:

在broker内部,又存在一下概念:

虚拟机VirtualHost,交换机Exchange,队列Queue,绑定关系Binding

虚拟机类似于数据库的database,起到数据分割和管理的作用,一个BrokerServer内部可以有多个虚拟机.

交换机Exchange:生产者将消息发送到Broker的交换机上,交换机根据不同的路由规则,将消息分发到不同的队列上。

队列Queue:接收并存储交换机分发的消息,根据消费者的订阅队列的情况,将消息推送给消息者。(消费者自己决定从哪个队列上读取消息)

绑定关系Binding:这个绑定关系,指的是交换机和队列的绑定关系。(交换机和队列存在多对多的绑定关系)

消息Message:真正存储的数据。

BrokerServer的核心API:

对于BrokerServer来说,要实现以下核心API,通过这些API来实现消息队列的基本功能:

1. 创建队列 (queueDeclare)

2. 销毁队列 (queueDelete)

3. 创建交换机 (exchangeDeclare)

4. 销毁交换机 (exchangeDelete)

5. 创建绑定 (queueBind)

6. 解除绑定 (queueUnbind)

7. 发布消息 (basicPublish)

8. 订阅消息 (basicConsume)

9. 确认消息 (basicAck)

生产者和消费者通过远程调用这些API,实现生产者 消费者模型。

交换机Exchange:

对于RabbitMQ来说,支持4种交换机类型:

直接交换机Direct,扇出交换机 Fanout ,主题交换机Topic ,首部交换机Header

Header交换机比较复杂且实用场景较少,目前我们仅实现前3种。

直接交换机Direct:⽣产者发送消息时,直接指定被该交换机绑定的队列名。(发放专属红包)

扇出交换机 Fanout :⽣产者发送的消息会被复制到该交换机的所有队列中. (发一个红包,所有人都能领,且领到的是该红包的所有金额)

主题交换机Topic:绑定队列到交换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为routingKey. 当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列。(发一个口令红包,回答出对应的口令,才可以领红包,且领到的是该红包的所有金额)

持久化:

要将交换机,队列,消息都设置持久化功能,防止当BrokerServer出现异常宕机时,重启后,存储在上面的数据丢失。(将其都存储在磁盘和内存上各一份)

网络通信:

生产者和消费者都是客户端,BrokerServer作为服务端,通过网络进行通信,客户端要提供对应的API,来实现对服务器的调用。

1. 创建 Connection

2. 关闭 Connection

3. 创建 Channel

4. 关闭 Channel

5. 创建队列 (queueDeclare)

6. 销毁队列 (queueDelete)

7. 创建交换机 (exchangeDeclare)

8. 销毁交换机 (exchangeDelete)

9. 创建绑定 (queueBind)

10. 解除绑定 (queueUnbind)

11. 发布消息 (basicPublish)

12. 订阅消息 (basicConsume)

13. 确认消息 (basicAck)

在 broker 的基础上, 客⼾端还要增加 Connection 操作和 Channel 操作.

Connection 对应⼀个 TCP 连接.

Channel 则是 Connection 中的逻辑通道.⼀个 Connection 中可以包含多个 Channel.

Channel 和 Channel 之间的数据是独⽴. 不会相互⼲扰。这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接。

消息应答:

被消费的消息,要进行消息应答,当消费者的应答方式为自动时,只要消息从队列中分发出去,就会将其从队列中删除;若为手动应答模式,需要消费者自己调用确认接收消息并返回确认响应,才会将消息从队列中删除。

三、模块划分

四、创建项目

创建springboot项目,引入springWeb和MyBatis,LomBok类。

五、创建核心类

Exchange,MSGQueue,Binding,Message:

Exchange:

/*** 创建交换机:*/
@Data
public class Exchange {//交换机的唯一标识private String name;//交换机类型:默认为直接交换件private ExchangeType type = ExchangeType.DIRECT;//是否持久化 ,默认持久化private boolean durable = true;//在交换机不再使用的时候,是否自动删除 ,默认不自动删除private boolean autoDelete = false;//选填参数private Map<String,Object> args;
}

关于args属性, 由于数据库没有对应的Map类型,在写入数据库时,要将其转换成String类型,从数据库中读取数据时,再将其转换成Map类型,因此,要重写args的set和get方法:

    //这里的args的数据类型是Map,数据库中没有对应的数据类型,// 存入数据库时,要转换成String类型,//从数据库中读取时,要转换成Map类型public String getArgs(){ObjectMapper mapper = new ObjectMapper();try {return mapper.writeValueAsString(args);} catch (JsonProcessingException e) {
//            throw new RuntimeException(e);e.printStackTrace();}return "{}";}public void setArgs(String args) throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();//              将字符串转换为HashMap类型,若转换为基本数据类型时直接填就行,//              这里转换的是HashMap集合类型,比较复杂,需要借助匿名内部类TypeReference类实现this.args = mapper.readValue(args, new TypeReference<HashMap<String,Object>>() {});}

/*** 使用枚举类 声明交换机类型*/
public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;private ExchangeType(int type){this.type = type;}public int getType(){return this.type;}
}

MSGQueue:


/*** 创建队列类*/
@Data
public class MSGQueue {//队列唯一标识private String name;//是否持久化 ,默认持久化private boolean durable = true;//是否自动删除 ,默认不自动删除private boolean autoDelete = false;//选填参数private Map<String,Object> args;
}

这里的args有和Excahnge同样的问题,要重写args的set和get方法:

    //这里的args的数据类型是Map,数据库中没有对应的数据类型,// 存入数据库时,要转换成String类型,//从数据库中读取时,要转换成Map类型public String getArgs(){ObjectMapper mapper = new ObjectMapper();try {return mapper.writeValueAsString(args);} catch (JsonProcessingException e) {
//            throw new RuntimeException(e);e.printStackTrace();}return "{}";}public void setArgs(String args) throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();this.args = mapper.readValue(args, new TypeReference<HashMap<String,Object>>() {});}

Binding:

/*** 创建绑定关系 交换机->队列*/
@Data
public class Binding {//要绑定的交换机名private String exchangeName;//要绑定的队列名private String queueName;//绑定关系(只有交换机为Topic类型时,该绑定才起作用)private String bindingKey;
}

Message:

/*** 创建消息* 后面传递消息时,需要将消息进行序列化,此处要实现Serializable接口才可以被序列化*/
@Data
public class Message implements Serializable {//消息中的真正数据private byte[] body;//消息本身的一些属性:private BasicProperties basicProperties = new BasicProperties();//消息在文件中的起始位置,[offsetBeg,offsetEnd]//由于消息要持久化存储,因此要存储在磁盘中,为了区分每一条消息,记录每条消息的起始位置//使用transient修改,避免被序列化private transient long offsetBeg = 0;private transient long offsetEnd = 0;//消息在磁盘中是否有效,该属性用于删除磁盘消息使用,使用逻辑删除//0x1:有效 0x0:无效private byte isVaild = 0x1;//创建消息 同时给消息分配一个id,通过UUID,创建唯一ID,消息Id以"M-"开头//routingKey:以参数为准,当原来的消息存在routingKey时,参数会将其覆盖//此处相当于使用了工厂方法创建消息对象,而没有使用构造方法,因为这样可以通过方法名获取到相关信息public Message createMessageById(String routingKey,BasicProperties basicProperties,byte[] body){Message message = new Message();message.setMessageId("M-"+ UUID.randomUUID());message.setRoutingKey(routingKey);if(basicProperties!=null){message.setBasicProperties(basicProperties);}message.setBody(body);return message;}//为了方便引用,将BasicProperties中的属性在这里调用一下,方便直接通过Message调用其基本属性public String getMessageId(){return basicProperties.getMessageId();}public void setMessageId(String messageId){basicProperties.setMessageId(messageId);}public String getRoutingKey(){return basicProperties.getRoutingKey();}public void setRoutingKey(String routingKey){basicProperties.setRoutingKey(routingKey);}public int getDeliveryMode(){return basicProperties.getDeliveryMode();}public void setDeliveryMode(int deliveryMode){basicProperties.setDeliveryMode(deliveryMode);}
}

/*** 保存消息的属性信息* 为保证消息可以被序列化,此处也要实现Serializable接口*/
@Data
public class BasicProperties implements Serializable {//消息的唯一标识private String messageId;//消息的RoutingKey,用于消息转发private String routingKey;//是否持久化//1:持久化 0:非持久化private int deliveryMode = 1;
}

六.数据库设计

要将数据持久化,就要存储到磁盘上,常用的数据库有MySQL,....

此处我们使用SQList来存储数据,SQList较于MySQL更加轻量级,使用起来也更简单.

SQLite 只是⼀个动态库(当然, 官⽅也提供了可执⾏程序 exe), 我们在 Java 中直接引⼊ SQLite 依赖, 即可直接使⽤, 不必安装其他的软件

引入pom.xml依赖:

<!--        引入 SQLite数据库--><!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc --><dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.49.1.0</version>
</dependency>

配置数据源applicaton.yml:

spring:datasource:url: jdbc:sqlite:./data/meta.db #约定将数据库文件放到 ./data/meta.db中username:  #由于sqlite不是针对客户端服务的,因此无需设置用户名和密码password:driver-class-name: org.sqlite.JDBC
mybatis:mapper-locations: classpath:mapper/**Mapper.xml

创建表,删除表,查询数据:


/*** 创建表*/
@Mapper
public interface MetaMapper {//创建交换机 队列 绑定关系表void createExchangeTable();void createQueueTable();void createBindingTable();//新增数据void insertExchange(Exchange exchange);void insertQueue(MSGQueue queue);void insertBinding(Binding binding);//删除数据void deleteExchange(String exchangeName);void deleteQueue(String queueName);void deleteBinding(String exchangeName,String queueName);//查询数据List<Exchange> selectAllExchanges();List<MSGQueue> selectAllQueues();List<Binding> selectAllBindings();
}

由于MyBatis仅针对MySQL / Oracle ⽀持执⾏多个 SQL 语句的, 但是针对 SQLite 是不⽀持的, 只能写成多个⽅法.

MetaMapper.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.rabbitmq.mq02.mqServer.mapper.MetaMapper">
<!--    创建表--><update id="createExchangeTable">create table if not exists exchange(name varchar(64) promary key,type int,durable boolean,autoDelete boolean,args varchar(1024))</update><update id="createQueueTable">create table if not exists queue(name varchar(64) primary key,durable boolean,autoDelete boolean,args varchar(1024))</update><update id="createBindingTable">create table if not exists binding(exchangeName varchar(64),queueName varchar(64),bindingKey varchar(64))</update>
<!--    插入数据--><insert id="insertExchange" parameterType="org.rabbitmq.mq02.mqServer.core.Exchange">insert into exchange values(#{name},#{type},#{durable},#{autoDelete},#{args});</insert><insert id="insertQueue" parameterType="org.rabbitmq.mq02.mqServer.core.MSGQueue">insert into  queue values(#{name},#{durable},#{autoDelete},#{args})</insert><insert id="insertBinding" parameterType="org.rabbitmq.mq02.mqServer.core.Binding">insert into binding values(#{exchangeName},#{queueName},#{bindingKey})</insert>
<!--    删除数据--><delete id="deleteExchange" parameterType="java.lang.String">delete from exchange where name = #{name};</delete><delete id="deleteQueue" parameterType="java.lang.String">delete from queue where name = #{name};</delete><delete id="deleteBinding" parameterType="java.lang.String">delete from binding where bindingKey = #{bindingKey};</delete>
<!--    查找数据--><select id="selectAllExchanges" resultType="org.rabbitmq.mq02.mqServer.core.Exchange">select * from exchange;</select><select id="selectAllQueues" resultType="org.rabbitmq.mq02.mqServer.core.MSGQueue">select * from queue;</select><select id="selectAllBindings" resultType="org.rabbitmq.mq02.mqServer.core.Binding">select * from binding;</select>
</mapper>

七.实现DataBaseManager类:

通过这个类来封装针对数据库的操作.整理数据库中的数据

MeatMapper要通过手动管理,而非spring注入,修改MqApplication类,新增一个属性:

@SpringBootApplication
public class Mq02Application {public static ConfigurableApplicationContext context;public static void main(String[] args) {context = SpringApplication.run(Mq02Application.class, args);}
}

DataBaseManager类:

/*** 通过这个类来封装针对数据库的操作.*/
public class DataBaseManager {
//通过手动管理,非spring注入private MetaMapper meatMapper;public void init(){//手动管理数据meatMapper = Mq02Application.context.getBean(MetaMapper.class);//建库建表//1.先查询数据库是否已经存在if(!checkDBExists()){//不存在时,先建库建表File file = new File("./data/meta.db");//先创建目录file.mkdirs();//创建表createTable();//初始化,主要初始化 交换机的类型createDefaultExchange();System.out.println("[DataBaseManager] 数据库初始化完成!");}else{System.out.println("[DataBaseManager] 数据库以存在!");}}//初始化交换机private void createDefaultExchange() {Exchange exchange = new Exchange();exchange.setType(ExchangeType.DIRECT);exchange.setName("");exchange.setAutoDelete(false);exchange.setDurable(false);meatMapper.insertExchange(exchange);System.out.println("[DataBaseManager] 初始化交换机成功!");}private void createTable() {meatMapper.createExchangeTable();meatMapper.createQueueTable();meatMapper.createBindingTable();System.out.println("[DataBaseManager] 建表成功!");}//查询数据库是否已经创建private boolean checkDBExists() {File file = new File("./data/meta.db");return file.exists();}//封装其他数据库操作:对交换机 队列 绑定关系的 增删查public void insertExchange(Exchange exchange){meatMapper.insertExchange(exchange);
}public List<Exchange> selectAllExchanges(){return meatMapper.selectAllExchanges();}public void deleteExchange(String exchangeName){meatMapper.deleteExchange(exchangeName);}public void insertQueue(MSGQueue queue){meatMapper.insertQueue(queue);}public List<MSGQueue> selectAllQueues(){return meatMapper.selectAllQueues();}public void deleteQueue(String queueName){meatMapper.deleteQueue(queueName);}public void insertBinding(Binding binding){meatMapper.insertBinding(binding);}public List<Binding> selectAllBindings(){return meatMapper.selectAllBindings();}public void deleteBindings(String bindingKey){meatMapper.deleteBinding(bindingKey);}
}

编写DataBaseManager类的测试用例:

在实现项目的过程中,每实现一个阶段的功能,不可能一下就全部正确实现,可能会出现各种错误.

就要及时编写测试用例,对当前代码功能进行测试,防止写完了整个项目再测试,出现各种错误.

做到有错即时查找修改.

    private DataBaseManager dataBaseManager = new DataBaseManager();/*** 有些测试用例产生的数据会影响之后的测试结果,在每次测试之前,还要进行数据清除,比较麻烦* 这个方法,在执行每一个测试前,要执行该方法,用于初始化数据,构造好数据*/@BeforeEachvoid setUp() {Mq02Application.context = SpringApplication.run(MetaMapper.class);//在执行每个测试之前,进行数据库的创建和舒适化工作:dataBaseManager.init();}/*** 在执行每个测试用例之后,都要执行该方法,用于数据的清除和资源的释放,*/@AfterEachvoid tearDown() {//释放资源,删除数据库Mq02Application.context.close();dataBaseManager.deleteTable();}

再在DataBaseManager类中添加删除数据库及目录方法:

    //删除数据库及创建的目录:public void deleteTable() {File file = new File("./data/meta.db");//1.删除数据库:boolean ok = file.delete();if(ok){System.out.println("[DataBaseManager] 删除数据库成功!");}else{System.out.println("[DataBaseManager] 删除数据库失败!");}//2.删除目录:
//注意,在windows系统上,只有目录为空时,才能删除成功File file1 = new File("./data");ok = file1.delete();if(ok){System.out.println("[DataBaseManager] 删除目录成功!");}else{System.out.println("[DataBaseManager] 删除目录失败!");}}

测试init()方法:

    @Testvoid init(){//初始化方法已经在setUp方法中调用过了,这里只要判断数据是否正确:List<Exchange> exchanges = dataBaseManager.selectAllExchanges();List<MSGQueue> msgQueues = dataBaseManager.selectAllQueues();List<Binding> bindings = dataBaseManager.selectAllBindings();//判断创建结果是否正确,可以通过打印结果,自己观察比对,//还可以调用专门的测试方法,直接与期望值进行对比,判断是否正确//assertEquals方法的第一个参数默认为期望值,第二个参数为实际值//判断期望值与实际值是否一致,一致返回true,不一致返回false//交换机在初始化的时候已经初始化了一个,因此,查询到的交换机的个数应为1个,队列和绑定为0个Assertions.assertEquals(1,exchanges.size());Assertions.assertEquals(0,msgQueues.size());Assertions.assertEquals(0,bindings.size());}

对交换机,队列,绑定的插入删除测试:

//对交换机的测试   //先创建一个交换机实例:private Exchange createExchange(String exchangeName){Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);
//        exchange.setArgs(null);return exchange;}@Testvoid insertExchange() {Exchange exchange = createExchange("exchangeTest");dataBaseManager.insertExchange(exchange);List<Exchange> exchanges = dataBaseManager.selectAllExchanges();Assertions.assertEquals(2,exchanges.size());Exchange e = exchanges.get(1);Assertions.assertEquals("exchangeTest",e.getName());Assertions.assertEquals(ExchangeType.DIRECT,e.getType());Assertions.assertTrue(e.isDurable());Assertions.assertFalse(e.isAutoDelete());}@Testvoid deleteExchange() {Exchange exchange = createExchange("exchangeTest");dataBaseManager.insertExchange(exchange);List<Exchange> exchanges = dataBaseManager.selectAllExchanges();Assertions.assertEquals(2,exchanges.size());dataBaseManager.deleteExchange("exchangeTest");exchanges = dataBaseManager.selectAllExchanges();Assertions.assertEquals(1,exchanges.size());}
//对队列的测试private MSGQueue createQueue(String queueName){MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(true);queue.setAutoDelete(false);return queue;}@Testvoid insertQueue() {MSGQueue queue = createQueue("queueTest");dataBaseManager.insertQueue(queue);List<MSGQueue> queues = dataBaseManager.selectAllQueues();Assertions.assertEquals(1,queues.size());Assertions.assertEquals("queueTest",queues.get(0).getName());Assertions.assertTrue(queues.get(0).isDurable());Assertions.assertFalse(queues.get(0).isAutoDelete());}@Testvoid deleteQueue() {MSGQueue queue = createQueue("queueTest");dataBaseManager.insertQueue(queue);List<MSGQueue> queues = dataBaseManager.selectAllQueues();Assertions.assertEquals(1,queues.size());dataBaseManager.deleteQueue("queueTest");queues = dataBaseManager.selectAllQueues();Assertions.assertEquals(0,queues.size());}//对绑定的测试:private Binding createBinding(String bindingKey){Binding binding = new Binding();binding.setExchangeName("exchangeTest");binding.setQueueName("queueTest");binding.setBindingKey(bindingKey);return binding;}@Testvoid insertBinding() {//创建绑定之前,要先创建交换机和队列Exchange exchange = createExchange("exchangeTest");dataBaseManager.insertExchange(exchange);List<Exchange> exchanges = dataBaseManager.selectAllExchanges();Assertions.assertEquals(2,exchanges.size());MSGQueue queue = createQueue("queueTest");dataBaseManager.insertQueue(queue);List<MSGQueue> queues = dataBaseManager.selectAllQueues();Assertions.assertEquals(1,queues.size());Binding binding = createBinding("bindingKeyTest");dataBaseManager.insertBinding(binding);List<Binding> bindings = dataBaseManager.selectAllBindings();Assertions.assertEquals(1,bindings.size());Binding b = bindings.get(0);Assertions.assertEquals("bindingKeyTest",b.getBindingKey());Assertions.assertEquals("exchangeTest",b.getExchangeName());Assertions.assertEquals("queueTest",b.getQueueName());}@Testvoid deleteBindings() {//创建绑定之前,要先创建交换机和队列Exchange exchange = createExchange("exchangeTest");dataBaseManager.insertExchange(exchange);List<Exchange> exchanges = dataBaseManager.selectAllExchanges();Assertions.assertEquals(2,exchanges.size());MSGQueue queue = createQueue("queueTest");dataBaseManager.insertQueue(queue);List<MSGQueue> queues = dataBaseManager.selectAllQueues();Assertions.assertEquals(1,queues.size());Binding binding = createBinding("bindingKeyTest");dataBaseManager.insertBinding(binding);List<Binding> bindings = dataBaseManager.selectAllBindings();Assertions.assertEquals(1,bindings.size());dataBaseManager.deleteBindings("bindingKeyTest");bindings = dataBaseManager.selectAllBindings();Assertions.assertEquals(0,bindings.size());}

八.消息存储设置

消息要存储在磁盘中,从文件中读写,消息较于交换机队列来说,查询次数较少,仅需要读和取消息.

因此从文件中读取比从数据库中查找要快很多.

根据消息所在不同的队列,为每个队列命名一个目录,为 data/队列名.eg: ./data/testQueue.

每个队列所目录中包含两个文件: 消息数据文件,消息统计文件. 

数据文件queue_data.txt : 存储消息体相关数据.

规定文件格式:使用二进制存储(节省空间).每个消息分成2部分,第一部分存储该消息对象的长度,规定占4字节; 第二部分存储消息的数据部分.消息和消息之间首尾相连.

统计文件queue_stat.txt: 存储该数据文件中的总消息数和有效消息数,

规定使用文本格式存储, 仅存储一行两列数据,第一列为数据文件中的总消息数,第二行为有效消息数中间用\t进行分隔.eg: 100\t50.

九.创建MessageFileManager类

创建MessageFileManager类:

/*** 用于管理消息在文件中的存储* 该类中药完成的事情:1.创建数据文件,统计文件的目录,文件*               2.读写统计文件*               3.读写数据文件*               4.读取文件中所有的有效消息*               5.对文件中的消息进行垃圾回收*/
public class MessageFileManager {/*** 创建一个内部类,表示消息的统计文件类*/public static class Stat {//消息总数public int totalCount;//有效消息数public int validCount;}//这里目前没有要初始化的public void init(){}
}

实现该类的功能:

1.创建数据文件:统计文件的目录,文件

//获取队列文件目录名:public String getQueueDir(String queueName){return "./data/"+queueName;}//获取消息 数据文件 路径名public String getDataFilePath(String queueName){return getQueueDir(queueName)+"/queue_data.txt";}//获取消息 统计文件 路径名public String getStatFilePath(String queueName){return getQueueDir(queueName)+"/queue_stat.txt";}//创建队列文件public void createQueueFile(String queueName) throws MqException, IOException {File file = new File(getQueueDir(queueName));//当目录不存在时,创建目录if (!file.exists()) {boolean ok = file.mkdirs();if (!ok) {throw new MqException("[MessageFileManager] 创建目录失败!");}}//创建文件File dataFile = new File(getDataFilePath(queueName));if (!dataFile.exists()) {//数据文件不存在时,创建数据文件:boolean ok = dataFile.createNewFile();if (!ok) {throw new MqException("[MessageFileManager] 创建数据文件失败!");}}File statFile = new File(getStatFilePath(queueName));if (!statFile.exists()) {boolean ok = statFile.createNewFile();if (!ok) {throw new MqException("[MessageFileManager] 创建统计文件失败!");}}//初始化统计文件数据Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName,stat);System.out.println("[MessageFileManager] 创建目录及文件成功!");}//销毁文件和目录//删除指定队列的目录和文件//当队列被删除后,对应的文件和目录也就要被删除了public void destoryQueueFile(String queueName) throws IOException {//1.删除数据文件File dataFile = new File(getDataFilePath(queueName));boolean ok1 = dataFile.delete();//2.删除统计文件File statFile = new File(getStatFilePath(queueName));boolean ok2 = statFile.delete();//3.删除总目录File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if(!ok1 || !ok2 || !ok3){throw new IOException("删除队列和文件失败! baseDir: "+baseDir.getAbsolutePath());}}

自定义一个MqException异常,

用于处理抛出消息队列过程中的逻辑异常.

/*** 与项目有关的异常*/
public class MqException extends Exception {public MqException(String reason){super(reason);}
}

2.实现读写统计文件功能:

//从统计文件中读取消息private Stat readStat(String queueName){Stat stat = new Stat();try (FileInputStream fileInputStream = new FileInputStream(getStatFilePath(queueName))){//通过Scanner来读取文件中的数据Scanner scan = new Scanner(fileInputStream);stat.totalCount = scan.nextInt();stat.validCount = scan.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}//写消息到统计文件中private void writeStat(String queueName,Stat stat){//注意:此处打开文件,默认是清空文件的,可以通过设置第二个参数为true:拼接文件形式打开文件,不会清空数据try(FileOutputStream outputStream = new FileOutputStream(getStatFilePath(queueName))){PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount+"\t"+stat.validCount);//注意:要刷新缓冲区,将写入的数据从缓冲区都放到磁盘中printWriter.flush();} catch (IOException e) {throw new RuntimeException(e);}}

3.实现读写数据文件功能:

将消息存储到文件中是以二进制形式存储的,要先将消息进行序列化,然后再进行写入.读取消息数据的时候,也要将二进制文件进行反序列化,才能看懂文件内容.

这里先实现一个序列化和反序列工具类:


/*** 消息序列化和饭系列化工具,将所有方法都用static修饰,直接通过类名调用方法* 序列化:将对象转化为字节数组* 反序列化:将字节数组转化为对象*/
public class BinaryTool {//序列化: 将对象转为字节数组//将Object序列化的数据逐渐写到ByteArrayOutputStream中,再转换为byte[]public static byte[] toByte(Object object) throws IOException {try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){//writeObject方法就会将对象进行序列换,转换为二进制数据写入到ObjectOutputStream中,//由于ObjectOutputStream又关联了ByteArrayOutputStream,//于是最终就写入到了ByteArrayOutputStream中objectOutputStream.writeObject(object);}return byteArrayOutputStream.toByteArray();}}//反序列化: 将字节数组转化为对象public static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {Object object = null;try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes)) {try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){object = objectInputStream.readObject();}}return object;}
}

4.实现读写消息方法:

//向数据文件中发送消息//1.判断文件是否存在//2.将消息序列化//3.写入数据文件//4.更新统计文件数据//考虑线程安全问题://当多个线程同时发送消息时,可能会发送线程安全问题,这里以队列为维度加锁,因此要传入参数队列对象public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {//1.判断文件是否存在:boolean ok = checkFileExists(queue.getName());if(!ok){throw new MqException("[MessageFileManager] 数据文件不存在,发送消息到数据文件失败 queueName: "+queue.getName());}synchronized(queue){//2.将消息先进行序列化:byte[] messageByte = BinaryTool.toByte(message);//3.写消息到文件中:File file = new File(getDataFilePath(queue.getName()));//设置 消息在文件中的初始位置属性:message.setOffsetBeg(file.length()+4);message.setOffsetEnd(file.length()+4+messageByte.length);//这里打开文件要以追加的方式打开,不能清空文件中的内容try(FileOutputStream outputStream = new FileOutputStream(file,true);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){//先写前4个字节:记录消息的长度,//注意,这里规定的就是用4字节的长度,不能单使用write写入,这样还是只写入1个字节dataOutputStream.writeInt(messageByte.length);dataOutputStream.write(messageByte);}//更新统计文件数据Stat stat = readStat(queue.getName());stat.totalCount+=1;stat.validCount+=1;writeStat(queue.getName(),stat);}}private boolean checkFileExists(String queueName) throws MqException {File baseFile = new File(getQueueDir(queueName));if(!baseFile.exists()){System.out.println("[MessageFileManager] 队列对应的目录不存在 queueName:"+queueName);return false;}File dataFile = new File(getDataFilePath(queueName));if(!dataFile.exists()){System.out.println("[MessageFileManager] 队列对应的数据文件不存在 queueName:"+queueName);return false;}File statFile = new File(getStatFilePath(queueName));if(!statFile.exists()){System.out.println("[MessageFileManager] 队列对应的统计文件不存在 queueName:"+queueName);return false;}return true;}//将一条消息从文件中删除(逻辑删除,将isValid置为无效:0x0//1.先读取到消息//2.将消息的isValid设为无效//3.再重新写入到文件原来位置//4.修改配置文件数据//删除文件也存在线程安全问题,要以队列为维度进行上锁public void deleteMessageFromFile(MSGQueue queue,Message message){synchronized(queue){//要找消息的位置,采用随机读取文件方法,且以可读可写的方式打开文件try(RandomAccessFile randomAccessFile = new RandomAccessFile(getDataFilePath(queue.getName()),"rw")){int len = (int)(message.getOffsetEnd() - message.getOffsetBeg());byte[] data = new byte[len];//定位光标到消息的起始位置:randomAccessFile.seek(message.getOffsetBeg());//从消息的起始位置读取文件randomAccessFile.read(data);//将数据反序列化:Message newMessage = (Message) BinaryTool.fromBytes(data);//修改为无效消息newMessage.setIsVaild((byte)0x0);//再将消息写入到文件中://先进行序列化消息byte[] payload = BinaryTool.toByte(newMessage);//要写到原来位置,在读取的时候,光标会变化,要重新设置光标,randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(payload);//更新统计文件数据Stat stat = readStat(queue.getName());if(stat.validCount>0){stat.validCount-=1;}writeStat(queue.getName(),stat);} catch (IOException | ClassNotFoundException e) {throw new RuntimeException(e);}}}

5.加载文件中的所有有效消息;

用于服务器重启时恢复数据使用:

//获取文件中的所有有效数据,该方法用户服务器宕机重启时,恢复数据public List<Message> loadAllMessage(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try(FileInputStream fileInputStream = new FileInputStream(getDataFilePath(queueName));DataInputStream dataInputStream = new DataInputStream(fileInputStream)){long currentOffset = 0;while(true){int len = dataInputStream.readInt();//读取数据byte[] data = new byte[len];int n = dataInputStream.read(data);if(n!=len){throw new MqException("[MessageFileManager] 读取消息格式有误! queueName:"+queueName);}//进行反序列化Message message = (Message) BinaryTool.fromBytes(data);if(message.getIsVaild()==0x0){//为无效消息,直接跳过currentOffset+=(4+len);continue;}//设置消息的起始位置message.setOffsetBeg(currentOffset+4);message.setOffsetEnd(currentOffset+4+len);currentOffset+=(4+len);messages.add(message);}} catch (EOFException e){//这个异常是读取到文件末尾,抛出的异常,属于正常逻辑System.out.println("[MessageFileManager] 加载消息完成 !");}return messages;}

6.实现垃圾清除功能:GC:

//进行垃圾回收:当统计文件中记录的有效数据数量占总数据数量<50%时,进行垃圾回收//垃圾回收:新创建一个文件,将有效消息复制到新文件中,复制完后,将源文件删除,将新文件名改为源文件名public void GC(MSGQueue queue) throws IOException, MqException, ClassNotFoundException {//创建一个新的文件:File newFile = new File(getNewDataFilePath(queue.getName()));if(newFile.exists()){throw new MqException("[MessageFileManager] 上次GC未执行成功, queueName:"+queue.getName());}boolean ok = newFile.createNewFile();if(!ok){throw new MqException("[MessageFileManager] 创建新文件失败! queueName:"+queue.getName());}//将原来文件中的消息都取出来List<Message> messages = loadAllMessage(queue.getName());//将消息进行序列化,再存入新文件中try(FileOutputStream outputStream = new FileOutputStream(newFile,true);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){for(Message m:messages){byte[] payload = BinaryTool.toByte(m);dataOutputStream.writeInt(payload.length);dataOutputStream.write(payload);}}//删除旧文件String oldDataFilePath = getDataFilePath(queue.getName());File oldFile = new File(oldDataFilePath);ok = oldFile.delete();if(!ok){throw new MqException("[MessageFileManager] 旧文件删除失败! oldFilePath:"+oldFile.getAbsolutePath());}//更新新文件名:ok = newFile.renameTo(oldFile);if(!ok){throw new MqException("[MessageFileManager] 更新文件名失败 newFilePath"+newFile.getAbsolutePath());}//更新统计文件数据Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);}
//创建一个新的文件路径:private String getNewDataFilePath(String queueName) {return getQueueDir(queueName)+"newQueue_data.txt";}
//查看当前队列是否需要进行GC扫描public boolean checkGC(String queueName){Stat stat = readStat(queueName);int t1 = stat.validCount;int t2 = stat.totalCount;if(t2>=2000 && t1*1.0 /t2*1.0<0.5){return true;}return false;}

编写MessageFileManager测试用例:

@SpringBootTest
class MessageFileManagerTest {private MessageFileManager messageFileManager = new MessageFileManager();private static String queueName1 = "queueTest1";private static String queueName2 = "queueTest2";//创建2个文件@BeforeEachvoid setUp() throws IOException, MqException {messageFileManager.createQueueFile(queueName1);messageFileManager.createQueueFile(queueName2);System.out.println("2个文件创建成功!");}
//销毁文件@AfterEachvoid tearDown() throws IOException {messageFileManager.destoryQueueFile(queueName1);messageFileManager.destoryQueueFile(queueName2);System.out.println("2个文件销毁成功!");}@Testvoid createQueueFile() {//在setUp方法中已经调用了该方法了,这里仅进行结果测试对比了File dataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");File dataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");File statFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");File statFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");Assertions.assertTrue(dataFile1.exists());Assertions.assertTrue(dataFile2.exists());Assertions.assertTrue(statFile1.exists());Assertions.assertTrue(statFile2.exists());}@Testvoid readWriteStat() {MessageFileManager.Stat stat = new MessageFileManager.Stat();stat.totalCount = 100;stat.validCount = 50;//由于对统计文件的读写访问方法都是私有方法,无法直接进行调用//但又必须进行测试,可以通过反射的方法进行访问//注意:这里不能通过先修改督学stat方法的访问权限,进行测试,然后再将访问权限改过来,//因为对原代码测试后再修改后的测试就是无效的,不知道修改后的代码是否还会出现别的问题//                 参数说明:      要反射的方法的类对象    要反射的方法名      方法的参数ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);MessageFileManager.Stat s = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);Assertions.assertEquals(100,s.totalCount);Assertions.assertEquals(50,s.validCount);}private MSGQueue createQueue(){MSGQueue queue = new MSGQueue();queue.setName(queueName1);queue.setDurable(true);queue.setAutoDelete(false);return queue;}private Message createMessage(String body){Message message = new Message();return message.createMessageById("routingKeyTest",null,body.getBytes());}@Testvoid sendMessage() throws IOException, MqException, ClassNotFoundException {MSGQueue queue = createQueue();Message message = createMessage("hello");messageFileManager.sendMessage(queue,message);//测试stat文件MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);Assertions.assertEquals(1,stat.validCount);Assertions.assertEquals(1,stat.totalCount);//测试data文件List<Message> messages = messageFileManager.loadAllMessage(queueName1);Assertions.assertEquals(1,messages.size());Message m = messages.get(0);Assertions.assertEquals(message.getMessageId(),m.getMessageId());Assertions.assertEquals(message.getRoutingKey(),m.getRoutingKey());Assertions.assertEquals(message.getDeliveryMode(),m.getDeliveryMode());Assertions.assertArrayEquals(message.getBody(),m.getBody());System.out.println(m);}@Testvoid deleteMessageFromFile() throws IOException, MqException, ClassNotFoundException {MSGQueue queue = createQueue();List<Message> list = new LinkedList<>();//向队列中发送10条消息for(int i=0;i<5;i++){Message message = createMessage("hello" + i);messageFileManager.sendMessage(queue,message);list.add(message);}Assertions.assertEquals(5,list.size());//删除后3条messageFileManager.deleteMessageFromFile(queue,list.get(3));messageFileManager.deleteMessageFromFile(queue,list.get(4));messageFileManager.deleteMessageFromFile(queue,list.get(2));List<Message> messages = messageFileManager.loadAllMessage(queueName1);Assertions.assertEquals(2,messages.size());for(int i=0;i<2;i++){Message m = messages.get(i);Message m2 = list.get(i);Assertions.assertEquals(m2.getRoutingKey(),m.getRoutingKey());Assertions.assertEquals(m2.getDeliveryMode(),m.getDeliveryMode());Assertions.assertEquals(m2.getMessageId(),m.getMessageId());Assertions.assertArrayEquals(m2.getBody(),m.getBody());Assertions.assertEquals(m2.getIsVaild(),m.getIsVaild());}}@Testvoid loadAllMessage() throws IOException, MqException, ClassNotFoundException {MSGQueue queue = createQueue();List<Message> list = new LinkedList<>();//向队列中添加100条消息for(int i=0;i<100;i++){Message message = createMessage("hello"+i);messageFileManager.sendMessage(queue,message);list.add(message);}List<Message> messages = messageFileManager.loadAllMessage(queue.getName());Assertions.assertEquals(100,messages.size());for(int i=0;i<100;i++){Message m1 = messages.get(i);Message m2 = list.get(i);Assertions.assertEquals(m2.getRoutingKey(),m1.getRoutingKey());Assertions.assertEquals(m2.getDeliveryMode(),m1.getDeliveryMode());Assertions.assertEquals(m2.getMessageId(),m1.getMessageId());Assertions.assertArrayEquals(m2.getBody(),m1.getBody());Assertions.assertEquals(m2.getIsVaild(),m1.getIsVaild());}}@Testvoid GC() throws IOException, MqException, ClassNotFoundException {MSGQueue queue = createQueue();List<Message> list = new LinkedList<>();//向队列中发送10条消息for(int i=0;i<100;i++){Message message = createMessage("hello" + i);messageFileManager.sendMessage(queue,message);list.add(message);}File file = new File("./data/" + queue.getName() + "/queue_data.txt");System.out.println("GC前文件大小: "+file.length());Assertions.assertEquals(100,list.size());//删除偶数消息for(int i=0;i<100;i+=2){Message message = list.get(i);messageFileManager.deleteMessageFromFile(queue,message);}messageFileManager.GC(queue);System.out.println("GC后文件大小: "+file.length());List<Message> messages = messageFileManager.loadAllMessage(queueName1);for(int i=0;i<messages.size();i++){Message m1 = messages.get(i);Message m2 = list.get(2*i+1);Assertions.assertEquals(m2.getRoutingKey(),m1.getRoutingKey());Assertions.assertEquals(m2.getDeliveryMode(),m1.getDeliveryMode());Assertions.assertEquals(m2.getMessageId(),m1.getMessageId());Assertions.assertArrayEquals(m2.getBody(),m1.getBody());Assertions.assertEquals(m2.getIsVaild(),m1.getIsVaild());}}
}

十.整合数据库和文件数据

上面的代码中,使用数据库存储了Exchange,Queue,Binding,使用文件存储了Message,

下面对数据库和文件中的数据进行整合.进行统一管理.

见下一篇博客.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/95327.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/95327.shtml
英文地址,请注明出处:http://en.pswp.cn/web/95327.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何构建StarRocks官方文档

不知道是网络问题还是官网问题&#xff0c;StarRocks文档经常出现卡顿的情况&#xff0c;曾经构建过Flink文档&#xff0c; 所以也想尝试自己构建一个StarRocks的本地官方文档 断断续续折腾了好几天&#xff0c;就不废话了&#xff0c;直接上实际步骤 1. 环境 1.1 Linux环境 …

堡垒机(跳板机)入门指南:构建更安全的多服务器运维架构

随着你的业务不断扩张&#xff0c;你云上服务器的数量&#xff0c;是不是也从一台&#xff0c;变成了三台、五台、甚至一个由几十台机器组成的庞大集群&#xff1f;你像一个尽职的“国王”&#xff0c;为你王国的每一座“城池”&#xff08;每一台服务器&#xff09;&#xff0…

(链表)Leetcode206链表反转+Leetcode6删除链表的倒数第N个结点+虚拟头节点使用

虚拟头结点的作用是&#xff1a;简化插入/删除逻辑方便返回头节点减少边界错误 Leetcode206链表反转 206. 反转链表 - 力扣&#xff08;LeetCode&#xff09; 头插法 # Definition for singly-linked list. # class ListNode(object): # def __init__(self, val0, nextN…

自然语言处理NLP:嵌入层Embedding中input_dim的计算——Tokenizer文本分词和编码

1. 词汇表大小&#xff08;input_dim&#xff09;计算方法 嵌入层Embedding中的input_dim是根据数据中所有唯一词&#xff08;或字&#xff09;的总数来决定的。可以通过Tokenizer文本分词和编码得到。 简单说&#xff0c;Tokenizer 是一个文本分词和编码器&#xff0c;它主要做…

python中的分代垃圾回收机制的原理【python进阶二、2】

1. 分代设计思想Python 将对象按存活时间分为三代&#xff08;Generation 0, 1, 2&#xff09;&#xff1a;0代&#xff08;年轻代&#xff09;&#xff1a;新创建的对象。1代&#xff08;中年代&#xff09;&#xff1a;经历一次GC扫描后存活的对象。2代&#xff08;老年代&am…

【后端】云服务器用nginx配置域名访问前后端分离项目

云服务器有多个服务&#xff08;前端 3000 端口、后端 8288 端口&#xff0c;甚至还有别的服务&#xff09;。希望用户只输入 域名&#xff08;比如 https://example.com&#xff09;&#xff0c;而不是 example.com:3000、example.com:8288。本质上是要做 端口隐藏 域名统一入…

软考中级数据库系统工程师学习专篇(67、数据库恢复)

67、数据库恢复数据库故障恢复中基于检查点的事务分类与处理策略在数据库系统发生故障后的恢复过程中&#xff0c;​检查点&#xff08;Checkpoint&#xff09;​​ 技术是关键机制&#xff0c;它能有效缩小恢复范围&#xff0c;减少需要扫描的日志量&#xff0c;从而加速恢复进…

SpringBoot 分库分表 - 实现、配置与优化

分库分表&#xff08;Database Sharding&#xff09;是一种数据库架构优化技术&#xff0c;通过将数据分散到多个数据库或表中&#xff0c;以应对高并发、大数据量场景&#xff0c;提升系统性能和扩展性。 在 Spring Boot 中&#xff0c;分库分表可以通过框架支持&#xff08;如…

爬虫代理实操:选择可靠的HTTP(S)代理的方法

在爬虫工作里&#xff0c;选对代理协议&#xff08;HTTP/HTTPS&#xff09;只是第一步&#xff0c;更关键的是找到 “可靠” 的代理 —— 哪怕是 HTTPS 代理&#xff0c;若节点不稳定、IP 纯净度低&#xff0c;照样会频繁被封&#xff0c;反而耽误采集进度。这几年踩过不少坑&a…

数据库常见故障类型

数据库常见故障类型数据库系统运行过程中可能发生的故障主要分为以下三类&#xff0c;其破坏性由小到大&#xff1a;故障类型别名根本原因影响范围典型例子​1. 事务故障​逻辑故障事务内部的程序逻辑错误或输入异常。​单个或少量事务。- 输入数据不合法&#xff08;如除零错误…

【Android】Span富文本简介

一&#xff0c;概述android.text包下span体系类&#xff0c;主要指Spanned、Spannable、ParagraphStyle、CharacterStyle实现类。Android通过Span体系&#xff0c;搭建了富文本API&#xff0c;其中Spanned、Spannable实现了CharSequence接口&#xff0c;旨在映射段落start~end之…

【HTML】draggable 属性:解锁网页交互新维度

一、简介 在Web开发中&#xff0c;用户与内容的交互方式直接影响用户体验的深度。在 HTML 中&#xff0c;draggable 是一个全局属性&#xff0c;通过简单配置即可让任意元素实现拖拽功能。也可通过结合 draggable 属性和 JavaScript 事件&#xff0c;可以实现丰富的拖放交互功能…

如何在Github中创建仓库?如何将本地项目上传到GitHub中?

1.1 点击New repository&#xff08;这个是创建代码仓库的意思&#xff09;初次完成后只有一个文件最后&#xff1a;在本地git clone 项目地址然后把项目文件复制到git的文件夹内再提交到远程仓库git add . git commit -m "修改https"git push origin mainmain为分支…

【前端教程】HTML 基础界面开发

一、网站导航栏设计与实现 导航栏是网站的重要组成部分&#xff0c;负责引导用户浏览网站的各个板块。以下是一个实用的导航栏实现方案&#xff1a; 实现代码 HTML 结构&#xff1a; <!DOCTYPE html> <html> <head><meta charset"utf-8" /&…

【学Python自动化】 6. Python 模块系统学习笔记

一、模块基础 什么是模块&#xff1f;包含 Python 定义和语句的 .py 文件解决代码复用和组织问题每个模块有自己的命名空间创建模块示例# fibo.py - 斐波那契模块 def fib(n):"""打印小于n的斐波那契数列"""a, b 0, 1while a < n:print(a, e…

机器学习-时序预测2

门控循环单元GRU 接着机器学习-时序预测1-CSDN博客这个说&#xff0c;GRU是LSTM的一个简化而高效的变体&#xff0c;都使用“门控机制”来控制信息流&#xff0c;但它通过合并一些组件&#xff0c;使结构更简单、参数更少、计算更快&#xff0c;同时在许多任务上性能与 LSTM 相…

数据湖与数据仓库

大数据前沿技术详解 目录 数据湖技术湖仓一体架构数据网格实时流处理技术云原生数据技术数据治理与血缘AI原生数据平台边缘计算与大数据 核心内容包括&#xff1a; 数据湖技术 - 架构模式、技术栈、面临的挑战 湖仓一体架构 - Delta Lake、Iceberg、Hudi等主流实现 数据网格…

Python OpenCV图像处理与深度学习:Python OpenCV入门-图像处理基础

Python OpenCV入门实践&#xff1a;图像处理基础 学习目标 通过本课程&#xff0c;学员们将了解OpenCV的基本概念、安装方法&#xff0c;掌握如何使用Python和OpenCV进行基本的图像处理操作&#xff0c;包括图像的读取、显示、保存以及简单的图像变换。 相关知识点 Python Open…

【lua】Lua 入门教程:从环境搭建到基础编程

Lua 入门教程&#xff1a;从环境搭建到基础编程 Lua 是一种轻量级、可扩展的脚本语言&#xff0c;广泛应用于游戏开发&#xff08;如《魔兽世界》《Roblox》&#xff09;、嵌入式系统、Web 后端等领域。它语法简洁、运行高效&#xff0c;非常适合作为编程入门语言或辅助开发工…

MySQL索引事务(未完成)

索引的相关操作1.查看索引show index from 表名;2.创建索引create index 索引名字 on 表名(列名);创建索引&#xff0c;是一个危险操作创建索引的时候&#xff0c;需要针对现有的数据&#xff0c;进行大规模的重新整理如果当前表是一个空表&#xff0c;或者数据不多&#xff0c…