CPP——并发编程(二)atomic

上一篇文章介绍了线程thread,这一次我们介绍atomic,也就是原子操作。

了解并发编程的都知道原子操作的重要性,因为有些步骤是不能打断的。比如下面两个函数,第一个函数老公赚了100元,如果这个家有1000块了,标识这个家为富翁(人穷志短呀!)。第二个函数老婆花钱,如果资产不到1000元,标识这个家为普通家庭。

1
2
3
4
5
6
7
8
9
10
11
12
/*enum rich, ordinary, and asset_value is a global variable*/
void income(int n)
{
asset_value += n;
if(asset_value>1000) identity = rich;
}

void outcome(int n)
{
asset_value -=n;
if(asset_value<1000) identity = ordinary;
}

下面情况是有可能发生的,老公挣钱的时候,老婆在花钱。这时候,就会并发执行income和outcome两个函数,如果income执行的一半,恰好asset_value满足了条件,但是还没执行变rich的操作,线程调度到了outcome,使得asset_value又变得不满足条件了。执行完了outcome,回到income,因为已经进行过判断了,所以会直接将identity设置为rich,这就导致了错误。类似的情况还有很多。

因此我们需要将income,outcome都设定为原子操作。

原子类型是封装了一个值的类型,该值的访问保证不会导致数据争用,并且可以用于同步不同线程之间的内存访问。对于atomic头文件包含了两个类:atomic和atomic_flag。

atomic_flag

atomic_flag是一种简单原子布尔类型,只支持两种操作,test_and_set和clear。它的构造函数如下:

1
2
atomic_flag() noexcept = default;
atomic_flag (const atomic_flag&T) = delete;

它只有默认构造函数,而且拷贝构造函数被禁用,也不会有移动构造。但是,atomic_flag却可以用一个常量ATOMIC_FLAG_INIT来指定。

test_and_set()函数检查并且设置flag的值,返回一个布尔值,如果flag是true,则返回true,如果flag为false,则设置flag为true,返回false。它的这两个操作都是原子操作。

clear()函数简单设置flag为false。

如何使用这两个函数?比如某个资源某一刻只能让一个线程访问,那么可以这样安排:使用flag来标识资源是否正在被利用,用test_and_set()来阻塞访问。利用资源的线程结束之后,使用clear释放资源。下面是一个官方的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// atomic_flag as a spinning lock
#include <iostream> // std::cout
#include <atomic> // std::atomic_flag
#include <thread> // std::thread
#include <vector> // std::vector
#include <sstream> // std::stringstream

std::atomic_flag lock_stream = ATOMIC_FLAG_INIT;
std::stringstream stream;

void append_number(int x) {
while (lock_stream.test_and_set()) {}
stream << "thread #" << x << '\n';
lock_stream.clear();
}

int main ()
{
std::vector<std::thread> threads;
for (int i=1; i<=10; ++i) threads.push_back(std::thread(append_number,i));
for (auto& th : threads) th.join();

std::cout << stream.str();
return 0;
}

观察上述代码,如果没有lock_stream,那么多个线程同时运行,可能无法正常输出“thread # x”这种情况,而可能非常混乱。使用atomic_flag就可以对append_number加锁,使得输出变得正常。下面是运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//right
thread #1
thread #2
thread #3
thread #4
thread #5
thread #6
thread #7
thread #8
thread #9
thread #10
//wrong
thread #1
threathread #3
thread #4
#2
thread #5
thread #6
thread #7
thread #8thread #9
thread #10

d

在错误的情况下会运行出来什么样的结果是未定义的。

atomic

atomic类相对于atomic_flag来说复杂很多。它是一个模板类,一个模板类型为T的原子对象中封装了一个T的值。它的作用和atomic_flag类似,保证对于T类型的该值的访问都是原子操作,而不会引起数据竞争。

C++11标准中,对于atomic模板类的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template < class T > struct atomic {
bool is_lock_free() const volatile;
bool is_lock_free() const;
void store(T, memory_order = memory_order_seq_cst) volatile;
void store(T, memory_order = memory_order_seq_cst);
T load(memory_order = memory_order_seq_cst) const volatile;
T load(memory_order = memory_order_seq_cst) const;
operator T() const volatile;
operator T() const;
T exchange(T, memory_order = memory_order_seq_cst) volatile;
T exchange(T, memory_order = memory_order_seq_cst);
bool compare_exchange_weak(T &, T, memory_order, memory_order) volatile;
bool compare_exchange_weak(T &, T, memory_order, memory_order);
bool compare_exchange_strong(T &, T, memory_order, memory_order) volatile;
bool compare_exchange_strong(T &, T, memory_order, memory_order);
bool compare_exchange_weak(T &, T, memory_order = memory_order_seq_cst) volatile;
bool compare_exchange_weak(T &, T, memory_order = memory_order_seq_cst);
bool compare_exchange_strong(T &, T, memory_order = memory_order_seq_cst) volatile;
bool compare_exchange_strong(T &, T, memory_order = memory_order_seq_cst);
atomic() = default;
constexpr atomic(T);
atomic(const atomic &) = delete;
atomic & operator=(const atomic &) = delete;
atomic & operator=(const atomic &) volatile = delete;
T operator=(T) volatile;
T operator=(T);
};

此外,atomic对于整型(integral)和指针类型有特殊实现。整型包含了:char, signed char, unsigned char, short, unsigned short, int, unsigned int, long, unsigned long, long long, unsigned long long, char16_t, char32_t, wchar_t。对于这些类型实现了下面几个成员函数:

1
2
3
4
5
6
7
8
atomic::fetch_add
atomic::fetch_sub
atomic::fetch_and
atomic::fetch_or
atomic::fetch_xor
atomic::operator++
atomic::operator--
operator (comp. assign.)

对于指针类型实现了下面的函数:

1
2
3
4
5
atomic::fetch_add
atomic::fetch_sub
atomic::operator++
atomic::operator--
operator (comp. assign.)

其中operator包含了很多其他如+=,-=等operator函数的实现,简化为operator(comp. assign.)(了解更多点击这个链接)。

atomic的构造函数包括默认构造和初始化,它的拷贝构造函数被删除了,也没有移动构造。
类型|函数
—-|—-
default (1)|atomic() noexcept = default;
initialization (2)|constexpr atomic (T val) noexcept;
copy [deleted] (3)|atomic (const atomic&) = delete;

既然拷贝构造函数被禁用了,那么对于赋值也是一样。因为赋值实际上与拷贝和移动构造差别不大,只不过不是构造函数。但是,atomic对于赋值操作operator=有别的定义:

1
2
T operator= (T val) noexcept;
T operator= (T val) volatile noexcept;

通过这个可以设定atomic对象封装的T的值,因此它是set value,而不是copy。

下面介绍几个atomic的一般成员函数。

is_lock_free

1
2
bool is_lock_free() const volatile noexcept;
bool is_lock_free() const noexcept;

判断该 std::atomic 对象是否具备 lock-free 的特性。如果某个对象满足 lock-free 特性,在多个线程访问该对象时不会导致线程阻塞。
store

1
2
void store (T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
void store (T val, memory_order sync = memory_order_seq_cst) noexcept;

修改被封装的值,std::atomic::store 函数将类型为T的参数val复制给原子对象所封装的值。另外参数sync指定内存序(Memory Order),可能的取值如下:

Memory Order 值 Memory Order 类型
memory_order_relaxed Relaxed
memory_order_release Release
memory_order_seq_cst Sequentially consistent

对于内存序稍后介绍。

load

1
2
T load (memory_order sync = memory_order_seq_cst) const volatile noexcept;
T load (memory_order sync = memory_order_seq_cst) const noexcept;

读取所封装的值,依然指定了内存序。

Memory Order 值 Memory Order 类型
memory_order_relaxed Relaxed
memory_order_consume Consume
memory_order_acquire Acquire
memory_order_seq_cst Sequentially consistent

operator T

1
2
operator T() const volatile noexcept;
operator T() const noexcept;

与load类似,读取封装的值。

exchange

1
2
T exchange (T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
T exchange (T val, memory_order sync = memory_order_seq_cst) noexcept;

读取并修改封装的值。将val赋值给封装的值,返回原值。内存序可能取值如下:

Memory Order 值 Memory Order 类型
memory_order_relaxed Relaxed
memory_order_consume Consume
memory_order_acquire Acquire
memory_order_release Release
memory_order_acq_rel Acquire/Release
memory_order_seq_cst Sequentially consistent

compare_exchange_weak

1
2
3
4
5
6
7
8
9
10
//(1)
bool compare_exchange_weak (T& expected, T val,
memory_order sync = memory_order_seq_cst) volatile noexcept;
bool compare_exchange_weak (T& expected, T val,
memory_order sync = memory_order_seq_cst) noexcept;
//(2)
bool compare_exchange_weak (T& expected, T val,
memory_order success, memory_order failure) volatile noexcept;
bool compare_exchange_weak (T& expected, T val,
memory_order success, memory_order failure) noexcept;

比较被封装的值与expected值是否相等,相等则返回true,不相等返回false,并且将expected的值改成被封装的旧值,而被封装的值变成expected的值。两种不同的类型主要是内存序的选择不同。第二种情况下,内存序的选择取决于比较结果,如果比较结果为true,使用success,如果比较结构为false,使用failure作为内存序。

与compare_exchange_strong不同, weak版本的compare-and-exchange操作允许(spuriously地)返回false(即原子对象所封装的值与参数expected的物理内容相同,但却仍然返回false),不过在某些需要循环操作的算法下这是可以接受的,并且在一些平台下compare_exchange_weak的性能更好 。如果compare_exchange_weak 的判断确实发生了伪失败(spurious failures)——即使原子对象所封装的值与参数expected的物理内容相同,但判断操作的结果却为false,compare_exchange_weak函数返回false,并且参数expected的值不会改变。

compare_exchange_strong

1
2
3
4
5
6
7
8
9
10
//(1)
bool compare_exchange_strong (T& expected, T val,
memory_order sync = memory_order_seq_cst) volatile noexcept;
bool compare_exchange_strong (T& expected, T val,
memory_order sync = memory_order_seq_cst) noexcept;
//(2)
bool compare_exchange_strong (T& expected, T val,
memory_order success, memory_order failure) volatile noexcept;
bool compare_exchange_strong (T& expected, T val,
memory_order success, memory_order failure) noexcept;

对于某些不需要采用循环操作的算法而言, 通常采用compare_exchange_strong更好。初次之外二者基本一致。

除了上面共有的成员函数,对于指针类型和整型会有自己的特化函数。简单介绍一下:

1
2
3
4
5
6
7
8
9
atomic::fetch_add //加法。对于整型参数类型为T,对于指针参数类型则不能再是指针了,而是指针移动的距离
atomic::fetch_sub //减法,同上
atomic::fetch_and //指针没有,按位与
atomic::fetch_or //指针没有,按位或
atomic::fetch_xor //指针没有,按位异或
atomic::operator++ //自增一,有两种:integral operator++(int);integral operator++();有参和无参,分别对应i++,++i
atomic::operator-- //自减一,同上
atomic::operator(comp. assign.) //包含了+=,-=,&=,|=,^=,分布对应了加,减,与,或,异或,除了没有选择内存序的作用,其余与上面各个对应的函数一致。
//指针没有与,或,异或

最后简单介绍一下memory_order,它是几个枚举量,用作执行原子操作的函数的参数,以指定如何同步不同线程上的其他操作。不同线程的执行顺序可以根据这几个枚举量来调整。对于执行顺序的要求,可能会降低并行的运行效率,但是在某些情况下却是必要的。

1
2
3
4
5
6
7
8
typedef enum memory_order {
memory_order_relaxed, // 不对执行顺序做保证
memory_order_acquire, // 本线程中,所有后续的读操作必须在本条原子操作完成后执行
memory_order_release, // 本线程中,所有之前的写操作完成后才能执行本条原子操作
memory_order_acq_rel, // 同时包含 memory_order_acquire 和 memory_order_release
memory_order_consume, // 本线程中,所有后续的有关本原子类型的操作,必须在本条原子操作完成之后执行
memory_order_seq_cst // 全部存取都按顺序执行
} memory_order;