Notes from learning about threads.

  Threads can reduce coupling between pieces of business logic and make asynchronous operations possible, but they also add extra overhead. These notes record my first steps in learning about threads.

Implementing threads in C++

  Threads in C++ use the std::thread library provided by C++11. There are two common needs: invoking a member function of the current class, and invoking a function of another class. The examples below show both.

  • Thread target defined in another class
execute.hpp
#ifndef EXECUTE_H
#define EXECUTE_H

#include <unistd.h>

#include <iostream>

using namespace std;

class Execute
{
public:
Execute() {}
virtual ~Execute() {}

void ExecuteRun()
{
while (1) {
std::cout << "ExecuteRun Start Sleep." << std::endl;
sleep(7);
}
}
};
#endif // EXECUTE_H
  • Creating threads inside a class member
service.cpp
#include <unistd.h>

#include <iostream>
#include <map>
#include <thread>

#include "execute.hpp"

using namespace std;

class Execute;

class Service
{
public:
Service() {}
virtual ~Service() {}

void Start()
{
// Create a thread that runs a member function of this class.
std::thread t1(&Service::ServiceRun, this);

// Create a thread that runs a member function of another class (a temporary Execute is copied into the thread).
std::thread t2(&Execute::ExecuteRun, Execute());

tm_["ServiceRun"] = t1.native_handle();
tm_["ExecuteRun"] = t2.native_handle();

// Detaches the thread represented by the object from the calling thread,
// allowing them to execute independently from each other.
// Both threads continue without blocking nor synchronizing in any way. Note
// that when either one ends execution, its resources are released.
// After a call to this function, the thread object becomes non-joinable and
// can be destroyed safely.
t1.detach();
t2.detach();
}

void Stop()
{
pthread_cancel(tm_["ServiceRun"]);
pthread_cancel(tm_["ExecuteRun"]);

// Removes all thread elements from the map.
tm_.clear();
}

void ServiceRun()
{
while (1) {
std::cout << "ServiceRun Start Sleep." << std::endl;
sleep(5);
}
}

private:
typedef std::map<std::string, pthread_t> ThreadMap;
ThreadMap tm_;
};

int main()
{
Service service;

service.Start();

while (1) {
sleep(2);
}

return 0;
}

Thread management

  • Starting / joining / detaching a thread

class X
{
public:
void do_lengthy_work(int);
};

X my_x;
int num(0);
std::thread t(&X::do_lengthy_work, &my_x, num);

t.join();   // wait for the thread to finish, or
t.detach(); // let it run on its own (only one of join/detach may be called on a given thread)

  • Spawning threads in batches

void WorkList() {
std::vector<std::thread> threads;
std::vector<pthread_t> free_pthread_vector;
for (unsigned int i = 0; i < 3; ++i) {
// Build a vector of threads.
threads.push_back(std::thread(&Work::DoWork, this, to_string(i)));
}
for (auto iter = threads.begin(); iter != threads.end(); ++iter) {
// Collect each thread's native handle.
free_pthread_vector.push_back(iter->native_handle());
}
// Detach every thread via mem_fn.
std::for_each(threads.begin(), threads.end(), mem_fn(&std::thread::detach));
}

  • Thread utilities

thread t1;

t1.join();   // wait for the thread to finish
t1.detach(); // detach the thread so it runs on its own

t1.joinable(); // whether the thread object still owns a thread (join/detach not yet called)

std::ref   // pass an argument to the thread function by reference
std::bind  // bind arguments to a callable for the thread
std::move  // transfer ownership of a thread object
std::thread::hardware_concurrency() // number of threads that can truly run concurrently (a hint)
std::this_thread::get_id()          // id of the current thread
std::hash<std::thread::id>          // hash for thread ids (e.g. for use as container keys)
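
  A minimal sketch (my own example, not from the notes above) showing how several of these utilities fit together: std::ref to pass an argument by reference, std::move to transfer thread ownership, hardware_concurrency(), get_id() and std::hash<std::thread::id>.

#include <functional>
#include <iostream>
#include <string>
#include <thread>
#include <unordered_map>

void accumulate(int count, int& out) {
    for (int i = 1; i <= count; ++i) out += i;
}

int main() {
    std::cout << "Hardware threads: " << std::thread::hardware_concurrency() << std::endl;

    int sum = 0;
    // std::ref passes sum by reference into the thread function.
    std::thread t(accumulate, 100, std::ref(sum));

    // std::thread is move-only; std::move transfers ownership of the running thread.
    std::thread owner = std::move(t);

    // Thread ids can be printed and hashed, e.g. used as keys in a map.
    std::unordered_map<std::thread::id, std::string, std::hash<std::thread::id>> names;
    names[owner.get_id()] = "accumulator";
    std::cout << "Main thread id: " << std::this_thread::get_id() << std::endl;

    owner.join();
    std::cout << "sum = " << sum << std::endl;
    return 0;
}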

Sharing data between threads

Race conditions

  • Mutexes (used in RAII style)

  • C++ lock types

Mutex classes (four kinds)
std::mutex, the most basic mutex.
std::recursive_mutex, a recursive mutex.
std::timed_mutex, a mutex that supports locking with a timeout.
std::recursive_timed_mutex, a recursive mutex that supports locking with a timeout.

Lock classes (two kinds)
std::lock_guard, an RAII wrapper that makes it easy for a thread to lock a mutex.
std::unique_lock, also an RAII wrapper for a mutex, but with finer control over locking and unlocking.

Other types
std::once_flag
std::adopt_lock_t
std::defer_lock_t
std::try_to_lock_t

Functions
std::try_lock, tries to lock several mutexes at the same time without blocking.
std::lock, locks several mutexes at the same time.
std::call_once, when several threads need to call the same function, call_once guarantees it is called only once.
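
  std::timed_mutex has no example later in these notes, so here is a small sketch (my own illustration) of try_lock_for, which gives up if the lock cannot be acquired within a time limit:

#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

std::timed_mutex tm;

void worker(int id) {
    // Try to take the lock, but give up after 100 ms instead of blocking forever.
    if (tm.try_lock_for(std::chrono::milliseconds(100))) {
        std::cout << "worker " << id << " got the lock" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(300));
        tm.unlock();
    } else {
        std::cout << "worker " << id << " timed out" << std::endl;
    }
}

int main() {
    std::thread t1(worker, 1);
    std::thread t2(worker, 2);
    t1.join();
    t2.join();
    return 0;
}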
  • Lock notes

std::lock        // locks several std::mutex objects at once
std::lock_guard  // only locks on construction and unlocks on destruction (released at end of scope)
std::unique_lock // supports lock, unlock, try-lock and more (released at end of scope)

// Locking tags
std::try_to_lock // try to acquire the lock without blocking
std::defer_lock  // construct the guard without acquiring the lock; when several mutexes are locked one at a time, two callers grabbing them simultaneously leave a window for deadlock, so defer and lock them together
std::adopt_lock  // the calling thread already owns the mutex, so do not lock it a second time
std::recursive_mutex
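
  A small sketch (my own example) of std::try_to_lock with std::unique_lock; owns_lock() reports whether the lock was actually acquired:

#include <iostream>
#include <mutex>

std::mutex m;

void poll_work() {
    // Construct the guard without blocking; owns_lock() tells whether it succeeded.
    std::unique_lock<std::mutex> lk(m, std::try_to_lock);
    if (lk.owns_lock()) {
        std::cout << "got the lock, doing the work" << std::endl;
    } else {
        std::cout << "lock busy, doing something else first" << std::endl;
    }
}

int main() {
    poll_work();
    return 0;
}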

  • std::lock_guard
lock.cpp
#include <list>
#include <mutex>
#include <algorithm>
#include <iostream>

std::list<int> some_list;
std::mutex some_mutex;

void add_to_list(int new_value)
{
{
// lock
std::lock_guard<std::mutex> guard(some_mutex);
some_list.push_back(new_value);
// lock_guard unlocks when it goes out of scope
}
std::cout << "Unlock" << std::endl;
}

bool list_contains(int value_to_find)
{
{
// lock
std::lock_guard<std::mutex> guard(some_mutex);
return std::find(some_list.begin(), some_list.end(), value_to_find) != some_list.end();
// lock_guard unlocks when it goes out of scope
}
std::cout << "Unlock" << std::endl;
}
lock.cpp
class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd): some_detail(sd) {}
friend void swap(X& lhs, X& rhs)
{
if (&lhs == &rhs)
return;
std::lock(lhs.m, rhs.m);
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
swap(lhs.some_detail, rhs.some_detail);
}
};
  • std::unique_lock
lock.cpp
#include <list>
#include <mutex>
#include <algorithm>
#include <iostream>

using namespace std;

std::list<int> some_list;
std::mutex some_mutex;

void add_to_list(int new_value)
{
{
// lock
std::unique_lock<std::mutex> guard(some_mutex);

/* unique_lock offers additional locking operations */

// guard.lock();
// guard.unlock();
// guard.try_lock();

some_list.push_back(new_value);
// unique_lock unlocks when it goes out of scope
}
std::cout << "Unlock" << std::endl;
}

bool list_contains(int value_to_find)
{
{
// lock
std::unique_lock<std::mutex> guard(some_mutex);

/* unique_lock offers additional locking operations */

// guard.lock();
// guard.unlock();
// guard.try_lock();

return std::find(some_list.begin(), some_list.end(), value_to_find) != some_list.end();
// unique_lock unlocks when it goes out of scope
}
std::cout << "Unlock" << std::endl;
}
lock.cpp
class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd): some_detail(sd) {}
friend void swap(X& lhs, X& rhs)
{
if (&lhs == &rhs)
return;
std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock); // 1 std::defer_lock leaves the mutexes unlocked
std::lock(lock_a, lock_b); // 2 the mutexes are locked here
swap(lhs.some_detail, rhs.some_detail);
}
};

Avoiding deadlock

1. Avoid nested locks
2. Avoid calling user-supplied code while holding a lock
3. Acquire locks in a fixed order
4. Use a lock hierarchy

  • Hierarchical locks

a. Divide the application into layers. When code tries to lock a mutex, the lock is refused if the code already holds a lock belonging to a lower layer.
b. Give every mutex a hierarchy number and strictly follow two rules:
While holding a mutex of level N, only mutexes of level < N may be acquired.
When several mutexes of the same level must be held at once, they must be acquired in a single operation, e.g. with something like std::lock, to guarantee the ordering.


hierarchical_mutex.cpp
#include <climits>
#include <mutex>
#include <stdexcept>

class hierarchical_mutex
{
public:
explicit hierarchical_mutex(unsigned long value):
hierarchy_value(value),
previous_hierarchy_value(0)
{}
void lock()
{
check_for_hierarchy_violation();
internal_mutex.lock();
update_hierarchy_value();
}
void unlock()
{
this_thread_hierarchy_value = previous_hierarchy_value;
internal_mutex.unlock();
}
bool try_lock()
{
check_for_hierarchy_violation();
if (!internal_mutex.try_lock())
return false;
update_hierarchy_value();
return true;
}

private:
std::mutex internal_mutex;
unsigned long const hierarchy_value;
unsigned long previous_hierarchy_value;
static thread_local unsigned long this_thread_hierarchy_value;
void check_for_hierarchy_violation()
{
if (this_thread_hierarchy_value <= hierarchy_value) {
throw std::logic_error("mutex hierarchy violated");
}
}
void update_hierarchy_value()
{
previous_hierarchy_value = this_thread_hierarchy_value;
this_thread_hierarchy_value = hierarchy_value;
}
};

// ULONG_MAX comes from <climits>; a new thread starts with no hierarchy restriction.
thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);

hierarchical_mutex high_level_mutex(10000); 
hierarchical_mutex low_level_mutex(5000);
hierarchical_mutex other_mutex(100);

int do_low_level_stuff();

int low_level_func()
{
std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
return do_low_level_stuff();
}

void high_level_stuff(int some_param);

void high_level_func()
{
std::lock_guard<hierarchical_mutex> lk(high_level_mutex);
high_level_stuff(low_level_func());
}

void do_other_stuff();

void other_stuff()
{
high_level_func();
do_other_stuff();
}

// thread_a respects the hierarchy: it locks high_level_mutex (10000)
// and then low_level_mutex (5000) underneath it.
void thread_a()
{
high_level_func();
}

// thread_b violates the hierarchy: it locks other_mutex (100) first and then
// high_level_mutex (10000) via other_stuff(), so hierarchical_mutex throws.
void thread_b()
{
std::lock_guard<hierarchical_mutex> lk(other_mutex);
other_stuff();
}


  • Transferring mutex ownership
std::unique_lock<std::mutex> get_lock()
{
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk(some_mutex);
prepare_data();
return lk;
}
void process_data()
{
std::unique_lock<std::mutex> lk(get_lock());
do_something();
}
  • Lock granularity
void get_and_process_data()
{
std::unique_lock<std::mutex> my_lock(the_mutex);
some_class data_to_process = get_next_data_chunk();
my_lock.unlock(); // 1 don't keep the mutex locked across the call to process()
result_type result = process(data_to_process);
my_lock.lock(); // 2 lock the mutex again in order to write the result
write_result(data_to_process, result);
}
  • Thread-safe lazy initialization
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag; // 1
void init_resource()
{
resource_ptr.reset(new some_resource);
}
void foo()
{
std::call_once(resource_flag, init_resource); // the initialization is guaranteed to run exactly once
resource_ptr->do_something();
}
  • Lazy initialization of a class member (thread-safe)
class X
{
private:
connection_info connection_details;
connection_handle connection;
std::once_flag connection_init_flag;
void open_connection()
{
connection = connection_manager.open(connection_details);
}
public:
X(connection_info const& connection_details_):
connection_details(connection_details_)
{}

// connection is a shared resource
void send_data(data_packet const& data)
{
std::call_once(connection_init_flag, &X::open_connection, this);
connection.send_data(data);
}

// connection is a shared resource
data_packet receive_data()
{
std::call_once(connection_init_flag, &X::open_connection, this);
return connection.receive_data();
}
};

  • Reader-writer locks

A single "writer" thread gets exclusive access, while multiple "reader" threads can access the data concurrently (shared access).

std::lock_guard<boost::shared_mutex> and std::unique_lock<boost::shared_mutex> can be used for locking, as a
replacement for std::mutex. Just as with std::mutex, this gives the updating thread exclusive access.
Threads that do not need to modify the data structure can instead use boost::shared_lock<boost::shared_mutex> to obtain shared access. This works just like std::unique_lock, except that several threads may hold a shared lock on the same boost::shared_mutex at the same time. The only restrictions are: while any thread holds a shared lock, a thread
trying to acquire an exclusive lock blocks until the other threads have released their locks; and while any thread holds an exclusive lock, no other
thread can acquire a shared or an exclusive lock until the first thread releases its lock.

#include <map>
#include <string>
#include <mutex>
#include <boost/thread/shared_mutex.hpp>
class dns_entry;
class dns_cache
{
std::map<std::string, dns_entry> entries;
mutable boost::shared_mutex entry_mutex;
public:
dns_entry find_entry(std::string const& domain) const
{
boost::shared_lock<boost::shared_mutex> lk(entry_mutex);
std::map<std::string, dns_entry>::const_iterator const it =\
entries.find(domain);
return (it == entries.end()) ? dns_entry() : it->second;
}
void update_or_add_entry(std::string const& domain,
dns_entry const& dns_details)
{
std::lock_guard<boost::shared_mutex> lk(entry_mutex);
entries[domain] = dns_details;
}
};

find_entry() uses a boost::shared_lock<> instance to protect shared, read-only access ①; this lets multiple threads
call find_entry() at the same time without problems. update_or_add_entry(), on the other hand, uses a std::lock_guard<>
instance to get exclusive access while the table is being updated ②; the exclusive lock stops other threads from modifying
the data structure and also prevents them from calling find_entry() while the update is in progress.
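
  Since C++17 the standard library provides std::shared_mutex and std::shared_lock, so the same cache can be written without Boost. A sketch of the equivalent, assuming a C++17 compiler (dns_entry is stubbed out here only to keep the snippet self-contained):

#include <map>
#include <mutex>
#include <shared_mutex>
#include <string>

class dns_entry {};

class dns_cache
{
    std::map<std::string, dns_entry> entries;
    mutable std::shared_mutex entry_mutex;
public:
    dns_entry find_entry(std::string const& domain) const
    {
        // Shared (reader) lock: many threads may hold it at the same time.
        std::shared_lock<std::shared_mutex> lk(entry_mutex);
        auto const it = entries.find(domain);
        return (it == entries.end()) ? dns_entry() : it->second;
    }
    void update_or_add_entry(std::string const& domain, dns_entry const& dns_details)
    {
        // Exclusive (writer) lock: blocks both readers and other writers.
        std::lock_guard<std::shared_mutex> lk(entry_mutex);
        entries[domain] = dns_details;
    }
};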
  • Nested locks (std::recursive_mutex)
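
  There is no nested-lock example elsewhere in these notes, so here is a minimal sketch (my own illustration): the same thread may lock a std::recursive_mutex several times, and it must unlock it the same number of times.

#include <iostream>
#include <mutex>

class Counter {
public:
    void add(int n) {
        std::lock_guard<std::recursive_mutex> lk(m_);
        value_ += n;
    }
    void add_twice(int n) {
        // The same thread locks m_ again inside add(); with std::mutex this
        // would deadlock, with std::recursive_mutex it is allowed.
        std::lock_guard<std::recursive_mutex> lk(m_);
        add(n);
        add(n);
    }
    int value() const { return value_; }
private:
    std::recursive_mutex m_;
    int value_ = 0;
};

int main() {
    Counter c;
    c.add_twice(5);
    std::cout << c.value() << std::endl; // prints 10
    return 0;
}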

Synchronizing concurrent operations

Condition variables

  • Condition variable notes

std::condition_variable // works only with std::mutex
std::condition_variable_any // works with any mutex type that meets the minimal requirements
std::condition_variable::wait // blocks until notified
std::condition_variable::wait_for // takes a duration; the thread blocks until it is notified or the duration rel_time has elapsed
std::condition_variable::wait_until // takes a time point; the thread blocks until it is notified or the time point abs_time is reached
std::condition_variable::notify_one // wakes one waiting thread; does nothing if no thread is waiting, and which thread is woken is unspecified when several are waiting
std::condition_variable::notify_all // wakes all waiting threads; does nothing if no thread is waiting

std::condition_variable_any // similar to std::condition_variable, except that its wait functions accept any lockable type, whereas std::condition_variable only accepts std::unique_lock<std::mutex>; otherwise it behaves almost identically

std::cv_status (enumeration) // cv_status::no_timeout: wait_for or wait_until did not time out, i.e. the thread was notified within the given time
cv_status::timeout: wait_for or wait_until timed out

std::notify_all_at_thread_exit // when the calling thread exits, every thread waiting on the given condition variable is notified
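
  The example below only uses wait(); here is a small sketch (my own illustration) of wait_for with a predicate, where the return value tells whether the wait succeeded or timed out:

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::condition_variable cv;
std::mutex m;
bool ready = false;

int main() {
    std::thread producer([] {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        {
            std::lock_guard<std::mutex> lk(m);
            ready = true;
        }
        cv.notify_one();
    });

    std::unique_lock<std::mutex> lk(m);
    // Wait at most 3 seconds; the predicate form returns the predicate's value,
    // so false here means the wait timed out before ready became true.
    if (cv.wait_for(lk, std::chrono::seconds(3), [] { return ready; })) {
        std::cout << "data ready" << std::endl;
    } else {
        std::cout << "timed out" << std::endl;
    }

    producer.join();
    return 0;
}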

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

using namespace std;

std::queue<string> data_cache;

class Threads {
public:
Threads() = default;
virtual ~Threads() = default;

void ProducerThread() {
// Create the producer thread and detach it.
std::thread producer_thread(&Threads::ProducerTask, this);
producer_thread.detach();
}

void CustomerThread() {
// Create the customer thread and detach it.
std::thread customer_thread(&Threads::CustomerTask, this);
customer_thread.detach();
}

void ProducerTask() {
while (true)
{
HandleProducerTask();

}
}

void HandleProducerTask() {
std::string data = "XY";
std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s.
// Producer data.
std::lock_guard<std::mutex> lk(cv_mutex_);
data_cache.push(data);
cv_.notify_one();
}

void CustomerTask() {
while (true)
{
// Acquire the lock.
std::unique_lock<std::mutex> lk(cv_mutex_);
// Wait on the condition variable until the predicate holds.
cv_.wait(lk, [] {
return (data_cache.size() % 2 == 0 && data_cache.size() != 0) ? true : false;
});
// If the predicate is false, wait() releases the lock and blocks the thread.
// If the predicate is true, wait() returns with the lock held and execution
// continues with the logic below.
HandleCustomerTask();
// wait() returned with the lock still held, so release it explicitly
// so that other threads can use the mutex.
lk.unlock();
}
}

void HandleCustomerTask() {
std::cout << "-----------------------" << std::endl;
while (!data_cache.empty()) {
std::cout << data_cache.front() << std::endl;
data_cache.pop();
}
}

private:
std::condition_variable cv_;
mutable std::mutex cv_mutex_;
};
main.cpp
#include <iostream>

#include "my_condition_variables.hpp"

using namespace std;

int main(int argc, char const *argv[])
{

// Create the producer and customer thread.
Threads my_threads;

my_threads.ProducerThread();
my_threads.CustomerThread();

while (true) {
std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s.
}

return 0;
}

Futures

  • Future notes
std::future          // each instance is tied to a single event; when many threads are waiting, only one of them can retrieve the result
std::future::get     // wait for the result and return the value of the asynchronous call (runs a deferred task if necessary)
std::future::wait    // wait for the result without retrieving it
std::shared_future   // several instances can be tied to the same event; they all become ready at the same time, they can all access the associated data, and several threads can wait for the same result
std::async           // starts an asynchronous task and returns a std::future; calling get() on that future blocks until the "future" becomes ready and then returns the result
std::launch          // the launch-policy enumeration passed to std::async
std::launch::deferred // the call is deferred until wait() or get() is invoked on the future
std::launch::async    // the function runs on its own separate thread
std::launch::deferred | std::launch::async // the implementation may choose either policy; if the call ends up deferred, it may never run at all
std::packaged_task<> // ties a future to a function or callable object; when the std::packaged_task<> object is invoked, it
calls the wrapped callable, sets the future to ready, and stores the return value as the associated data

  • What futures are for

The thread library provides futures for accessing the results of asynchronous operations. std::promise wraps a value and ties the data to a future, which makes it convenient to obtain a value produced inside a thread function; the value is retrieved indirectly through the future held by the promise, so a promise sits one level above a future.
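
  A minimal sketch (my own example) of the promise/future pairing described above; the fuller example with std::shared_future appears further below:

#include <future>
#include <iostream>
#include <thread>

int main() {
    std::promise<int> p;
    std::future<int> f = p.get_future(); // the future tied to the promise

    std::thread worker([](std::promise<int> pr) {
        pr.set_value(42); // publish the result through the promise
    }, std::move(p));

    std::cout << "result: " << f.get() << std::endl; // blocks until set_value runs
    worker.join();
    return 0;
}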

  • Async Future
#include <iostream>
#include <future>

using namespace std;

struct AsyncTask
{
void Add(int a, int b, int& ret) {
std::cout << "This Add async thread id: " << std::this_thread::get_id() << std::endl;
ret = a + b;
}
void Del(int a, int b, int& ret) {
std::cout << "This Del async thread id: " << std::this_thread::get_id() << std::endl;
ret = b - a;
}
void operator() (int a, int b, int& c) {
std::cout << "This operator() async thread id: " << std::this_thread::get_id() << std::endl;
c = a * b;
}
};
main.cpp
#include <iostream>

#include "my_future.hpp"

int main(int argc, char const *argv[])
{
// Normal async task.
AsyncTask async_task;
int a = 3;
int b = 4;
int c = 0;

// async calls Add() on async_task through the pointer (no copy of the object is made).
auto f1 = std::async(&AsyncTask::Add, &async_task, a, b, std::ref(c));

// async copies async_task and calls Del() on the copy.
auto f2 = std::async(&AsyncTask::Del, async_task, a, b, std::ref(c));

// async calls operator() on a temporary AsyncTask object.
auto f3 = std::async(AsyncTask(), a, b, std::ref(c));

// async calls operator() on async_task itself via std::ref (no copy is made).
auto f4 = std::async(std::ref(async_task), a, b, std::ref(c));

// std::launch::async forces the call to run on its own new thread.
auto f5 = std::async(std::launch::async, AsyncTask(), a, b, std::ref(c));

// f5.get();
f5.wait();
std::cout << c << std::endl;

// std::launch::deferred defers the call until wait()/get() and runs it on the calling thread.
auto f6 = std::async(std::launch::deferred, AsyncTask(), a, b, std::ref(c));

// With async | deferred the implementation may pick either policy.
auto f7 = std::async(std::launch::async | std::launch::deferred, AsyncTask(), a, b, std::ref(c));

// f1.get();
f1.wait();
std::cout << c << std::endl;

// f2.get();
f2.wait();
std::cout << c << std::endl;

// f3.get();
f3.wait();
std::cout << c << std::endl;

// f4.get();
f4.wait();
std::cout << c << std::endl;

// f6.get();
f6.wait();
std::cout << c << std::endl;

// f7.get();
f7.wait();
std::cout << c << std::endl;

return 0;
}
  • Packaged tasks
#include <iostream>
#include <future>
#include <functional>

using namespace std;

class Task {
public:
Task() = default;
virtual ~Task() = default;

int Add(int a, int b) {
return a + b;
}

int Del(int a, int b) {
return a - b;
}

int Mul(int a, int b) {
return a * b;
}

};
main.cpp
#include <iostream>
#include <thread>

#include "package_task_future.hpp"

using namespace std::placeholders;

int main(int argc, char const *argv[])
{
Task my_task;
std::packaged_task<int(int, int)> task_1(bind(&Task::Add, &my_task, _1, _2));

// Invoke the packaged task; the result is stored in its shared state.
task_1(2, 3);

// Obtain the future associated with the packaged task.
std::future<int> link_task_1 = task_1.get_future();

std::cout << link_task_1.get() << std::endl;


while(true) {
std::this_thread::sleep_for(std::chrono::seconds(2));
}

}
  • Promise
main.cpp
#include <iostream>
#include <thread>
#include <exception>

#include "promises_future.hpp"

using namespace std::placeholders;

int main(int argc, char const *argv[])
{
std::promise<int> my_pri;
std::thread work1([](std::promise<int>& p){
// Sleep 2s.
std::this_thread::sleep_for(std::chrono::seconds(2));
try {
p.set_value(32);
} catch(...) {
// A promise may only be satisfied once, so use one of the two:
// forward the exception that was caught ...
p.set_exception(std::current_exception());
// ... or store a custom exception instead.
// p.set_exception(std::make_exception_ptr(std::logic_error("The value is less than zero.")));
}

}, std::ref(my_pri));
// Let the thread run on its own.
work1.detach();

// A shared_future can be copied, so it can be passed to several threads
// and each of them can wait for the same result.
std::shared_future<int> sf = my_pri.get_future().share();

std::thread work2([] (std::shared_future<int> f) {
try {
// get() may be called on every copy of the shared_future.
std::cout << "Work2 " << f.get() << std::endl;
} catch(...) {
std::cout << "Work2 Wait error" << std::endl;
}
}, sf);
work2.detach();

std::thread work3([] (std::shared_future<int> f) {
try {
// get() may be called on every copy of the shared_future.
std::cout << "Work3 " << f.get() << std::endl;
} catch(...) {
std::cout << "Work3 Wait error" << std::endl;
}
}, sf);
work3.detach();

//------ future ----------
// get_future() may be called only once per promise, and it was already called
// above (via share()), so main reuses the shared_future here. A plain
// std::future additionally allows get() to be called only once.

// Get the thread return value.
std::cout << "Main get " << sf.get() << std::endl;


while(true) {
std::this_thread::sleep_for(std::chrono::seconds(2));
}

}

Time management

  • Durations

  • Duration notes

std::chrono::duration<>  // class template for representing and handling durations
std::chrono::duration<short, std::ratio<60, 1>> // minutes stored in a short: one tick is 60 seconds
std::chrono::duration<double, std::ratio<1, 1000>> // milliseconds stored in a double: one tick is 1/1000 of a second

std::future_status::timeout // the wait timed out
std::future_status::ready // the future has become ready
std::future_status::deferred // the future holds a deferred task

std::chrono::milliseconds ms(54802);
// truncating conversion between duration units
std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(ms);

std::chrono::time_point<> // the first template parameter selects the clock, the second the unit used to measure time

std::this_thread::sleep_for // puts the current thread to sleep for the given duration
std::this_thread::sleep_until // puts the current thread to sleep until the given time point
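
  A small sketch (my own example) combining a clock, a time_point and duration_cast to measure elapsed time:

#include <chrono>
#include <iostream>
#include <thread>

int main() {
    auto const start = std::chrono::steady_clock::now(); // a time_point

    std::this_thread::sleep_for(std::chrono::milliseconds(1500));

    auto const stop = std::chrono::steady_clock::now();
    // Subtracting two time_points gives a duration; duration_cast truncates
    // it to the requested unit.
    auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(stop - start);
    auto s  = std::chrono::duration_cast<std::chrono::seconds>(ms);
    std::cout << ms.count() << " ms ~= " << s.count() << " s" << std::endl;
    return 0;
}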

  • Functions that accept a timeout
Type/namespace - functions - return value:

std::this_thread (namespace)
    sleep_for(duration), sleep_until(time_point)
    Returns: N/A

std::condition_variable / std::condition_variable_any
    wait_for(lock, duration), wait_until(lock, time_point)
    Returns: std::cv_status::timeout or std::cv_status::no_timeout

std::condition_variable / std::condition_variable_any
    wait_for(lock, duration, predicate), wait_until(lock, time_point, predicate)
    Returns: bool - the value of the predicate when the wait ends

std::timed_mutex / std::recursive_timed_mutex
    try_lock_for(duration), try_lock_until(time_point)
    Returns: bool - true if the lock was acquired, otherwise false

std::unique_lock
    unique_lock(lockable, duration), unique_lock(lockable, time_point)
    Returns: N/A - call owns_lock() on the new object; true if the lock was acquired, otherwise false

std::unique_lock
    try_lock_for(duration), try_lock_until(time_point)
    Returns: bool - true if the lock was acquired, otherwise false

std::future / std::shared_future
    wait_for(duration), wait_until(time_point)
    Returns: std::future_status::timeout if the wait timed out, std::future_status::ready when the
    future is ready, std::future_status::deferred if the future holds a deferred function that has not started
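
  The std::unique_lock rows of this table have no example in this section, so here is a small sketch (my own illustration) of constructing a unique_lock with a duration and checking owns_lock():

#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

std::timed_mutex tm;

int main() {
    std::thread holder([] {
        std::lock_guard<std::timed_mutex> lk(tm);
        std::this_thread::sleep_for(std::chrono::seconds(2)); // hold the lock for 2s
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    // unique_lock(lockable, duration) calls try_lock_for internally;
    // owns_lock() tells whether the lock was acquired before the timeout.
    std::unique_lock<std::timed_mutex> lk(tm, std::chrono::milliseconds(500));
    if (lk.owns_lock()) {
        std::cout << "lock acquired" << std::endl;
    } else {
        std::cout << "timed out waiting for the lock" << std::endl;
    }

    holder.join();
    return 0;
}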
  • Future timeouts
main.cpp
#include <iostream>
#include <future>
#include <condition_variable>
#include <mutex>
#include <chrono>

using namespace std;

int SeeHello(unsigned int time) {
// Sleep so that callers waiting on SeeHello have to wait for it to finish.
std::this_thread::sleep_for(std::chrono::seconds(time));
return 1;
}

template<typename T>
void Show(T val) {
std::cout << "Get the val: " << val << std::endl;
}


int main(int argc, char const *argv[])
{
unsigned int time = 3;
std::future<int> f1 = std::async(std::launch::async, SeeHello, time);

// Wait for the block timeout.
if(f1.wait_for(std::chrono::seconds(2)) == std::future_status::ready) {
Show<int>(f1.get());
} else {
std::cout << "Too late" << std::endl;
}

while(true) {
std::this_thread::sleep_for(std::chrono::seconds(2)); // Sleep 2s.
}

return 0;
}
  • Blocking wait with a timeout
main.cpp
#include <iostream>
#include <future>
#include <condition_variable>
#include <mutex>
#include <chrono>

using namespace std;
std::condition_variable cv_;
std::mutex m_;
bool is_time_out_flag_;

void wait_loop()
{
is_time_out_flag_ = false;
// Set the timeout.
unsigned int set_time = 5;
auto const timeout = std::chrono::steady_clock::now() + std::chrono::seconds(set_time);
std::unique_lock<std::mutex> lk(m_);
// Hold the lock and loop until either the worker sets the flag
// or wait_until() reports a timeout.
while(!is_time_out_flag_) {
if (cv_.wait_until(lk, timeout) == std::cv_status::timeout) {
break; // timed out; the flag is still false.
}
}
return;
}

int main(int argc, char const *argv[])
{
std::thread work1([] {
std::this_thread::sleep_for(std::chrono::seconds(3));
{
std::lock_guard<std::mutex> lk(m_);
is_time_out_flag_ = true;
}
// Wake the waiting thread; without this it only returns when wait_until() times out.
cv_.notify_one();
});

work1.detach();

wait_loop();

if(!is_time_out_flag_) {
std::cout << "Is time out." << std::endl;
} else {
std::cout << "Not time out." << std::endl;
}

return 0;
}