原子操作
- 原子操作原理
- 什么是原子操作?
- 原子性
- 原子变量相关接口
- 内存序
- shared_ptr的实现
原子操作原理
什么是原子操作?
原子操作其实就是指在多线程的环境下,确保对共享变量的操作不会被干扰,从而避免了竞态条件。
我们都知道,在多线程环境下,对于共享变量的操作是存在线程安全的问题的,假设当前存在一个全局变量 count ,此时创建多个线程对 count 进行 ++ 操作,就会诱发线程安全的问题,对于 count 这种临界资源来说,表面上我们看着只有一句代码,但是解释成汇编语句他就会存在多个操作:
多线程环境下就存在竞态条件,多个线程去竞争 count 这一个临界资源,对于单条汇编语句,他的操作必定是原子性的,但是在多条汇编语句下,随着线程的切换, count 变量的操作就充满了不确定性。
而原子变量就可以有效的解决掉这个问题,他保证了对于 count 这个变量的操作,比如说现在有两个线程,线程 A 对 count 变量操作的时候,他的操作是不可被打断的,只有当线程 A 对 count 变量操作完成以后,线程 B 才会去获取到这个资源,进行他的操作,这就保证了 count 操作的原子性。
原子性
理解原子性,我们首先的从 CPU 的存储体系架构下手,在当前这个多处理器多核心工作的架构下,必定会存在 CPU 资源竞争的发生,因为我们的线程就是在核心上运行的,竞态条件的发生就会意味着切换,就会有线程安全的情况发生。
首先我们来了解一下CPU 的存储体系架构,如下图所示:
因为在当前时代下,CPU 的读取速度时远高于主存的,就出现了缓存(cache)的概念,我们可以理解为他是 CPU 与主存之间的一种存储结构,他的读取速度高于主存略低于 CPU ,主要分为 3 级,分为 L1,L2,L3,前两级 L1,L2 是 CPU 核心所独有,L3 是所有 CPU 核心共享的。
原子性就是指在当前核心上的操作的中间状态并不被其他核心所识别到,就如下图所示:
而我们有如何做到这种状态呢?
如果是单处理器单核心的情况下,就比较简单,因为就算是创建多个线程,每次也只会允许一个线程进行运转,主要我们屏蔽掉中断机制,使用底层自旋锁,我们就可以去做到这种原子性的操作,但是注意,这只是在单处理器单核心下,当前开发环境大多数都是在多处理器多核心的情况下。
在多处理器多核心的场景下,就会存在 CPU 资源被争夺的情况产生,要保证原子性,就需要保证其他的核心不去操作当前核心操作的内存空间,对于L1,L2缓存来说,当前核心独有,我们并不需要做过多的操作,但是对于 L3 缓存来说,是所有核心所共有的,在我们当前的工作模式下,有一种总线嗅探机制,就是最终的数据会通过总线传递给其他的核心。
以往 0x86 是通过锁总线的方式,避免所有内存的访问,但是这就会造成一个问题,就是效率低下的问题,只有当前核心可以工作,其他核心都被阻拦,明显效率就变低了。现在采用的是阻止其他核心对相关的内存空间访问。
CPU 如何读写数据
首先我们来看写策略,对于写策略,通常会存在以下两种处理方式:
- 直写:这种策略其实很简单,当 CPU 需要写入数据时,此时就会将数据同时都写入到 cache 和主存当中,也就意味着其他核心每一次读取到的数据都是最新的,就不会出现不同步的问题。
- 写回:当 CPU 需要写入数据时,此时并不是将数据立即就写入到主存当中,而是先写入缓存当中,缓存当中是以 cache line 去保存数据的,其中有一个标记位,我们写入数据以后,如果发生缓存命中,此时就可以直接写,如果没有命中,就会去缓存中定位一个数据块,将数据进行写入,同时标记标志位为 dirty 脏数据。
- 当我们在缓存中定位数据块是会出现两种情况,一种是缓存中所有的数据块都已经被标记,没有空闲的,此时就会就会通过 LRU 策略(最久没有被使用)去淘汰掉一个块(刷回主存中),然后将当前数据进行写入,另一种就是存在空闲的块,此时就会将数据写入,然后标记为脏数据。
- 注意, LRU 策略下并不是将数据清除掉,而是回检查当前数据块标志位是否为 dirty ,如果是 dirty ,就会将其刷入主存当中,也就是说这种场景下,我们进行写操作,本身我们写入的是 i 的数据,但是当前我们 LRU 需要去替换掉的是 g 的 cache line,并且当前 cache line 的标记位本身为 dirty,就会将 g 的数据先刷新到主存当中,然后在写入 i 的数据,这就是写回策略。
接下来看读策略:
- 读策略相对来说更加容易理解,CPU 在进行数据读取时,首先会去缓存中读取,如果命中,就会直接读取到,如果没有命中,此时也是需要定位缓存块的,如果定位到非脏的缓存块,就直接从主存中将该数据刷入到缓存块当中,标记为脏数据;
- 如果未命中,同样的道理,如果当前 cache line 标记位为脏数据,会先将当前 cache line 的数据刷新会主存,然后从主存中读取我们需要的数据到当前缓存中,标记为非脏数据,因为我们当前并没有写操作发生。
缓存不一致问题
了解了 CPU 读写数据的操作以后,我们不难发现,在多核多处理器下就会出现缓存不一致的问题,试想一下,有一个多核心的 CPU,其中核心 A 和核心 B 都需要访问内存中的同一个数据 X 。一开始,数据 X 被加载到核心 A 和核心 B 各自的缓存中 。当核心 A 对缓存中的数据 X 进行修改时,此时核心 A 缓存中的数据 X 已经更新,但是有 CPU 写回策略的存在,核心 B 当中数据依然是旧的,此时核心 B 进行读操作,读取的就是旧的数据,那么就会出现缓存不一致的问题。
基于写回策略的影响,就出现了缓存一致性的问题,在设计的过程中为了避免这种问题,就出现了我们最开始所说的总线嗅探方案,每个 CPU 核心都与总线相连接,总线上一旦发生一些变化,就会被 CPU 嗅探到,当一个CPU核心对自己缓存中的数据进行写操作时,它会向总线发送一个写请求,这个请求包含了被修改数据的地址等信息 。
但是这种方式的问题就在于,写操作是以广播的方式发送出去的,每个核心都可以收到,有一些核心并不需要当前的数据,他也会收到,这样就会造成带宽的压力,所以,在这儿也进行了优化,确保我们当前的修改只有需要的核心会去访问到,不需要的完全可以不用去管,这样就减少了带宽的压力。
当然,在这种总线嗅探机制下,就会存在数据接收快慢的问题,因为数据的传输是存在顺序的,有的核心离得近,有的核心离得远,我们必须保证当前的数据修改被每一个需要核心所接收到,并且在这个过程中数据并不会被更改,在这块儿就会存在一个 lock 指令让其基于嗅探机制实现事务的串行化,保证每一个核心所接收到的都是最新的数据。
缓存一致性协议 MESI
MESI 协议是一个基于失效的缓存一致性协议,支持 write-back 写回缓存的常用协议。
主要原理:通过总线嗅探策略(将读写请求通过总线广播给所有核心,核心根据本地状态进行响应)。
MESI 协议存在四种状态:
- Modified(M):某数据已修改但是没有同步到内存中。如果其他核心要读该数据,需要将该数据从缓存同步到内存中,并将状态转为 ;
- Exclusive(E):某数据只在该核心当中,此时缓存和内存中的数据一致;
- Shared(S):某数据在多个核心中,此时缓存和内存中的数据一致;
- Invaliddate(I):某数据在该核心中以失效,不是最新数据。
通过以上的知识介绍,我们就可以理解原子性以及原子变量的原理有一个初步的了解,接下来我们来看一下关于原子变量的一些接口调用:
原子变量相关接口
store(T desired, std::memory_order order)
:用于将指定的值存储到原子对象中;load(std::memory_order order)
:用于获取原子变量的当前值;exchange(std::atomic<T>* obj, T desired)
:访问和修改包含的值,将包含的值替换并返回它前面的值。如果替换成功,则返回原来的值。compare_exchange_weak(T& expected, T val, memory_order success, memory_order failure)
:比较一个值和一个期望值是否相等,如果相等则将该值替换成一个新值,并返回 true;否则不做任何操作并返回 false。注意,compare_exchange_weak
函数是一个弱化版本的原子操作函数,因为在某些平台上它可能会失败并重试。如果需要保证严格的原子性,则应该使用 compare_exchange_strong 函数。fetch_add,fetch_sub,fetch_and,fetch_or,fetch_xor
:这都是一些基于运算的操作。
这些函数仔细观察都会存在一个特点,我们会发现存在一个 mem_order ,这个代码的就是内存序,内存序定义了原子操作之间的可见性关系和顺序约束,直接影响程序的线程安全性。
内存序
我们平时的代码都点我们的逻辑顺序去写的,但是对于 CPU 和编译器来说,是会对代码进行优化的,存在指令重排,比说下面这段代码:
int main()
{int i = 0;int j = 1;i += 1;j += 2;i += 3;return 0;
}
对于 CPU 和编译器来说,他是可以不按照我们的代码逻辑来进行加载的,比如说上面对于 i 的操作就会加载在一起,然后先操作,然后再加载对 j 的操作,这也是 CPU 的局部性原理所决定的,这样的优化可以去提高程序的运行效率。
在多线程的环境下,CPU 的切换就会破坏逻辑,我们会进行加锁操作,而我们的加锁操作就是在控制内存序,让 CPU 和编译器不再去进行优化,这也就能解释为什么频繁的锁操作会影响程序的运行效率,因为这样并不会让 CPU 和编译器对程序进行优化。
内存序规定了多个线程访问同一个地址时的语义,他决定了某个线程对内存的操作何时能被其他线程所看见,某个线程对内存附近的访问可以做到怎样的优化。
C++ 标准定义了 6 中内存序:
memory_order_relaxed
memory_order_relaxed 所代表是松散内存序,只用来保证对原子对象的操作是原子的,在不需要保证顺序时使用,这也就意味着他只保证当前的操作是原子的,对于代码逻辑随便 CPU 和编译器怎么去优化都可以,我只要保证在我操作时其他线程不会操作就可以了,就如下图所示:
他就是值当前操作前面的代码是可以优化到后面去的,后面的代码也是可以优化到前面去的,但是当前不确定性就增加了,效率却是最高的。
#include <atomic>
#include <thread>
#include <iostream>std::atomic<int> x{0};void thread_func1()
{for (int i = 0; i < 100000; ++i){x.store(i, std::memory_order_relaxed);}
}void thread_func2()
{for (int i = 0; i < 100000; ++i){x.store(-i, std::memory_order_relaxed);}
}int main()
{std::thread t1(thread_func1);std::thread t2(thread_func2);t1.join();t2.join();std::cout << "Final value of x = " << x.load(std::memory_order_relaxed) << std::endl;return 0;
}
这段代码我们就可以确定,结果要么就是 -9999 要么就是 9999 ,不会存在其他的值,只是线程的调度顺序而已,因为当前操作是具有原子性的。
memory_order_release
memory_order_release 代表的是一个释放操作,在写入某原子对象时,当前线程的任何前面的读写操作都不允许重排到这个操作的后面去,并且当前线程的所有内存写入都在对同一个原子对象进行获取的其他线程可见;通常与 memory_order_acquire 或memory_order_consume 配对使用。
也就是说当前操作前面的操作可以任意优化,后面的操作也可以优化到前面来,但是当前操作前面的操作是不可以优化到后面去的,而且当前操作完成以后,数据同时会被刷新到 cache 和主存当中去,其他线程所读取的就会是最新的数据。
memory_order_acquire
memory_order_acquire 是一种获得操作,在读取某原子对象时,当前线程的任何后面的读写操作都不允许重排到这个操作的前面去,并且其他线程在对同一个原子对象释放之前的所有内存写入都在当前线程可见。
也就是说当前操作前面的操作是可以优化到后面去的,但是当前操作后面的操作不可以优化到前面去,我们会先从主存当中去读取数据,读值缓存当中,我们每一次读取到的数据都是最新的,通常与 memory_order_release 一起使用。
#include <atomic>
#include <thread>
#include <assert.h>
#include <iostream>std::atomic<bool> x,y;
std::atomic<int> z;// 提升效率
void write_x_then_y()
{x.store(true,std::memory_order_relaxed); // 1 y.store(true,std::memory_order_release); // 2
}void read_y_then_x()
{while(!y.load(std::memory_order_acquire)); // 3 自旋,等待y被设置为trueif(x.load(std::memory_order_relaxed)) // 4++z; // 会不会一定等于 1
}int main()
{x=false;y=false;z=0;std::thread a(write_x_then_y);std::thread b(read_y_then_x);a.join();b.join();std::cout << z.load(std::memory_order_relaxed) << std::endl;return 0;
}
我们看上面这段代码,打印的结果一定是 1 ,原因就在于 3 操作必须等 2 操作完成以后才会结束,对于 3 操作来说,不会允许后面的操作被优化到她的前面,而对于 2 操作来说,不会允许它前面的操作被优化到后面去,也就是说完成 2 操作以后,1 操作必定被完成了,最终只可能是 1 。
memory_order_acq_rel
memory_order_acq_rel 是一个获得释放操作,一个读‐修改‐写操作同时具有获得语义和释放语义,即它前后的任何读写操作都不允许重排,并且其他线程在对同一个原子对象释放之前的所有内存写入都在当前线程可见,当前线程的所有内存写入都在对同一个原子对象进行获取的其他线程可见。
memory_order_seq_cst
顺序一致性语义,对于读操作相当于获得,对于写操作相当于释放,对于读‐修改‐写操作相当于获得释放,是所有原子操作的默认内存序,并且会对所有使用此模型的原子操作建立一个全局顺序,保证了多个原子变量的操作在所有线程里观察到的操作顺序相同,当然它是最慢的同步模型。
shared_ptr的实现
在我们的 C++11 中有很多地方会用到 shared_ptr , shared_ptr 很好的解决了我们内存泄漏的问题,它使用的是 RAII 的方法,又引入了引用计数的概念,支持多个对象去管理一个资源,引用计数控制着当前资源被多少个对象所持有,某个对象被销毁时引用计数就进行--
操作,当引用计数为 0 时,就意味着该资源需要被释放了。
shared_ptr 我们也是可以通过原子变量去进行实现的,在 shared_ptr 的内部存在对引用计数的 ++
和 --
操作,就必定需要保证线程安全的问题,就需要加锁,但是使用原子变量的话可以是更优选,相对于加锁来说,原子变量的效率更高。
我们要实现一个 shared_ptr ,就需要实现以下接口:
- 构造函数;
- 析构函数;
- 拷贝构造函数;
- 拷贝赋值运算符;
- 移动构造函数;
- 解引用箭头运算符;
- 引用计数、原始指针、重置指针。
#ifndef __SHARED_PTR__
#define __SHARED_PTR__#include <iostream>
#include <atomic>template <typename T>
class Shared_ptr
{
public:// 智能指针是支持构造一个空对象的Shared_ptr() : ptr_(nullptr), count_(nullptr) {}// 构造函数,explicit修饰,防止被直接赋值explicit Shared_ptr(T *ptr) : ptr_(ptr), count_(ptr ? new std::atomic<std::size_t>(1) : nullptr){}// 析构函数~Shared_ptr(){release();}// 拷贝构造函数Shared_ptr(const Shared_ptr<T> &other) : ptr_(other.ptr_), count_(other.count_){if (count_){count_->fetch_add(1);}}// 赋值运算符重载, 需要注意,防止自己给自己拷贝Shared_ptr<T> &operator=(const Shared_ptr<T> &other){if (this != &other){release();ptr_ = other.ptr_;count_ = other.count_;if (count_){count_->fetch_add(1);}}return this;}// 移动构造函数// 使用 noexcept 修饰,代表当前函数不会存在异常,编译器会生成高效代码Shared_ptr(const Shared_ptr<T> &&other) noexcept : ptr_(other.ptr_), count_(other.count_){other.ptr_ = nullptr;other.count_ = nullptr;}// 移动运算符重载Shared_ptr<T> &operator=(const Shared_ptr<T> &&other) noexcept{if (this != &other){release();ptr_ = other.ptr_;count_ = other.count_;other.ptr_ = nullptr;other.count_ = nullptr;}return this;}// 解引用T &operator*() const{return *ptr_;}// ->T *operator->() const{return ptr;}// 获取到引用计数size_t usecount() const{return count_ ? count_->load() : 0;}// 获取裸指针T *get() const{return ptr_;}// 重置函数void reset(T *ptr = nullptr){release();ptr_ = ptr;count_ = ptr ? new std::atomic<std::size_t>(1) : 0;}private:void release(){if (count_ && count_->fetch_sub(1) == 1){delete ptr_;delete count_;}}private:std::atomic<int> *count_; // 引用计数T *ptr_;
};#endif
关于智能指针的原理可以参考之前的一篇文章:C++11之智能指针,这儿只是换了另一种方式去进行实现,没有考虑定制删除器与弱引用的问题,同样,上面的操作也没有对内存序进行设置,默认就是 memory_order_seq_cst,接下来我们来进行一下优化。
代码中使用 fetch_add 的地方有两处,这两处位置使用 memory_order_relaxed 最为合适,因为我们只需要保证这个操作是原子性的即可,保证最终的值不会被影响,而 fetch_sub 使用 memory_order_acq_rel 最为合适,因为我们需要保证当前返回的是操作的原始值,我们必须保证所有操作不被重排,维持我们原始的代码逻辑,而且我们也需要保证我们的变量是在为 0 时才进行释放的。
所以最终代码优化如下:
#ifndef __SHARED_PTR__
#define __SHARED_PTR__#include <iostream>
#include <atomic>template <typename T>
class Shared_ptr
{
public:// 智能指针是支持构造一个空对象的Shared_ptr() : ptr_(nullptr), count_(nullptr) {}// 构造函数,explicit修饰,防止被直接赋值explicit Shared_ptr(T *ptr) : ptr_(ptr), count_(ptr ? new std::atomic<std::size_t>(1) : nullptr){}// 析构函数~Shared_ptr(){release();}// 拷贝构造函数Shared_ptr(const Shared_ptr<T> &other) : ptr_(other.ptr_), count_(other.count_){if (count_){count_->fetch_add(1, std::memory_order_relaxed);}}// 赋值运算符重载, 需要注意,防止自己给自己拷贝Shared_ptr<T> &operator=(const Shared_ptr<T> &other){if (this != &other){release();ptr_ = other.ptr_;count_ = other.count_;if (count_){count_->fetch_add(1, std::memory_order_relaxed);}}return *this;}// 移动构造函数// 使用 noexcept 修饰,代表当前函数不会存在异常,编译器会生成高效代码Shared_ptr<T>(const Shared_ptr<T> &&other) noexcept : ptr_(other.ptr_), count_(other.count_){other.ptr_ = nullptr;other.count_ = nullptr;}// 移动运算符重载Shared_ptr<T> &operator=(const Shared_ptr<T> &&other) noexcept{if (this != &other){release();ptr_ = other.ptr_;count_ = other.count_;other.ptr_ = nullptr;other.count_ = nullptr;}return *this;}// 解引用T &operator*() const{return *ptr_;}// ->T *operator->() const{return ptr_;}// 获取到引用计数size_t usecount() const{return count_ ? count_->load(std::memory_order_acquire) : 0;}// 获取裸指针T *get() const{return ptr_;}// 重置函数void reset(T *ptr = nullptr){release();ptr_ = ptr;count_ = ptr ? new std::atomic<std::size_t>(1) : 0;}private:void release(){if (count_ && count_->fetch_sub(1, std::memory_order_acq_rel) == 1){delete ptr_;delete count_;}}private:std::atomic<std::size_t> *count_; // 引用计数T *ptr_;
};#endif
测试是否是线程安全的:
#include <iostream>
#include "shared_ptr.h"
#include <thread>
#include <vector>
#include <chrono>
#include <memory>void test_shared_ptr_thread_safety() {Shared_ptr<int> ptr(new int(42));// 创建多个线程,每个线程都增加和减少引用计数const int num_threads = 10;std::vector<std::thread> threads;for (int i = 0; i < num_threads; ++i) {threads.emplace_back([&ptr]() {for (int j = 0; j < 1000; ++j) {Shared_ptr<int> local_ptr(ptr);// 短暂暂停,增加线程切换的可能性std::this_thread::sleep_for(std::chrono::milliseconds(1));}});}// 等待所有线程完成for (auto& thread : threads) {thread.join();}// 检查引用计数是否正确std::cout << "use_count: " << ptr.usecount() << std::endl;if (ptr.usecount() == 1) {std::cout << "Test passed: shared_ptr is thread-safe!" << std::endl;} else {std::cout << "Test failed: shared_ptr is not thread-safe!" << std::endl;}
}// 测试代码
int main() {Shared_ptr<int> ptr1(new int(10));std::cout << "ptr1 use_count: " << ptr1.usecount() << std::endl; // 1{Shared_ptr<int> ptr2 = ptr1;std::cout << "ptr1 use_count: " << ptr1.usecount() << std::endl; // 2std::cout << "ptr2 use_count: " << ptr2.usecount() << std::endl; // 2}std::cout << "ptr1 use_count: " << ptr1.usecount() << std::endl; // 1Shared_ptr<int> ptr3(new int(20));ptr1 = ptr3;std::cout << "ptr1 use_count: " << ptr1.usecount() << std::endl; // 2std::cout << "ptr3 use_count: " << ptr3.usecount() << std::endl; // 2ptr1.reset();std::cout << "ptr1 use_count: " << ptr1.usecount() << std::endl; // 0std::cout << "ptr3 use_count: " << ptr3.usecount() << std::endl; // 1test_shared_ptr_thread_safety();return 0;
}
最终结果如下: