目录
- 1. 环形队列的概念与实现方法
- 1.1 环形队列的概念
- 1.2 环形队列的一般实现方法
- 2. 多线程相关的信号量概念与接口
- 2.1 信号量类型
- 2.2 信号量的初始化与销毁
- 2.3 信号量的P申请、V释放操作
- 3. 基于环形队列实现p、c模型的设计方案
- 3.1 环形队列(ringqueue)作为共享资源缓冲区的设计要求
- 3.2 环形队列作为共享资源缓冲区的实现方法
- 4. 代码实现
- 4.1 线程类
- 4.2 线程执行的任务
- 4.3 RingQueue类
- 4.4 主干代码
1. 环形队列的概念与实现方法
1.1 环形队列的概念
- 环形队列:
容量固定,首尾相连的一个队列(遵循先进先出),于逻辑上形状为一个环形。当存储数据超过容量上限后,再次给环形队列插入元素时,不会产生越界的情况。新的数据会回绕到数据首部进行插入,将空间中的旧数据覆盖。
1.2 环形队列的一般实现方法
最常用于实现环形队列的数据结构为顺序表,其通过对下标的取模操作,就可以很好的实现存储空间逻辑上的环形要求,具体实现方案如下:
方案1:
-
开辟要求容量
cap
大小的顺序表 -
以变量
begin
记录队列中首个数据的下标,起始设置为0 -
以变量
end
记录队列中新数据插入位置的下标,起始设置为0
-
以变量
count
记录队列中存储数据的个数,每次插入数据后进行++
,每次删除数据后进行--
。队列为空与为满时,begin
与end
的位置相同,因此,需要以count
中记录的值来做区分 -
插入元素时,向
end
记录的下标位置写入数据,然后再++
-
删除元素时,只需让
begin
进行++
即可,begin
与end
下标之间的空间中存储着队列中的数据 -
begin
与end
每次++
后都要对其进行取模操作%= cap
方案2:
方案2的大体实现思路与方案1相同,只是于队列空满判定实现上有所区别。此种方法,会将队列额外多开辟出一块空间,队列容量为cap
,实际大小为cap + 1
。此块多出来的空间不用来存放数据,而是用来标识环形队列的满状态(end + 1) % (cap + 1) == begin
。
2. 多线程相关的信号量概念与接口
2.1 信号量类型
#include <semaphore.h>
//原生线程库中的内容,编译时带-lpthread选项
sem_t sem;//同样为计数器
2.2 信号量的初始化与销毁
信号量的初始化:
int sem_init(sem_t *sem, int pshared, unsigned int value);
- 参数1
sem_t*
: 传入需要初始化的信号量地址 - 参数2
int pshared
: 用于父子进程之间共享设置为1
,用于多线程之间共享设置为0
- 参数3
unsigned int value
: 用于设置信号量的大小 - 返回值
int
: 成功时,返回0;失败时,返回-1,并将错误码写入errno
信号量的销毁:
int sem_destroy(sem_t* sem);
- 参数
sem_t* sem
: 传入需要销毁的信号量的地址 - 返回值
int
: 成功时,返回0;失败时,返回-1,并将错误码写入errno
2.3 信号量的P申请、V释放操作
信号量的P操作:
int sem_wait(sem_t* sem);
- 参数
sem_t* sem
: 传入需要进行P操作的信号量地址 - 返回值
int
: 成功时,返回0;失败时,返回-1,并将错误码写入errno
信号量的V操作:
int sem_post(sem_t* sem);
- 参数
sem_t* sem
: 传入需要进行V操作的信号量地址 - 返回值
int
: 成功时,返回0;失败时,返回-1,并将错误码写入errno
信号量不仅仅其申请与释放操作都是互斥的,而且本身还同时具有计数的作用。因此,在功能上,其就相当于是互斥锁与条件变量的结合,但其又只需要一条操作语句就可以实现互斥锁与条件变量的配合效果。所以,在代码编写角度,信号量比互斥锁 + 条件变量
更简单也更简洁。
3. 基于环形队列实现p、c模型的设计方案
3.1 环形队列(ringqueue)作为共享资源缓冲区的设计要求
生产者生产数据存入队列,消费者消费数据从队列中获取,此处使用方案1中的环形队列设计方式。
- 生产者与消费者访问共享资源时,需要保持互斥且同步。
- 环形队列中有多块空间,只有队列为满或为空时,生产者与消费者才会访问到同一块空间,即访问共享资源。因此,除开队列为空为满的情况之外,其他场景中,消费者与生产者都可以并发地访问环形队列。
3.2 环形队列作为共享资源缓冲区的实现方法
- 对于生产者而言,其需要向队列中插入数据,所以,队列中空余的空间(room)是其所需的资源
- 对于消费者而言,其需要从队列中获取数据,所以,队列空间中存储的数据(data)是其所需的资源
定义两个信号量room_sem
与data_sem
,分别作为生产者资源与消费者资源的计数器,每次在生产者、消费者线程访问环形队列之前,都要预先申请对应的信号量资源。变量productor_index
记录生产者生产资源的放置位置,变量consumer_index
记录消费者从环形队列中获取数据的位置。生产者之间、消费者之间它们对于环形队列的访问需要保持互斥性,此处实现可以使用一把锁来实现,但这样生产者、消费者之间大多数情况下的并发就会被影响,因此,我们定义分别定义两把锁,来分别实现同类之间的互斥,彼此之间并发。
- 环形队列作为共享资源缓冲区的优势
队列中拥有多块空间,生产者与消费者线程大多数情况下都不会访问同一块资源,因此,可以保证它们之间的并发访问,提高程序的效率。
4. 代码实现
4.1 线程类
namespace ThreadModule
{template<typename T>using func_t = function<void(T&, string)>;template<typename T>class Thread{public:Thread(func_t<T> func, T& data, string name = "none-thread"):_func(func), _data(data), _name(name), _stop(true){}~Thread(){}void Execute(){_func(_data, _name);}static void* threadroutine(void* arg){Thread<T>* ptd = static_cast<Thread<T>*>(arg);ptd->Execute();return nullptr;}bool start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if(n){return false; }_stop = false;return true;}void join(){if(!_stop){pthread_join(_tid, nullptr);}}void detach(){if(!_stop){pthread_detach(_tid);}}string name(){return _name;}void stop(){_stop = true;}private:pthread_t _tid;string _name;func_t<T> _func;T& _data;bool _stop;};}
4.2 线程执行的任务
using task_t = function<void()>;void download()
{cout << "task is download" << endl;
}
4.3 RingQueue类
#ifndef RING_QUEUE
#define RING_QUEUE
#include <vector>
#include <semaphore.h>
#include <pthread.h>template<typename T>
class RingQueue
{
private:void P(sem_t* sem){sem_wait(sem);//wait申请}void V(sem_t* sem){sem_post(sem);//post单词译为放入,post操作为释放}void Lock(pthread_mutex_t* lock){pthread_mutex_lock(lock);}void Unlock(pthread_mutex_t* lock){pthread_mutex_unlock(lock);}public:RingQueue(int cap):_cap(cap), _ringbuffer(cap), _productor_index(0), _consumer_index(0){sem_init(&_room_sem, 0, _cap);//父子进程之间共享pshared: 1, 多线程之间共享pshared: 0sem_init(&_data_sem, 0, 0);pthread_mutex_init(&_productor_lock, nullptr);pthread_mutex_init(&_consumer_lock, nullptr);}void Enqueue(const T& data){P(&_room_sem);Lock(&_productor_lock);_ringbuffer[_productor_index++] = data;_productor_index %= _cap;Unlock(&_productor_lock);V(&_data_sem);}void Pop(T* data){P(&_data_sem);Lock(&_consumer_lock);*data = _ringbuffer[_consumer_index++];_consumer_index %= _cap;Unlock(&_consumer_lock);V(&_room_sem);}~RingQueue(){sem_destroy(&_room_sem);sem_destroy(&_data_sem);pthread_mutex_destroy(&_productor_lock);pthread_mutex_destroy(&_consumer_lock);}private:vector<T> _ringbuffer;int _cap;int _productor_index;int _consumer_index;sem_t _room_sem;sem_t _data_sem;pthread_mutex_t _productor_lock;pthread_mutex_t _consumer_lock;
};#endif
4.4 主干代码
using ringqueue_t = RingQueue<task_t>;void ProductorRun(ringqueue_t& rq, string name)
{while(true){rq.Enqueue(download);cout << name << " is producted" << endl;}
}void ConsumerRun(ringqueue_t& rq, string name)
{while(true){sleep(1);task_t task;rq.Pop(&task);cout << name << " get : "; task();//bad_function_call: 尝试调用没有目标的function对象时会出现此异常}
}void InitComm(vector<Thread<ringqueue_t>>& threads, ringqueue_t& rq, int num, func_t<ringqueue_t> func, string who)
{for(int i = 0; i < num; i++){string name = who + '-' + to_string(i + 1);threads.emplace_back(func, rq, name);}
}void InitProductor(vector<Thread<ringqueue_t>>& threads, ringqueue_t& rq, int num)
{InitComm(threads, rq, num, ProductorRun, "productor");
}void InitConsumer(vector<Thread<ringqueue_t>>& threads, ringqueue_t& rq, int num)
{InitComm(threads, rq, num, ConsumerRun, "consumer");
}void StartAll(vector<Thread<ringqueue_t>>& threads)//vector必须用引用,存储线程tid
{for(auto& thread : threads){thread.start();}
}void WaitAllThread(vector<Thread<ringqueue_t>> threads)//尝试用拷贝对象是否能够正常回收进程
{for(auto thread : threads){thread.join();}
}int main()
{ringqueue_t rq(5);vector<Thread<ringqueue_t>> threads;InitProductor(threads, rq, 3);InitConsumer(threads, rq, 2);StartAll(threads);WaitAllThread(threads);return 0;
}
此处设计时,将创建线程类与真正创建并启动线程分开执行。这是因为直接以threads.back().start()
创建后调用启动线程,其中threads.back()
迭代器可能会由于不断向threads
中插入新的线程类对象,导致发生扩容,最终使得原迭代器失效。