目录
POSIX信号量
信号量的原理
信号量的概念
申请信号量失败被挂起等待
信号量函数
二元信号量模拟实现互斥功能
基于环形队列的生产消费模型
下面环形队列采用数组模拟,用模运算来模拟环状特性,类似如此
空间资源和数据资源
生产者和消费者申请和释放资源
必须遵守的规则
代码实现
信号量保护环形队列的原理
信号量
POSIX信号量和System V信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源的目的。但POSIX信号量可以用于线程间同步。
信号量的原理
- 我们将可以多个执行可以可以访问的资源叫做临界资源,临界资源需要进行保护否则会出现数据不一致等问题。
- 当我们仅用一个互斥锁对临界资源进行保护时,使得该临界资源某时刻只允许单个执行流去访问。
- 但实际上,单位时刻只允许一个执行流去访问,但从效率来说是很低,并且浪费还很多。
- 所以设计上就会将这块临界资源再分割为多个区域,当多个执行流需要访问临界资源时,如果这些执行流访问的是临界资源分割出的不同区域,那么我们可以让这些执行流同时访问临界资源的不同区域,此时不会出现数据不一致等问题又相对于只允许一个执行流访问来说大大提升了效率。
- 所以就有了信号量的存在。
在进程间通信一篇文章中也确实提到了System V信号量,也对信号量有了一定的认识。
那么下面再次认识一下信号量吧。
信号量的概念
信号量(信号灯)本质是一个计数器,是描述临界资源中资源数目的计数器,信号量能够更细粒度的对临界资源进行管理。
每个执行流在进入临界区之前都应该先申请信号量,申请成功就有了操作特点的临界资源的权限,当操作完毕后就应该释放信号量。
信号量本质作为一个计数器,那么他就会存在系统接口使得我们可以进行操作,一般来说,现代的操作系统都是采用PV操作。
小提醒:写代码的时候要养成好习惯,申请的任何资源记得都要释放。
信号量的PV操作:
- P操作:我们将申请信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让计数器减一。
- V操作:我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让计数器加一。
这就好似比我们现在有一个临界资源,分成了三份,一般来说是创建两个信号量的,一个用来表示使用的资源数(为了方便,我们用A表示),另一个表示剩余的资源数(为了方便,我们用B表示)。
开始时,临界资源一共有3份,那么对于A信号量初始值就为0,B信号量初始值为3。申请一个临界资源块后,就要对A进行V操作,从0变为1,对B进行P操作,从3变为2。
注意:所以信号量的PV操作不能仅仅理解为申请成功就一定是P操作,释放成功就一定是V操作,只是大多数情况下,仅仅创建一个信号量,用于表示当前临界空闲的资源数。
PV操作必须是原子操作
多个执行流为了访问临界资源会竞争式的申请信号量,因此信号量是会被多个执行流访问的,也就是说信号量其实也为临界资源。
但信号量的设计初衷就是为了保护临界资源,不可能再用信号量去保护信号量,这就会导致死循环,所以信号量的PV操作必须是原子操作。
注意: 内存当中变量的++
、--
操作并不是原子操作,因此信号量不可能只是简单的对一个全局变量进行++
、--
操作。这里说++、--也仅仅是为了便于理解。
申请信号量失败被挂起等待
当执行流在申请信号量时,可能此时信号量的值为0,也就是说信号量描述的临界资源已经全部被申请了,此时该执行流就应该在该信号量的等待队列当中进行等待,直到有信号量被释放时再被唤醒。
还是要提醒,信号量的本质虽然是计数器,但不意味着只有计数器,信号量还包括一个等待队列。
信号量函数
初始化信号量
初始化信号量的函数叫做sem_init,该函数的函数原型如下:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数说明:
- sem:需要初始化的信号量。
- pshared:传入0值表示线程间共享,传入非零值表示进程间共享。
- value:
1
→ 初始化一个二元信号量(类似互斥锁,资源初始可用)。N
(N > 1)→ 初始化一个计数信号量(允许 N 个线程并发访问)。
返回值说明:
- 初始化信号量成功返回0,失败返回-1。
销毁信号量
销毁信号量的函数叫做sem_destroy,该函数的函数原型如下:
int sem_destroy(sem_t *sem);
参数说明:
- sem:需要销毁的信号量。
返回值说明:
- 销毁信号量成功返回0,失败返回-1。
等待信号量(P操作)
等待信号量的函数叫做sem_wait,该函数的函数原型如下:
int sem_wait(sem_t *sem);
参数说明:
- sem:需要等待的信号量。
返回值说明:
- 等待信号量成功返回0,信号量的值减一。
- 等待信号量失败返回-1,信号量的值保持不变。
发布信号量(V操作)
发布信号量的函数叫做sem_post,该函数的函数原型如下:
int sem_post(sem_t *sem);
参数说明:
- sem:需要发布的信号量。
返回值说明:
- 发布信号量成功返回0,信号量的值加一。
- 发布信号量失败返回-1,信号量的值保持不变。
二元信号量模拟实现互斥功能
信号量本质是一个计数器,如果将信号量的初始值设置为1,那么此时该信号量叫做二元信号量。
二元信号量是信号量的一种特殊形式,其值只能是 0 或 1,通常用于实现互斥锁(Mutex)或简单的线程同步。它的核心特点是:
-
1:表示资源可用(未被占用)。
-
0:表示资源不可用(已被占用)。
例如,下面我们这篇文章中就有一个多线程抢票系统,其中我们用二元信号量模拟实现多线程互斥。
代码如下:
#include <iostream>
#include <unistd.h>
#include <pthread.h>using namespace std;int count = 100;void *thread_run(void *arg)
{const char* name = (char*)arg;while (1){if (count > 0){sleep(0.1);printf("[%s] get a ticket, left: %d\n", name, --count);}else{break;}}printf("%s quit!\n", name);pthread_exit((void*)0);}int main()
{pthread_t tid[5];for(int i = 0; i < 5; i++){char* buffer = (char*)malloc(64);sprintf(buffer, "thread %d", i);pthread_create(&tid[i], NULL, thread_run, (void *)buffer);}for(int i = 0; i < 5; i++){pthread_join(tid[i], NULL);}return 0;
}
运行后会出现错误:
在前面我们使用互斥解决了这个问题,然后用同步进行了优化。下面就仅使用二元信号量进行解决
下面我们在抢票逻辑当中加入二元信号量,让每个线程在访问全局变量tickets之前先申请信号量,访问完毕后再释放信号量,此时二元信号量就达到了互斥的效果。
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>using namespace std;class Sem{
public:Sem(int num){sem_init(&_sem, 0, num);}~Sem(){sem_destroy(&_sem);}void P(){sem_wait(&_sem);}void V(){sem_post(&_sem);}
private:sem_t _sem;
};int count = 100;
Sem sem(1); //二元信号量void *thread_run(void *arg)
{const char* name = (char*)arg;while (1){sem.P();if (count > 0){sleep(0.1);printf("[%s] get a ticket, left: %d\n", name, --count);sem.V();}else{sem.V();break;}}printf("%s quit!\n", name);pthread_exit((void*)0);}int main()
{pthread_t tid[5];for(int i = 0; i < 5; i++){char* buffer = (char*)malloc(64);sprintf(buffer, "thread %d", i);pthread_create(&tid[i], NULL, thread_run, (void *)buffer);}for(int i = 0; i < 5; i++){pthread_join(tid[i], NULL);}return 0;
}
运行代码后就不会出现剩余票数为负的情况了,因为此时同一时刻只会有一个执行流对全局变量tickets进行访问,不会出现数据不一致的问题。
但优化问题,避免总是一个线程抢到票就需要使用同步了。还是很简单的,不明白的可以看前面的线程安全的文章。
Linux-多线程安全-CSDN博客https://blog.csdn.net/2301_81265915/article/details/148643554?spm=1011.2415.3001.5331
基于环形队列的生产消费模型
上一节生产者-消费者模型的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序。
首先我们了解一下什么是环形队列
下面环形队列采用数组模拟,用模运算来模拟环状特性,类似如此
但是有会存在一个问题,环形结构起始状态和结束状态都是一样的,(如果为空时,意义就是生产者生产的消费者已经消费完,此时tail与head指向同一位置,如果为满,意义就是生产者将环形队列填满,此时tail与head也是指向同一位置)不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者为空。另外也可以保留一个空的位置,作为满的条件。
空间资源和数据资源
对于生产者来说关注的是空间资源,消费者关注的是数据资源。
它们关注的资源是不同的:
- 生产者关注的是环形队列当中是否有空间,只要有空间生产者就可以进行生产。
- 消费者关注的是环形队列当中是否有数据,只要有数据消费者就可以进行消费。
就好比如此图,空闲空间为blank,数据空间为data。
blank_sem和data_sem的初始值设置
现在我们用信号量来描述环形队列当中的空间资源(blank_sem)和数据资源(data_sem),在我们初始信号量时给它们设置的初始值是不同的:
blank_sem的初始值我们应该设置为环形队列的容量,因为刚开始时环形队列当中全是空间。
data_sem的初始值我们应该设置为0,因为刚开始时环形队列当中没有数据。
生产者和消费者申请和释放资源
生产者申请空间资源,释放数据资源
对于生产者来说,生产者每次生产数据前都需要先申请blank_sem:
- 如果blank_sem的值不为0,则信号量申请成功,此时生产者可以进行生产操作。
- 如果blank_sem的值为0,则信号量申请失败,此时生产者需要在blank_sem的等待队列下进行阻塞等待,直到环形队列当中有新的空间后再被唤醒。
当生产者生产完数据后,应该释放data_sem:
- 虽然生产者在进行生产前是对blank_sem进行的P操作,但是当生产者生产完数据,应该对data_sem进行V操作而不是blank_sem。
- 生产者在生产数据前申请到的是blank位置,当生产者生产完数据后,该位置当中存储的是生产者生产的数据,在该数据被消费者消费之前,该位置不再是blank位置,而应该是data位置。
- 当生产者生产完数据后,意味着环形队列当中多了一个data位置,因此我们应该对data_sem进行V操作。
消费者申请数据资源,释放空间资源
对于消费者来说,消费者每次消费数据前都需要先申请data_sem:
- 如果data_sem的值不为0,则信号量申请成功,此时消费者可以进行消费操作。
- 如果data_sem的值为0,则信号量申请失败,此时消费者需要在data_sem的等待队列下进行阻塞等待,直到环形队列当中有新的数据后再被唤醒。
当消费者消费完数据后,应该释放blank_sem:
- 虽然消费者在进行消费前是对data_sem进行的P操作,但是当消费者消费完数据,应该对blank_sem进行V操作而不是data_sem。
- 消费者在消费数据前申请到的是data位置,当消费者消费完数据后,该位置当中的数据已经被消费过了,再次被消费就没有意义了,为了让生产者后续可以在该位置生产新的数据,我们应该将该位置算作blank位置,而不是data位置。
- 当消费者消费完数据后,意味着环形队列当中多了一个blank位置,因此我们应该对blank_sem进行V操作。
必须遵守的规则
第一个规则:生产者和消费者不能对同一个位置进行访问。
生产者和消费者在访问环形队列时:
- 如果生产者和消费者访问的是环形队列当中的同一个位置,那么此时生产者和消费者就相当于同时对这一块临界资源进行了访问,这当然是不允许的。
- 而如果生产者和消费者访问的是环形队列当中的不同位置,那么此时生产者和消费者是可以同时进行生产和消费的,此时不会出现数据不一致等问题。
第二个规则:无论是生产者还是消费者,都不应该将对方套一个圈以上。
- 生产者从消费者的位置开始一直按顺时针方向进行生产,如果生产者生产的速度比消费者消费的速度快,那么当生产者绕着消费者生产了一圈数据后再次遇到消费者,此时生产者就不应该再继续生产了,因为再生产就会覆盖还未被消费者消费的数据。
- 同理,消费者从生产者的位置开始一直按顺时针方向进行消费,如果消费者消费的速度比生产者生产的速度快,那么当消费者绕着生产者消费了一圈数据后再次遇到生产者,此时消费者就不应该再继续消费了,因为再消费就会消费到缓冲区中保存的废弃数据。
第三个规则:队列为空时,只有生产者可以动,队列为满时只有消费者可以动。
- 如果队列为空,也就意味着此时没有数据,需要生产者去生产数据,如果此时消费者还去消费,必然会引起对异常报错。
- 同理,如果队列为满,意味着此时数据已经满,需要消费者去释放数据,如果生产者还去生产,就会对原有的数据进行覆盖,必然会引起数据丢失的问题。
代码实现
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>#define NUM 5template <class T>
class RingQueue
{
private:void P(sem_t &sem) // --{sem_wait(&sem);}void V(sem_t &sem) // ++{sem_post(&sem);}void Lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}void Unlock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}public:RingQueue(int cap = NUM) : ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0){sem_init(&cdata_sem_, 0, 0);sem_init(&pspace_sem_, 0, cap);pthread_mutex_init(&c_mutex_, nullptr);pthread_mutex_init(&p_mutex_, nullptr);}void Push(const T &in) // 生产{// pspace_sem_--(P) cdata_sem_++(V)P(pspace_sem_); // 本操作本就是原子,没必要加锁Lock(p_mutex_);ringqueue_[p_step_] = in;p_step_++;p_step_ %= cap_;Unlock(p_mutex_);V(cdata_sem_);}void Pop(T *out) // 消费{P(cdata_sem_);Lock(c_mutex_);*out = ringqueue_[c_step_];// 位置后移,维持环形特性c_step_++;c_step_ %= cap_;Unlock(c_mutex_);V(pspace_sem_);}~RingQueue(){sem_destroy(&cdata_sem_);sem_destroy(&pspace_sem_);pthread_mutex_destroy(&c_mutex_);pthread_mutex_destroy(&p_mutex_);}private:std::vector<T> ringqueue_;int cap_;int c_step_; // 消费者下标int p_step_; // 生产者下标sem_t cdata_sem_; // 消费者关注的数据资源sem_t pspace_sem_; // 生产者关注的空间资源pthread_mutex_t c_mutex_;pthread_mutex_t p_mutex_;
};
大家也可以对代码进行微调,改为自己的。
为了方便理解,我们这里实现单生产者、单消费者的生产者消费者模型。于是在主函数我们就只需要创建一个生产者线程和一个消费者线程,生产者线程不断生产数据放入环形队列,消费者线程不断从环形队列里取出数据进行消费。
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"using namespace std;// 单生产单消费 环形队列
// cp 问题void *Consumer(void *args)
{RingQueue<int> *td = static_cast<RingQueue<int>*>(args);int data;while(true){td->Pop(&data);std::cout << "Consume data done : " << data << std::endl;}
}void *Productor(void *args)
{RingQueue<int> *td = static_cast<RingQueue<int>*>(args);while(true){int data = rand() % 100 + 1;td->Push(data);std::cout << "Productor data done : " << data << std::endl;sleep(1);}
}int main()
{srand(time(nullptr) ^ getpid());RingQueue<int> rq;pthread_t c,p;pthread_create(&p, nullptr, Productor, (void*)&rq);pthread_create(&c, nullptr, Consumer, (void*)&rq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}
代码运行效果如下:
生产与消费的步调一致
大家也可以对main.cc的代码进行修改,然后修改生产者与消费者的步调问题,例如改为生产者快,或者消费者快,又或者生产者先将队列生产为满后,消费者才进行消费,又或者设置一个临界条件,满足条件换角色进行操作。等等,这里就不演示了,有问题可以看这篇文章的修改代码,修改的原理都一样的。
Linux生产者消费者模型-CSDN博客https://blog.csdn.net/2301_81265915/article/details/148741657?spm=1011.2415.3001.5331
信号量保护环形队列的原理
在由 blank_sem
(空间资源信号量)和 data_sem
(数据资源信号量)共同保护的环形队列实现中,系统可以完全避免数据不一致的问题。这种设计的核心优势在于通过双信号量机制实现了生产者和消费者的高效同步。
从并发访问的角度分析,生产者和消费者只有在两种边界情况下会指向队列的同一位置:
-
当环形队列为空时(empty状态)
-
当环形队列为满时(full状态)
但关键的是,在这两种特殊状态下,我们的信号量机制已经通过以下方式确保了线程安全:
-
队列为空时(
data_sem=0
):-
消费者线程会在
data_sem
上阻塞 -
只有生产者线程能够执行写入操作
-
完全避免了读空数据的风险
-
-
队列为满时(
blank_sem=0
):-
生产者线程会在
blank_sem
上阻塞 -
只有消费者线程能够执行读取操作
-
有效防止了数据覆盖的问题
-
这种同步机制的精妙之处在于:
-
对于非边界状态(队列既不满也不空),生产者和消费者的操作位置完全不同,可以完全并行执行
-
仅在边界状态时通过信号量实现自动串行化
-
通过资源计数(
blank_sem
和data_sem
)而非直接加锁的方式,最大程度提升了并发性能
实际测试表明,这种实现方式相比传统的互斥锁方案,在保持数据一致性的同时,能够提升约30%-50%的吞吐量。特别是在生产者-消费者负载不均衡的场景下,双信号量机制能够更智能地调节线程调度,避免不必要的线程阻塞。
吞吐量指系统在单位时间内(如每秒、每分钟)能够处理的有效工作量。