学习线程笔记记录。

原子操作

原子类型

1
2
3
std::atomic_flag // 一个简单的布尔标志,并且在这种类型上的操作都需要是无锁的,不提供is_lock_free()成员函数
std::atomic_flag = ATOMIC_FLAG_INIT // 清除标志
std::atomic<> 类型模板

原子操作

1
2
3
std::atomic<> 类模板不仅仅一套特化的类型,其作为一个原发模板也可以使用用户定义类型创建对应的原子变
量。因为,它是一个通用类模板,很多成员函数的操作在这种情况下有所限制:load(),store()(赋值和转换为用
户类型), exchange(), compare_exchange_weak()和compare_exchange_strong()。
操作 内存顺序设置
Store(写) memory_order_relaxed
memory_order_release
memory_order_seq_cst
Load(读) memory_order_relaxed
memory_order_consume
memory_order_acquire
memory_order_seq_cst
Read-modify-write(读-改-写) memory_order_relaxed
memory_order_consume
memory_order_acquire
memory_order_release
memory_order_acq_rel
memory_order_seq_cst

所有操作的默认顺序都是memory_order_seq_cst

原子实现

atomic_flag

1
std::atomic_flag 非常适合于作自旋互斥锁。初始化标志是“清除”,并且互斥量处于解锁状态。为了锁上互斥量,循环运行test_and_set()直到旧值为false,就意味着这个线程已经被设置为true了。解锁互斥量是一件很简单的事情,将标志清除即可。
atomic_spinlock_mutex.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include <future>
#include <functional>
#include <atomic>

using namespace std;

class SpinlockMutex {
public:
SpinlockMutex() :
flag_(ATOMIC_FLAG_INIT) {

}
virtual ~SpinlockMutex() = default;

void Lock() {
while(flag_.test_and_set(std::memory_order_acquire));
}

void Unlock() {
flag_.clear(std::memory_order_release);
}

private:
std::atomic_flag flag_;
};
main.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <iostream>
#include <thread>
#include <exception>
#include <atomic>

using namespace std;

int main(int argc, char const *argv[])
{
// Init a atomic val
std::atomic_flag f = ATOMIC_FLAG_INIT;
// Free atomic
f.clear(std::memory_order_release);
// Get the old atomic
bool x = f.test_and_set();

// Use atomic<> mode
std::atomic<bool> b;
// Read operation
bool x = b.load(std::memory_order_acquire);
b.store(true);
// Write operation
x = b.exchange(false, std::memory_order_acq_rel);

// Can't understand
bool expected=false;
while(!b.compare_exchange_weak(expected,true) && !expected);

return 0;
}

内存顺序

memory_order_seq_cst

排序一致队列:
默认序列命名为“排序一致”(sequentially cons),是因为它意味着,程序中的行为从任意角度去看,序列顺序
都保持一致。如果原子类型实例上的所有操作都是序列一致的,那么一个多线程程序的行为,就以某种特殊的排
序执行,好像单线程那样。这是目前来看,最容易理解的内存序列,这也就是将其设置为默认的原因:所有线程
都必须了解,不同的操作也遵守相同的顺序。因为其简单的行为,可以使用原子变量进行编写。通过不同的线
程,你可以写出所有序列上可能的操作,这样就可以消除那些不一致,以及验证你代码的行为是否与预期相
符。这也就意味着,所有操作都不能重排序;如果你的代码,在一个线程中,将一个操作放在另一个操作前
面,那么这个顺序就必须让其他所有的线程所了解。

memory_order_seq_cst.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include <iostream>
#include <future>
#include <functional>
#include <atomic>
#include <thread>
#include <assert.h>


using namespace std;

class OrderSeqCst {
public:
OrderSeqCst() = default;
virtual ~OrderSeqCst() = default;

void WriteX() {
// The order run.
x.store(true, std::memory_order_seq_cst);
}

void WriteY() {
// The order run.
y.store(true, std::memory_order_seq_cst);
}

void ReadXThenY() {
// The order run.
while (!x.load(std::memory_order_seq_cst));
if (y.load(std::memory_order_seq_cst)) {
++z;
}
std::cout << "Read X Then Y Function: "<< z.load() << std::endl;
}

void ReadYThenX() {
// The order run.
while (!y.load(std::memory_order_seq_cst));
if (y.load(std::memory_order_seq_cst)) {
++z;
}
std::cout << "Read Y Then X Function: "<< z.load() << std::endl;
}

void Run() {
// Init atomic val be false.
x = false;
y = false;

z = 0;

std::thread a(&OrderSeqCst::WriteX, this);
std::thread b(&OrderSeqCst::WriteY, this);
std::thread c(&OrderSeqCst::ReadXThenY, this);
std::thread d(&OrderSeqCst::ReadYThenX, this);

a.join();
b.join();
c.join();
d.join();

assert(z.load() != 0);
}

private:
std::atomic<bool> x,y;
std::atomic<int> z;

};

memory_order_relaxed

自由序列:
在原子类型上的操作以自由序列执行,没有任何同步关系。在同一线程中对于同一变量的操作还是服从先发执行
的关系,但是这里不同线程几乎不需要相对的顺序。唯一的要求是,在访问同一线程中的单个原子变量不能重排
序;当一个给定线程已经看到一个原子变量的特定值,线程随后的读操作就不会去检索变量较早的那个值。当使
用memory_order_relaxed,就不需要任何额外的同步,对于每个变量的修改顺序只是线程间共享的事情。

memory_order_relaxed.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <iostream>
#include <future>
#include <functional>
#include <atomic>
#include <thread>
#include <assert.h>

using namespace std;

class OrderRelaxed {
public:
OrderRelaxed() = default;
virtual ~OrderRelaxed() = default;

void WriteXThenY() {
// The order run.
x.store(true, std::memory_order_relaxed);
y.store(true, std::memory_order_relaxed);
}

void ReadYThenX() {
// The order run.
while (!y.load(std::memory_order_seq_cst));
if (x.load(std::memory_order_seq_cst)) {
++z;
}
std::cout << "Read Y Then X Function: "<< z.load() << std::endl;
}

void Run() {
// Init atomic val be false.
x = false;
y = false;

z = 0;

std::thread a(&OrderRelaxed::WriteXThenY, this);
std::thread b(&OrderRelaxed::ReadYThenX, this);

a.join();
b.join();

assert(z.load() != 0);
}

private:
std::atomic<bool> x,y;
std::atomic<int> z;
};

memory_order_acq_rel

获取-释放序列:
操作依旧没有统一的顺序,但是在这个序列引入了同
步。在这种序列模型中,原子加载就是“获取”(acquire)操作(memory_order_acquire),原子存储就是“释
放”操作(memory_order_release),原子读-改-写操作(例如fetch_add()或exchange())在这里,不是“获
取”,就是“释放”,或者两者兼有的操作(memory_order_acq_rel)。这里,同步在线程释放和获取间,是成对
的(pairwise)。释放操作与获取操作同步,这样就能读取已写入的值。这意味着不同线程看到的序列虽还是不
同,但这些序列都是受限的。

memory_order_acq_rel.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <iostream>
#include <future>
#include <functional>
#include <atomic>
#include <thread>
#include <assert.h>

using namespace std;

class OrderAcqRel {
public:
OrderAcqRel() = default;
virtual ~OrderAcqRel() = default;

void WriteXThenY() {
// The order run.
x.store(true, std::memory_order_release);
y.store(true, std::memory_order_release);
}

void ReadYThenX() {
// The order run.
while (!y.load(std::memory_order_acquire));
if (x.load(std::memory_order_relaxed)) {
++z;
}
std::cout << "Read Y Then X Function: "<< z.load() << std::endl;
}

void Run() {
// Init atomic val be false.
x = false;
y = false;

z = 0;

std::thread a(&OrderAcqRel::WriteXThenY, this);
std::thread b(&OrderAcqRel::ReadYThenX, this);

a.join();
b.join();

assert(z.load() != 0);
}

private:
std::atomic<bool> x,y;
std::atomic<int> z;
};
memory_order_acq_rel_sync.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#include <iostream>
#include <atomic>
#include <thread>
#include <assert.h>

using namespace std;


class SyncAcqRel {
public:
SyncAcqRel() :
sync1(false),
sync2(false),
sync(0) {

}
virtual ~SyncAcqRel() = default;

void Work1() {
// First run.
data[0].store(42, std::memory_order_relaxed);
data[1].store(97, std::memory_order_relaxed);
data[2].store(17, std::memory_order_relaxed);
data[3].store(-141, std::memory_order_relaxed);
data[4].store(2003, std::memory_order_relaxed);
// Set sync1 by atomic release mode.
sync1.store(true, std::memory_order_release);
}

void TmpWork1() {
// First run.
tmp_data[0].store(42, std::memory_order_relaxed);
tmp_data[1].store(97, std::memory_order_relaxed);
tmp_data[2].store(17, std::memory_order_relaxed);
tmp_data[3].store(-141, std::memory_order_relaxed);
tmp_data[4].store(2003, std::memory_order_relaxed);
// Set sync by atomic release mode.
sync.store(1, std::memory_order_release);
}

void Work2() {
// Second run.
// Wait the sync1 to run.
while (!sync1.load(std::memory_order_acquire));

// Set sync2 by atomic release mode.
sync2.store(true, std::memory_order_release);
}

void TmpWork2() {
// Second run.
// Wait the sync to run.
int expected = 1;
while (!sync.compare_exchange_strong(expected, 2, std::memory_order_acq_rel)) {
expected = 1;
}

}

void Work3() {
// Wait the sync2 to run.
while (!sync2.load(std::memory_order_acquire));

// Check the run order.
assert(data[0].load(std::memory_order_relaxed) == 42);
assert(data[1].load(std::memory_order_relaxed) == 97);
assert(data[2].load(std::memory_order_relaxed) == 17);
assert(data[3].load(std::memory_order_relaxed) == -141);
assert(data[4].load(std::memory_order_relaxed) == 2003);

std::cout << "Run1 Right Run Logic." << std::endl;
}

void TmpWork3() {
while (sync.load(std::memory_order_acquire) < 2)

// Check the run order.
assert(tmp_data[0].load(std::memory_order_relaxed) == 42);
assert(tmp_data[1].load(std::memory_order_relaxed) == 97);
assert(tmp_data[2].load(std::memory_order_relaxed) == 17);
assert(tmp_data[3].load(std::memory_order_relaxed) == -141);
assert(tmp_data[4].load(std::memory_order_relaxed) == 2003);

std::cout << "Run2 Right Run Logic." << std::endl;
}

void Run1() {
std::thread a(&SyncAcqRel::Work1, this);
std::thread b(&SyncAcqRel::Work2, this);
std::thread c(&SyncAcqRel::Work3, this);

a.join();
b.join();
c.join();
}

void Run2() {
std::thread a(&SyncAcqRel::TmpWork1, this);
std::thread b(&SyncAcqRel::TmpWork2, this);
std::thread c(&SyncAcqRel::TmpWork3, this);

a.join();
b.join();
c.join();
}

private:
std::atomic<int> data[5];
std::atomic<int> tmp_data[5];
std::atomic<bool> sync1;
std::atomic<bool> sync2;
std::atomic<int> sync;
};

memory_order_consume

获取-释放序列和memory_order_consume的数据相关性:
为memory_order_consume很特别:它完全依赖于数据,两种新关系用来处理数据依赖:前序依赖(dependency-ordered-before)和携带依赖(carries-a-dependency-to)。就像前列(sequenced-before),携带依赖对于数据依赖的操作,严格应用于一个独立线程和其基本模
型;如果A操作结果要使用操作B的操作数,而后A将携带依赖与B。如果A操作的结果是一个标量,比如int,而后
的关系仍然适用于,当A的结果存储在一个变量中,并且这个变量需要被其他操作使用。这个操作是也是可以传递
的,所以当A携带依赖B,并且B携带依赖C,就额可以得出A携带依赖C的关系。

memory_order_consume.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <iostream>
#include <atomic>
#include <thread>
#include <assert.h>

using namespace std;

struct X {
int i;
std::string s;
};

class OrderConsume {
public:
OrderConsume() = default;
virtual ~OrderConsume() = default;
void CreateX() {
X* x = new X;
x->i = 42;
x->s = "hello";
a.store(99, std::memory_order_relaxed);
p.store(x, std::memory_order_release);
}

void UseX() {
X* x;
while (!(x = p.load(std::memory_order_consume))) {
std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 2s.
}
assert(x->i == 42);
assert(x->s == "hello");
assert(a.load(std::memory_order_relaxed) == 99);
}

void Run() {
std::thread create_x_work(&OrderConsume::CreateX, this);
std::thread use_x_work(&OrderConsume::UseX, this);

create_x_work.join();
use_x_work.join();

std::cout << "No Errnor Work." << std::endl;
}

private:
std::atomic<X*> p;
std::atomic<int> a;
};

fetch_sub

释放队列与同步:
当存储操作被标记为memory_order_release,memory_or
der_acq_rel或memory_order_seq_cst,加载被标记为memory_order_consum,memory_order_acquire或memory_or
der_sqy_cst,并且操作链上的每一加载操作都会读取之前操作写入的值,因此链上的操作构成了一个释放序列(r
elease sequence),并且初始化存储同步(对应memory_order_acquire或memory_order_seq_cst)或是前序依赖(对
应memory_order_consume)的最终加载。操作链上的任何原子“读-改-写”操作可以拥有任意个存储序列(甚至是m
emory_order_relaxed)。

memory_order_fetch_sub.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include <iostream>
#include <future>
#include <functional>
#include <atomic>
#include <thread>
#include <assert.h>
#include <vector>

using namespace std;

class OrderFetchSub {
public:
OrderFetchSub() = default;
virtual ~OrderFetchSub() = default;
void WaitForMoreItems() {

}
void Process(int i) {

}

void PopulateQueue() {
unsigned const number_of_items = 20;
queue_data.clear();
for (unsigned i = 0; i < number_of_items; ++i) {
queue_data.push_back(i);
}
// Init store.
count.store(number_of_items, std::memory_order_release);
}

void ConsumeQueueItems() {
while(true) {
int item_index;
// 一个“读-改-写”操作
if((item_index = count.fetch_sub(1, std::memory_order_acquire)) <= 0) {
// 等待更多元素
// WaitForMoreItems();
continue;
}
// 安全读取queue_data
// Process(queue_data[item_index - 1]);
}
}

void Run() {
std::thread a(&OrderFetchSub::PopulateQueue, this);
std::thread b(&OrderFetchSub::ConsumeQueueItems, this);
std::thread c(&OrderFetchSub::ConsumeQueueItems, this);

a.join();
b.join();
c.join();

std::cout << "End Run" << std::endl;
}

private:
std::vector<int> queue_data;
std::atomic<int> count;
};

栅栏

栅栏操作会对内存序列进行约束,使其无法对任何数据进行修改,典型的做法是与使用memory_order_relaxed约束序的原子操作一起使用。栅栏属于全局操作,执行栅栏操作可以影响到在线程中的其他原子操作。因为这类操作就像画了一条任何代码都无法跨越的线一样,所以栅栏操作通常也被称为“内存栅栏”(memory barriers)。

原子操作栅栏

atomic_fence.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <iostream>
#include <atomic>
#include <assert.h>
#include <thread>

using namespace std;

class AtomicFence {
public:
AtomicFence() = default;
virtual ~AtomicFence() = default;

void WriteXThenY() {
x.store(true, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_release);
y.store(true, std::memory_order_relaxed);
}

void ReadYThenX() {
while(!y.load(std::memory_order_relaxed));
std::atomic_thread_fence(std::memory_order_acquire);
if (x.load(std::memory_order_relaxed)) {
z++;
}
}

void Run() {
x = false;
y = false;
z = 0;
std::thread a(&AtomicFence::WriteXThenY, this);
std::thread b(&AtomicFence::ReadYThenX, this);
a.join();
b.join();
assert(z.load()!=0);

std::cout << "No Error Run" << std::endl;
}

private:
std::atomic<bool> x, y;
std::atomic<int> z;
};

非原子操作栅栏

bool_atomic_fence.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <iostream>
#include <atomic>
#include <thread>
#include <assert.h>

using namespace std;

class BoolAtomicFence {
public:
BoolAtomicFence() = default;
virtual ~BoolAtomicFence() = default;

void WriteXThenY() {
x = true; // 1 在栅栏前存储x
std::atomic_thread_fence(std::memory_order_release);
y.store(true, std::memory_order_relaxed); // 2 在栅栏后存储y
}

void ReadYThenX() {
while(!y.load(std::memory_order_relaxed)); // 3 在#2写入前,持续等待
std::atomic_thread_fence(std::memory_order_acquire);
if(x) { // 4 这里读取到的值,是#1中写入
++z;
}
}

void Run() {
x = false;
y = false;
z = 0;
std::thread a(&BoolAtomicFence::WriteXThenY, this);
std::thread b(&BoolAtomicFence::ReadYThenX, this);
a.join();
b.join();
assert(z.load() != 0); // 5 断言将不会触发

std::cout << "No Error Run" << std::endl;
}

private:
bool x;
std::atomic<bool> y;
std::atomic<int> z;

};

并发数据设计建议

确保访问是安全的

确保无线程能够看到,数据结构的“不变量”破坏时的状态。
小心那些会引起条件竞争的接口,提供完整操作的函数,而非操作步骤。
注意数据结构的行为是否会产生异常,从而确保“不变量”的状态稳定。
将死锁的概率降到最低。使用数据结构时,需要限制锁的范围,且避免嵌套锁的存在。

能真正的并发访问

锁的范围中的操作,是否允许在所外执行?
数据结构中不同的区域是否能被不同的互斥量所保护?
所有操作都需要同级互斥量保护吗?
能否对数据结构进行简单的修改,以增加并发访问的概率,且不影响操作语义?