《C++新经典》第17章 并发与多线程
admin
2024-02-11 13:30:26

《C++新经典》第17章 并发与多线程

  • 17.1 基本概念和实现
    • 17.1.1 并发、进程、线程的基本概念和综述
    • 17.1.2 并发的实现方法
  • 17.2 线程启动、结束与创建线程写法
    • 17.2.1 线程开始与结束
    • 17.2.2 其它线程创建方法
  • 17.3 线程传参、detach与成员函数作为线程函数
    • 17.3.1 传递临时对象作为线程参数
    • 17.3.2 临时对象作为线程参数续
    • 17.3.3 传递类对象与智能指针作为线程参数
    • 17.3.4 成员函数作为线程入口函数
  • 17.4 多个线程、数据共享与案例代码
    • 17.4.1 创建和等待多个线程
    • 17.4.2 数据共享
    • 17.4.3 案例代码
  • 17.5 互斥量概念、用法、死锁与解决
    • 17.5.1 互斥量概念
    • 17.5.2 互斥量用法
    • 17.5.3 死锁
  • 17.6 unique_lock详解
    • 17.6.1 unique_lock取代lock_guard
    • 17.6.2 unique_lock的第二个参数
    • 17.6.3 unique_lock的成员函数
    • 17.6.4 unique_lock所有权的传递
  • 17.7 单例设计模式共享数据分析、解决与call_once
    • 17.7.1 设计模式简单谈
    • 17.7.2 单例设计模式
    • 17.7.3 单例设计模式共享问题分析、解决
    • 17.7.4 std::call_once
  • 17.8 condition_variable、wait、notify_one与notify_all
    • 17.8.1 条件变量condition_variable、wait与notify_one
    • 17.8.2 wait与notify思考
    • 17.8.3 notify_all
  • 17.9 async、future、packaged_task和promise
    • 17.9.1 std::async和std::future创建后台任务并返回值
    • 17.9.2 std::packaged_task
    • 17.9.3 std::promise
  • 17.10 future其它成员函数、shared_future与atomic
    • 17.10.1 std::future其它成员函数
    • 17.10.2 续谈std::async不确定性问题
    • 17.10.3 std::shared_future
    • 17.10.4 原子操作std::atomic
  • 17.11 Windows临界区与其它各种mutex互斥量
    • 17.11.1 Windows临界区
    • 17.11.2 多次进入临界区试验
    • 17.11.3 自动析构技术
    • 17.11.4 recursive_mutex递归的独占互斥量
    • 17.11.5 带超时的互斥量std::timed_mutx和std::recursive_timed_mutex
  • 17.12 补充知识、线程池浅谈、数量谈与总结
    • 17.12.1 知识点补充
    • 17.12.2 线程池浅谈
    • 17.12.3 线程创建数量谈

17.1 基本概念和实现

17.1.1 并发、进程、线程的基本概念和综述

  1. 并发
    多个任务(独立活动)同时发生(进行)。
    单CPU,操作系统调度,任务切换。
    多CPU,硬件并发(并行)。

  2. 可执行程序
    Windows下exe文件,Linux下有可执行权限的文件(-rwxrw-r–)。

  3. 进程
    一个可执行程序运行起来创建一个进程,进程就是运行起来了的可执行程序。

  4. 线程
    每个进程有唯一主线程,随进程启动。
    线程理解为一条代码的执行通路(道路)。
    每个线程需要独立的堆栈空间(耗费内存,1MB左右),线程切换需保存很多中间状态。上下文切换必须但无价值和意义的额外工作,耗费资源。

17.1.2 并发的实现方法

  1. 多进程并发
    多个可执行程序运行。同一计算机上进程通过管道、文件、消息队列、共享内存等技术通信,不同计算机间进程通过socket(网络套接字)等通信。进程直接数据保护问题,相互通信复杂(即使同一台计算机上)。

  2. 多线程并发
    单个进程创建多个线程(轻量级进程,独立运行),共享进程地址空间(共享内存)、全局变量、指针、引用等。

  3. 总结
    与多进程并发相比较,多线程并发的优缺点:
    优点:线程轻量级,启动速度更快;系统资源开销更少;执行速度更快。
    缺点:使用有难度,小心数据一致性问题。

17.2 线程启动、结束与创建线程写法

17.2.1 线程开始与结束

主线程执行完后,未执行完的子线程会被操作系统强制终止(detach例外)。

#include 
#include 
using namespace std;void myprint() {cout <<"线程开始" <thread mytobj(myprint);//函数myprint是可调用对象mytobj.join();cout <<"main主函数结束" <

线程detach后,线程关联的thread对象失去与线程的关联,线程驻留在后台运行(控制台不会输出),由C++运行时库接管(负责清理该线程相关的资源),类似守护线程。

#include 
#include 
using namespace std;void myprint() {cout <<"线程开始" <thread mytobj(myprint);//函数myprint是可调用对象mytobj.detach(); //主线程结束后,myprint子线程转入后台执行,看不见输出结果(输出结果的窗口关联的是主线程)cout <<"main主函数结束" <
thread mytobj(myprint);
if(mytobj.joinable())cout <<"joinable() == true" <

17.2.2 其它线程创建方法

  1. 类创建线程
class TA {
public:void operator()() {//重载(),可调用对象cout <<"TA::operator()开始" <cout <<"TA(int i), this=" <cout <<"~TA(), this=" <cout <<"TA(const TA& ta), this=" <
  1. lambda表达式创建线程
auto mylamthread = [] {cout <<"线程开始" <

17.3 线程传参、detach与成员函数作为线程函数

17.3.1 传递临时对象作为线程参数

  1. 陷阱1
    detach创建线程时,不要往线程中传递引用、指针类参数。
//const引用会产生临时对象
void myprint(int i, const string& mybuf) {}
  1. 陷阱2
void myprint(int i, const string& mybuf) {}
int mvar = 1;
char mybuf[] = "test";
//mybuf转换为string时机不确定,潜在问题
thread mytobj(myprint, mvar, mybuf);//构造string临时对象,无问题
thread mytobj(myprint, mvar, string(mybuf));

自定义类测试

#include 
#include 
using namespace std;class A
{
public:A(int a) : m_i(a) { cout << "A::A(int a)" << this << endl; }A(const A &a) { cout << "A::A(const A& a)" << this << endl; }~A() { cout << "~A::A()" << this << endl; }private:int m_i;
};void myprint(int i, const A &mybuf)
{cout << &mybuf << endl;
}int main()
{int mvar = 1;int secondvar = 12;// 拷贝构造函数可能未执行,main线程就结束了//thread mytobj(myprint, mvar, secondvar);// main线程,执行一次构造函数+一次拷贝构造函数(生成类A实例位于子线程中)+一次析构函数// 子线程中,输出实例A地址+一次析构函数thread mytobj(myprint, mvar, A(secondvar));//mytobj.join();mytobj.detach();cout << "main over" << endl;return 0;
}
  1. 总结
    int这种简单类型,使用值传递,不要使用引用;
    类对象作为参数时,避免隐式类型转换,直接在创建线程时构建临时对象,线程函数形参使用引用;
    建议使用join,不使用detach,这样无局部变量失效导致线程非法内存引用的问题。

17.3.2 临时对象作为线程参数续

  1. 线程id
    标识线程的唯一数字。
std::this_thread::get_id()
  1. 临时对象构造时机
#include 
#include 
using namespace std;class A
{
public:A(int a) : m_i(a) { cout << "A::A(int a) this: " << this <<" id: "<< this_thread::get_id() << endl; }A(const A &a) { cout << "A::A(const A& a) this: " << this <<" id: "<< this_thread::get_id() << endl; }~A() { cout << "~A::A() this: " << this <<" id: "<< this_thread::get_id() << endl; }private:int m_i;
};void myprint2(const A &mybuf)
{cout <<"this: "<< &mybuf <<" id: "<< this_thread::get_id() << endl;
}//void myprint2(const A mybuf)
//非引用时,
//子线程中会增加一次拷贝构造函数+一次析构函数int main()
{cout << "main id: " << this_thread::get_id() << endl;int mvar = 1;//子线程中利用变量mvar创建A实例//mvar可能因main执行完毕而回收,潜在问题。//thread mytobj(myprint2, mvar);// main线程执行构造函数和拷贝构造函数(创建两个A的实例)// 构造函数创建的实例A执行完拷贝构造后,main线程析构// 拷贝构造函数创建的实例A归子线程所有,且由子线程析构thread mytobj(myprint2, A(mvar));mytobj.join();cout << "main over" << endl;return 0;
}

建议

//函数使用类的常量引用
void myprint2(const A& mybuf);//创建线程时构建临时对象
thread mytobj(myprint2, A(mvar));

17.3.3 传递类对象与智能指针作为线程参数

临时对象不能作为非const引用参数。

#include 
#include 
using namespace std;class A
{
public:A(int a) : m_i(a) { cout << "A::A(int a) this: " << this << " id: " << this_thread::get_id() << endl; }A(const A &a) { cout << "A::A(const A& a) this: " << this << " id: " << this_thread::get_id() << endl; }~A() { cout << "~A::A() this: " << this << " id: " << this_thread::get_id() << endl; }mutable int m_i; // mutable,const&可修改
};//c++只会为const引用产生临时对象
//临时对象不能作为非const引用
void myprint2(const A &a)
{a.m_i = 199;
}int main()
{A myobj(10);//thread t(myprint2, myobj); //子线程调用拷贝构造函数生成A实例thread t(myprint2, std::ref(myobj)); //真引用main线程的A实例t.join();cout << myobj.m_i << endl;cout << "main over" << endl;return 0;
}
#include 
#include 
using namespace std;class A
{
public:A(int a) : m_i(a) { cout << "A::A(int a) this: " << this << " id: " << this_thread::get_id() << endl; }A(A &a) { cout << "A::A(A& a) this: " << this << " id: " << this_thread::get_id() << endl; }~A() { cout << "~A::A() this: " << this << " id: " << this_thread::get_id() << endl; }int m_i; 
};void myprint2(A &a)
{a.m_i = 199;cout << "myprint2(A &a) this: " << &a << " id: " << this_thread::get_id() << endl; 
}int main()
{A myobj(10);thread t(myprint2, std::ref(myobj)); //真传递引用,可修改t.join();cout << myobj.m_i << endl;cout << "main over" << endl;return 0;
}
#include 
#include 
using namespace std;void myprint3(unique_ptr pzn)
{*pzn = 22;
}int main()
{unique_ptr myp(new int(100));cout<< *myp <

17.3.4 成员函数作为线程入口函数

#include 
#include 
using namespace std;class A
{
public:A(int a) : m_i(a) { cout << "A::A(int) this: " << this << " id: " << this_thread::get_id() << endl; }A(A &a) { cout << "A::A(A&) this: " << this << " id: " << this_thread::get_id() << endl; }~A() { cout << "~A::A() this: " << this << " id: " << this_thread::get_id() << endl; }public:void thread_work(int num){cout << "this: " << this << " id: " << this_thread::get_id() << endl;}int m_i;
};class A2
{
public:A2(int a) : m_i(a) { cout << "A2::A2(int) this: " << this << " id: " << this_thread::get_id() << endl; }A2(A2 &a) { cout << "A2::A2(A2&) this: " << this << " id: " << this_thread::get_id() << endl; }~A2() { cout << "~A2::A2() this: " << this << " id: " << this_thread::get_id() << endl; }public:void operator()(int num){cout << "this: " << this << " id: " << this_thread::get_id() << endl;}int m_i;
};int main()
{if(0){A a(3);thread obj(&A::thread_work, a, 15);//拷贝构造函数主线程执行,析构函数子线程执行obj.join();}if(0){A a(3);thread obj(&A::thread_work, &a, 15);//thread obj(&A::thread_work, std::ref(a), 15);//同上,无拷贝构造函数,主线程和子线程共用类aobj.join();} if(0){A2 a(3);thread obj(a, 15);//拷贝构造函数主线程执行,析构函数子线程执行obj.join();}if(1){A2 a(3);//thread obj(&a, 15);//errorthread obj(std::ref(a), 15);//无拷贝构造函数,主线程和子线程共用类aobj.join();} cout << "over!\n";return 0;
}

17.4 多个线程、数据共享与案例代码

17.4.1 创建和等待多个线程

#include 
#include 
#include 
#include 
#include 
using namespace std;void myprint(int num)
{cout << num << endl;
}#define N 5
int main()
{/*vector threads;for (int i = 0; i < N; i++)//threads.push_back(thread(myprint, i));threads.emplace_back(thread(myprint, i));*/vector threads(N);for (int i = 0; i < N; i++)threads[i] = thread(myprint, i);std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));/*for (int i = 0; i < N; i++)threads[i].join();*//*for (auto &entry : threads)entry.join();*//*for (auto iter = threads.begin(); iter != threads.end(); ++iter)iter->join();*/cout << "main over" << endl;return 0;
}

17.4.2 数据共享

只读没问题,读写需加锁。

17.4.3 案例代码

list频繁按序插入和删除数据时效率更高,vector随机插入和删除数据时效率更高。

#include 
#include 
#include 
#include 
#include 
#include 
using namespace std;#define N 1000
class A
{
public:void inMsgRecvQueue(){for (int i = 0; i < N; i++){msgRecvQueue.push_back(i);this_thread::sleep_for(chrono::seconds(3));}}void outMsgRecvQueue(){for (int i = 0; i < N; i++){if (msgRecvQueue.empty()){cout<<"empty"<int command = msgRecvQueue.front();msgRecvQueue.pop_front();cout<<"do something"< msgRecvQueue;
};int main()
{A a;thread outObj(&A::outMsgRecvQueue, &a);thread inObj(&A::inMsgRecvQueue, std::ref(a));inObj.join();outObj.join();cout << "main over" << endl;return 0;
}

17.5 互斥量概念、用法、死锁与解决

17.5.1 互斥量概念

互斥量,mutex,一个类,一把锁。
保护需要保护的数据。保护数据少了,无保护效果;保护数据多了,影响程序运行效率。

17.5.2 互斥量用法

  1. lock和unlock
mutex mtx;
mtx.lock();mtx.unlock();
  1. lock_guard
mutex mtx;std::lock_guard guard(mtx);

17.5.3 死锁

都等待对方释放锁,而都锁住了对方需要的锁,相互等待。

一般解决方法是按相同顺序上锁。

m1.lock();
m2.lock();
m2.unlock();
m1.unlock();
//或者
lock_guard g1(m1);
lock_guard g2(m2);

lock可以一次锁住两个及以上互斥量。(要么都锁住,要么都不锁)

std::lock(m1, m2);//m1,m2先后顺序无关m2.unlock();
m1.unlock();
mutex m1;
mutex m2;std::lock(m1, m2);//m1,m2先后顺序无关,同时锁住m1和m2lock_guard g1(m1, std::adopt_lock);
lock_guard g2(m2, std::adopt_lock);//lock_guard g1(m1);
//g1构造函数中调用lock,析构函数调用unlock
//std::adopt_lock告诉g1构造函数中不调用lock

17.6 unique_lock详解

一般使用lock_guard。
unique_lock比lock_guard更灵活,执行效率差一点,内存占用多一点。

#include 
#include 
#include 
#include 
using namespace std;#define N 5
class A
{
public:void inMsgRecvQueue(){for (int i = 0; i < N; i++){lock_guard guard(mtx);msgRecvQueue.push_back(i);this_thread::sleep_for(chrono::milliseconds(30));}}void outMsgRecvQueue(){int command;for (int i = 0; i < N; i++){bool result = outMsgLULProc(command);if (!result){cout << "empty" << endl;}else{cout << "do something" << endl;}this_thread::sleep_for(chrono::milliseconds(10));}}private:bool outMsgLULProc(int &command){lock_guard guard(mtx);if (!msgRecvQueue.empty()){command = msgRecvQueue.front();msgRecvQueue.pop_front();return true;}return false;}private:list msgRecvQueue;mutex mtx;
};int main()
{A a;thread outObj(&A::outMsgRecvQueue, &a);thread inObj(&A::inMsgRecvQueue, std::ref(a));inObj.join();outObj.join();cout << "main over" << endl;return 0;
}

17.6.1 unique_lock取代lock_guard

unique_lock可完全取代lock_guard。

17.6.2 unique_lock的第二个参数

  1. std::adopt_lock
    adopt_lock标记m已经lock过,g构造函数中不再调用lock。
mutex m;
m.lock();
unique_lock g(m, adopt_lock);
  1. std::try_to_lock

#include 
#include 
#include 
#include 
using namespace std;#define N 5
class A
{
public:void inMsgRecvQueue(){for (int i = 0; i < N; i++){unique_lock guard(mtx, try_to_lock);//mtx不能lockif(guard.owns_lock()) //   拿到锁头msgRecvQueue.push_back(i);this_thread::sleep_for(chrono::milliseconds(30));}}void outMsgRecvQueue(){int command;for (int i = 0; i < N; i++){bool result = outMsgLULProc(command);if (!result){cout << "empty" << endl;}else{cout << "do something" << endl;}this_thread::sleep_for(chrono::milliseconds(10));}}private:bool outMsgLULProc(int &command){unique_lock guard(mtx);if (!msgRecvQueue.empty()){command = msgRecvQueue.front();msgRecvQueue.pop_front();return true;}return false;}private:list msgRecvQueue;mutex mtx;
};int main()
{A a;thread outObj(&A::outMsgRecvQueue, &a);thread inObj(&A::inMsgRecvQueue, std::ref(a));inObj.join();outObj.join();cout << "main over" << endl;return 0;
}
  1. std::defer_lock
    初始化未加锁的mutex,mutex不能lock。
mutex mtx;
unique_lock guard(mtx, defer_lock);//mtx不能lock

17.6.3 unique_lock的成员函数

  1. lock
mutex mtx;
unique_lock guard(mtx, defer_lock);guard.lock();
  1. unlock
mutex mtx;
unique_lock guard(mtx);guard.unlock();
  1. try_lock
mutex mtx;
unique_lock guard(mtx);guard.try_lock();
  1. release
mutex mtx;
unique_lock guard(mtx);mutex* p_mtx = guard.release();//解除guard与mtx的关联
p_mtx->unlock();//需要自己解锁

锁住代码合适(粒度)。

17.6.4 unique_lock所有权的传递

unique_lock对mutex的所有权可以移动但不能复制。

mutex mtx;
unique_lock g1(mtx);
//unique_lock g2(mtx);//errorunique_lock g2(std::move(g1));
unique rtn_unique_lock(){unique_lock g(mtx);return g;//会生成临时对象,并调用移动构造函数
}

17.7 单例设计模式共享数据分析、解决与call_once

17.7.1 设计模式简单谈

项目开发经验、模块划分经验等总结起来构成的一系列开发技巧。扩展方便,程序写起来灵活(增加删除模块、功能方便)。

17.7.2 单例设计模式

特殊的类,该类对象只能创建一个。


#include 
#include 
#include 
#include 
using namespace std;class MyCAS
{
private:MyCAS(){cout << "MyCAS()\n";}~MyCAS(){cout << "~MyCAS()\n";}private:static MyCAS *m_instance;public:void func(){cout << "test" << endl;}static MyCAS *GetInstance(){if (m_instance == nullptr){static CGarhuishou cg;m_instance = new MyCAS();}return m_instance;}private:class CGarhuishou{public:CGarhuishou(){cout << "CGarhuishou()\n";}~CGarhuishou(){if (MyCAS::m_instance){delete MyCAS::m_instance;MyCAS::m_instance = nullptr;}cout << "~CGarhuishou()\n";}};
};MyCAS *MyCAS::m_instance = nullptr;int main()
{{MyCAS::GetInstance()->func();MyCAS * p_a = MyCAS::GetInstance();p_a->func();}cout << "main over" << endl;return 0;
}

17.7.3 单例设计模式共享问题分析、解决


#include 
#include 
#include 
#include 
using namespace std;class MyCAS
{
private:MyCAS(){cout << "MyCAS()\n";}~MyCAS(){cout << "~MyCAS()\n";}private:static MyCAS *m_instance;static mutex mtx;public:void func(){cout << "test" << endl;}static MyCAS *GetInstance(){if (m_instance == nullptr){unique_lock u(mtx);if (m_instance == nullptr){static CGarhuishou cg;m_instance = new MyCAS();}}return m_instance;}private:class CGarhuishou{public:CGarhuishou(){cout << "CGarhuishou()\n";}~CGarhuishou(){if (MyCAS::m_instance){delete MyCAS::m_instance;MyCAS::m_instance = nullptr;}cout << "~CGarhuishou()\n";}};
};MyCAS *MyCAS::m_instance = nullptr;
mutex MyCAS::mtx;void mythread()
{cout << "begin\n";MyCAS *p_a = MyCAS::GetInstance();p_a->func();cout << "end\n";
}int main()
{{thread t1(mythread);thread t2(mythread);t1.join();t2.join();}cout << "main over" << endl;return 0;
}

17.7.4 std::call_once

保证函数只被执行一次。
once_flag,结构,标记。


#include 
#include 
#include 
using namespace std;class MyCAS
{
private:MyCAS(){cout << "MyCAS()\n";}~MyCAS(){cout << "~MyCAS()\n";}private:static MyCAS *m_instance;static once_flag g_flag;public:void func(){cout << "test" << endl;}static MyCAS *GetInstance(){if (m_instance == nullptr)call_once(g_flag, CreateInstance);return m_instance;}private:static void CreateInstance(){static CGarhuishou cg;m_instance = new MyCAS();}private:class CGarhuishou{public:CGarhuishou(){cout << "CGarhuishou()\n";}~CGarhuishou(){if (MyCAS::m_instance){delete MyCAS::m_instance;MyCAS::m_instance = nullptr;}cout << "~CGarhuishou()\n";}};
};MyCAS *MyCAS::m_instance = nullptr;
once_flag MyCAS::g_flag;void mythread()
{cout << "begin\n";MyCAS *p_a = MyCAS::GetInstance();p_a->func();cout << "end\n";
}int main()
{{thread t1(mythread);thread t2(mythread);t1.join();t2.join();}cout << "main over" << endl;return 0;
}

17.8 condition_variable、wait、notify_one与notify_all

17.8.1 条件变量condition_variable、wait与notify_one

条件变量用于线程中等待一个条件满足(其它线程通知)时,继续往下执行。
condition_variable,条件相关的类,用于等待一个条件达成。

17.8.1-1.cpp

#include 
#include 
#include 
#include 
using namespace std;#define N 5
class A
{
public:void inMsgRecvQueue(){for (int i = 0; i < N; i++){unique_lock g(m);msgRecvQueue.push_back(i);this_thread::sleep_for(chrono::microseconds(20));}}void outMsgRecvQueue(){int command;for (int i = 0; i < N; i++){bool result = outMsgLULProc(command);if (result)cout << "do something" << endl;elsecout << "empty" << endl;this_thread::sleep_for(chrono::microseconds(10));}}private://双重锁定或者双重检查bool outMsgLULProc(int &command){if (!msgRecvQueue.empty()){unique_lock g(m);if (!msgRecvQueue.empty()){command = msgRecvQueue.front();msgRecvQueue.pop_front();return true;}}return false;}private:list msgRecvQueue;mutex m;
};int main()
{{A a;thread outObj(&A::outMsgRecvQueue, &a);thread inObj(&A::inMsgRecvQueue, std::ref(a));inObj.join();outObj.join();}cout << "main over" << endl;return 0;
}

17.8.1-2.cpp

#include 
#include 
#include 
#include 
#include 
using namespace std;#define N 5
class A
{
public:void inMsgRecvQueue(){for (int i = 0; i < N; i++){unique_lock g(m);msgRecvQueue.push_back(i);g.unlock();//this_thread::sleep_for(chrono::milliseconds(2));cond.notify_one();}}void outMsgRecvQueue(){for (int i = 0; i < N; i++){unique_lock g(m);// cond.wait(g)等价于cond.wait(g, []{ return false; });// lambda表达式返回true,wait直接返回;// lambda表达式返回false,wait将解锁互斥并堵塞到这行,直到其它线程调用notify。// wait第二次参数是个可调用对象//[this|&]()或者[this|&]cond.wait(g, [&]{ return !msgRecvQueue.empty(); });int command = msgRecvQueue.front();//肯定有数据msgRecvQueue.pop_front();g.unlock(); // unique_lock可以随时解锁,以免锁住太长时间。cout << "do something" << endl;this_thread::sleep_for(chrono::milliseconds(1));}}private:list msgRecvQueue;mutex m;condition_variable cond;
};int main()
{{A a;thread inObj(&A::inMsgRecvQueue, std::ref(a));thread outObj(&A::outMsgRecvQueue, &a);outObj.join();inObj.join();}cout << "main over" << endl;return 0;
}

17.8.2 wait与notify思考

notify不能确保wait一定拿到锁,notify后wait线程不一定唤醒,notify线程可能又抢到锁。

无wait执行时,notify无任何效果。

17.8.3 notify_all

notify_one()只唤醒一个wait线程,notify_all()唤醒所有wait线程。

#include 
#include 
#include 
#include 
#include 
#include 
using namespace std;#define N 5
class A
{
public:void inMsgRecvQueue(int n){for (int i = 0; i < N; i++){{unique_lock g(m);msgRecvQueue.push_back(i + 1);// g.unlock();}this_thread::sleep_for(chrono::milliseconds(2));cond.notify_all();}//生产结束,消息队列放入0unique_lock g(m);for (int i = 0; i < n; i++)msgRecvQueue.push_back(0);g.unlock();cond.notify_all();cout << "inMsgRecvQueue over" << endl;}void outMsgRecvQueue(){while (true){unique_lock g(m);cond.wait(g, [&]{ return !msgRecvQueue.empty(); });int command = msgRecvQueue.front();msgRecvQueue.pop_front();g.unlock();if (command == 0) //消息队列放入0,跳出循环break;cout << "do something" << endl;this_thread::sleep_for(chrono::milliseconds(1));}cout << "outMsgRecvQueue over" << endl;}private:list msgRecvQueue;mutex m;condition_variable cond;
};int main()
{{A a;int n = 5;vector threads(n);for (int i = 0; i < n; i++)threads[i] = thread(&A::outMsgRecvQueue, std::ref(a));thread inObj(&A::inMsgRecvQueue, std::ref(a), n);inObj.join();for (int i = 0; i < n; i++)threads[i].join();}cout << "main over" << endl;return 0;
}

17.9 async、future、packaged_task和promise

17.9.1 std::async和std::future创建后台任务并返回值

  1. std::async和std::future的用法
    std::async是一个函数模板,用来启用一个异步任务(自动创建一个新线程【有时不会】并开始执行对应的线程入口函数),返回一个std::future对象(类模板,含有线程入口函数的返回结果)。
#include 
#include 
#include 
using namespace std;int mythread(){cout<{future result = async(mythread);result.wait();//只等待线程返回,本身不返回结果。cout<
#include 
#include 
#include 
using namespace std;class A
{
public:int mythread(int n){cout << n << endl;cout << this_thread::get_id() << " start" << endl;this_thread::sleep_for(chrono::microseconds(10));cout << this_thread::get_id() << " end" << endl;return 5;}
};int main()
{{A a;int n = 12;future result = async(&A::mythread, &a, n);result.wait();                //只等待线程返回,本身不返回结果。cout << result.get() << endl; //卡在这里,等待子线程结束,只能调用一次。}cout << "main over" << endl;return 0;
}
  1. std::async额外参数详解

(1)launch::deferred
线程入口函数执行延迟到wait货get函数调用(此时未创建新线程),无调用则线程不执行。

auto result = async(launch::deferred, &A::mythread, &a, n);

(2)launch::async
创建时就立即执行(异步任务在新线程执行)。

auto result = async(launch::deferred, &A::mythread, &a, n);

(3)launch::deferred | launch::async
系统自行决定同步(无新线程)或异步(创建新线程)。

(4)无额外参数,同(3)

  1. std::async和std::thread的区别
    thread不容易拿到线程返回值,一般通过指针;
    async通过future拿到。
    thread创建线程太多,可能失败或崩溃;async一般不会。

  2. std::async不确定性问题的解决
    同步或异步。

17.9.2 std::packaged_task

类模板,模板参数是各种可调用对象,packaged_task将对象包装起来,方便作为线程入口函数使用。

#include 
#include 
#include 
using namespace std;int mythread(int n)
{cout << n << endl;cout << this_thread::get_id() << " start" << endl;this_thread::sleep_for(chrono::microseconds(10));cout << this_thread::get_id() << " end" << endl;return 5;
}int main()
{{packaged_task mypt(mythread);thread t(ref(mypt), 1);future result = mypt.get_future();cout<
#include 
#include 
#include 
using namespace std;int main()
{{packaged_task mypt([](int n) {       cout << n << endl;cout << this_thread::get_id() << " start" << endl;this_thread::sleep_for(chrono::microseconds(10));cout << this_thread::get_id() << " end" << endl;return 5; });thread t(ref(mypt), 1);future result = mypt.get_future();cout << result.get() << endl;t.join();}cout << "main over" << endl;return 0;
}

packaged_task包装起来的对象可以直接调用,packaged_task对象也是一个可调用对象。

#include 
#include 
#include 
using namespace std;int main()
{cout << "main id " << this_thread::get_id()  << endl;{packaged_task mypt([](int n) {       cout << n << endl;cout << this_thread::get_id() << " start" << endl;this_thread::sleep_for(chrono::microseconds(10));cout << this_thread::get_id() << " end" << endl;return 5; });//thread t(ref(mypt), 1);mypt(105);future result = mypt.get_future();cout << result.get() << endl;//t.join();}cout << "main over" << endl;return 0;
}
#include 
#include 
#include 
#include 
using namespace std;int main()
{cout << "main id " << this_thread::get_id()  << endl;{packaged_task mypt([](int n) {       cout << n << endl;cout << this_thread::get_id() << " start" << endl;this_thread::sleep_for(chrono::microseconds(10));cout << this_thread::get_id() << " end" << endl;return 5; });vector> mytasks;mytasks.push_back(std::move(mypt));packaged_task mypt2;auto iter = mytasks.begin();mypt2=std::move(*iter);mytasks.erase(iter);mypt2(105);future result = mypt2.get_future();cout << result.get() << endl;}cout << "main over" << endl;return 0;
}

17.9.3 std::promise

类模板,某个线程中赋值,其它线程中取值。

#include 
#include 
#include 
#include 
using namespace std;void mythread(promise &tmp, int n)
{cout << n << endl;cout << this_thread::get_id() << " start" << endl;tmp.set_value(n*10);this_thread::sleep_for(chrono::microseconds(10));cout << this_thread::get_id() << " end" << endl;
}int main()
{cout << "main id " << this_thread::get_id() << endl;{promise prog;thread t(mythread, std::ref(prog), 11);future result = prog.get_future();cout << result.get() << endl;t.join();}cout << "main over" << endl;return 0;
}
#include 
#include 
#include 
#include 
using namespace std;void mythread(promise &tmp, int n)
{cout << n << endl;cout << this_thread::get_id() << " start" << endl;tmp.set_value(n * 10);this_thread::sleep_for(chrono::microseconds(10));cout << this_thread::get_id() << " end" << endl;
}void mythread2(future &tmp)
{cout << tmp.get() << endl;//get只能调用一次cout << this_thread::get_id() << " start" << endl;this_thread::sleep_for(chrono::microseconds(10));cout << this_thread::get_id() << " end" << endl;
}int main()
{cout << "main id " << this_thread::get_id() << endl;{promise prog;thread t1(mythread, std::ref(prog), 11);t1.join();future result = prog.get_future();// cout << result.get() << endl;//get只能调用一次thread t2(mythread2, std::ref(result));t2.join();}cout << "main over" << endl;return 0;
}

17.10 future其它成员函数、shared_future与atomic

17.10.1 std::future其它成员函数

#include 
#include 
#include 
#include 
using namespace std;int mythread()
{cout << this_thread::get_id() << " start" << endl;this_thread::sleep_for(chrono::microseconds(10));cout << this_thread::get_id() << " end" << endl;return 5;
}int main()
{cout << "main id " << this_thread::get_id() << endl;{future result = async(mythread);// future result = async(launch::deferred, mythread);future_status status = result.wait_for(chrono::microseconds(8));switch (status){case future_status::timeout: //超时未执行完cout << result.get() << endl;break;case future_status::ready: //执行完cout << result.get() << endl;break;case future_status::deferred: //延迟未执行cout << result.get() << endl;break;}}cout << "main over" << endl;return 0;
}

17.10.2 续谈std::async不确定性问题

//异步执行
future result = async(launch::async, mythread);

17.10.3 std::shared_future

future移动语义,get只能调用一次;
shared_future数据复制非转移,get可调用多次。

#include 
#include 
#include 
#include 
using namespace std;int mythread(int n)
{cout << n << endl;cout << this_thread::get_id() << " start" << endl;this_thread::sleep_for(chrono::microseconds(10));cout << this_thread::get_id() << " end" << endl;return 5;
}void mythread2(shared_future &tmp)
{cout << this_thread::get_id() << " start" << endl;cout << tmp.get() << endl;this_thread::sleep_for(chrono::microseconds(10));cout << this_thread::get_id() << " end" << endl;
}int main()
{cout << "main id " << this_thread::get_id() << endl;{packaged_task mypt(mythread);thread t1(ref(mypt), 1);t1.join();future result = mypt.get_future();bool ifcanget = result.valid(); //未调用过get,true// auto mythreadresult = result.get();//只能get一次// ifcanget = result.valid();//get后,false// result未调用get,将result内容放在shared_future中,future空了shared_future result_s(move(result));// shared_future result_s(result.share());//同上ifcanget = result.valid();   // false,furure空了ifcanget = result_s.valid(); // true,shared_future有内容//可以多次调用getauto mythreadresult = result_s.get();mythreadresult = result_s.get();thread t2(mythread2, ref(result_s));t2.join();}cout << "main over" << endl;return 0;
}
int main()
{cout << "main id " << this_thread::get_id() << endl;{packaged_task mypt(mythread);thread t1(ref(mypt), 1);t1.join();shared_future result_s(mypt.get_future());// shared_future result_s(result.share());//同上//可以多次调用getauto mythreadresult = result_s.get();mythreadresult = result_s.get();thread t2(mythread2, ref(result_s));t2.join();}cout << "main over" << endl;return 0;
}

17.10.4 原子操作std::atomic

一种不需要用到互斥量加锁(无锁)技术的多线程并发编程方式,在多线程中不会被打断的程序执行片段,效率更高。
互斥量加锁针对一个代码段(几行代码),原子操作针对一个变量。
原子操作指不可分割的操作,操作的状态要么完成,要么没完成。

std::atomic count = 0;
void mythread(){for(int i=0; i<1000; i++){count++;count += 1;//count = count + 1;//非原子操作}
}
++、--、+=、-=、&=、|=、^=等简单运算符是原子操作
std::atomic flag = false;

17.11 Windows临界区与其它各种mutex互斥量

17.11.1 Windows临界区

#include #include 
#include 
#include 
#include 
using namespace std;#define __WINDOWSLJQ__#define N 5
class A
{
public:A(){
#ifdef __WINDOWSLJQ__InitializeCriticalSection(&winsec);
#endif}virtual ~A(){
#ifdef __WINDOWSLJQ__DeleteCriticalSection(&winsec);
#endif}public:void inMsgRecvQueue(){for (int i = 0; i < N; i++){
#ifdef __WINDOWSLJQ__EnterCriticalSection(&winsec);msgRecvQueue.push_back(i);LeaveCriticalSection(&winsec);
#elsemtx.lock();msgRecvQueue.push_back(i);mtx.unlock();
#endif}}void outMsgRecvQueue(){int command;for (int i = 0; i < N; i++){bool result = outMsgLULProc(command);if (!result)cout << "empty" << endl;elsecout << "do something" << endl;}}private:bool outMsgLULProc(int &command){#ifdef __WINDOWSLJQ__EnterCriticalSection(&winsec);if (!msgRecvQueue.empty()){command = msgRecvQueue.front();msgRecvQueue.pop_front();LeaveCriticalSection(&winsec);return true;}LeaveCriticalSection(&winsec);
#elsemtx.lock();if (!msgRecvQueue.empty()){command = msgRecvQueue.front();msgRecvQueue.pop_front();mtx.unlock();return true;}mtx.unlock();
#endifreturn false;}private:list msgRecvQueue;mutex mtx;
#ifdef __WINDOWSLJQ__CRITICAL_SECTION winsec;
#endif
};int main()
{{A a;thread outObj(&A::outMsgRecvQueue, &a);thread inObj(&A::inMsgRecvQueue, std::ref(a));inObj.join();outObj.join();}cout << "main over" << endl;return 0;
}

17.11.2 多次进入临界区试验

临界区,需要在多线程编程中进行保护的共享数据相关的代码行(区域)。

#include #include 
#include 
#include 
#include 
using namespace std;#define __WINDOWSLJQ__#define N 5
class A
{
public:A(){
#ifdef __WINDOWSLJQ__InitializeCriticalSection(&winsec);
#endif}virtual ~A(){
#ifdef __WINDOWSLJQ__DeleteCriticalSection(&winsec);
#endif}public:void inMsgRecvQueue(){for (int i = 0; i < N; i++){
#ifdef __WINDOWSLJQ__EnterCriticalSection(&winsec); //进入临界区EnterCriticalSection(&winsec); //调用两次msgRecvQueue.push_back(i);LeaveCriticalSection(&winsec); //离开临界区LeaveCriticalSection(&winsec); //也要调用两次
#elsemtx.lock();// mtx.lock();//errormsgRecvQueue.push_back(i);mtx.unlock();// mtx.unlock();//error
#endif}}void outMsgRecvQueue(){int command;for (int i = 0; i < N; i++){bool result = outMsgLULProc(command);if (!result)cout << "empty" << endl;elsecout << "do something" << endl;}}private:bool outMsgLULProc(int &command){#ifdef __WINDOWSLJQ__EnterCriticalSection(&winsec);if (!msgRecvQueue.empty()){command = msgRecvQueue.front();msgRecvQueue.pop_front();LeaveCriticalSection(&winsec);return true;}LeaveCriticalSection(&winsec);
#elsemtx.lock();if (!msgRecvQueue.empty()){command = msgRecvQueue.front();msgRecvQueue.pop_front();mtx.unlock();return true;}mtx.unlock();
#endifreturn false;}private:list msgRecvQueue;mutex mtx;
#ifdef __WINDOWSLJQ__CRITICAL_SECTION winsec;
#endif
};int main()
{{A a;thread outObj(&A::outMsgRecvQueue, &a);thread inObj(&A::inMsgRecvQueue, std::ref(a));inObj.join();outObj.join();}cout << "main over" << endl;return 0;
}

17.11.3 自动析构技术

RAII(Resource Acquisition Is Initialization)类与对象,资源获取即初始化。构造函数中初始化资源,析构函数中释放资源。只能指针、容器等用到。

#include #include 
#include 
#include 
#include 
using namespace std;#define __WINDOWSLJQ__class CWinLock
{
public:CWinLock(CRITICAL_SECTION *p) : critical(p){EnterCriticalSection(critical); //进入临界区}~CWinLock(){LeaveCriticalSection(critical); //离开临界区}private:CRITICAL_SECTION *critical;
};#define N 5
class A
{
public:A(){
#ifdef __WINDOWSLJQ__InitializeCriticalSection(&winsec);
#endif}virtual ~A(){
#ifdef __WINDOWSLJQ__DeleteCriticalSection(&winsec);
#endif}public:void inMsgRecvQueue(){for (int i = 0; i < N; i++){
#ifdef __WINDOWSLJQ__CWinLock wlock1(&winsec);CWinLock wlock2(&winsec); //调用多次没问题msgRecvQueue.push_back(i);
#elselock_guard g1(mtx);// lock_guard g2(mtx);//errormsgRecvQueue.push_back(i);
#endif}}void outMsgRecvQueue(){int command;for (int i = 0; i < N; i++){bool result = outMsgLULProc(command);if (!result)cout << "empty" << endl;elsecout << "do something" << endl;}}private:bool outMsgLULProc(int &command){#ifdef __WINDOWSLJQ__EnterCriticalSection(&winsec);if (!msgRecvQueue.empty()){command = msgRecvQueue.front();msgRecvQueue.pop_front();LeaveCriticalSection(&winsec);return true;}LeaveCriticalSection(&winsec);
#elsemtx.lock();if (!msgRecvQueue.empty()){command = msgRecvQueue.front();msgRecvQueue.pop_front();mtx.unlock();return true;}mtx.unlock();
#endifreturn false;}private:list msgRecvQueue;mutex mtx;
#ifdef __WINDOWSLJQ__CRITICAL_SECTION winsec;
#endif
};int main()
{{A a;thread outObj(&A::outMsgRecvQueue, &a);thread inObj(&A::inMsgRecvQueue, std::ref(a));inObj.join();outObj.join();}cout << "main over" << endl;return 0;
}

17.11.4 recursive_mutex递归的独占互斥量

允许同一个线程多次调用同一个recursive_mutex的lock成员函数,mutex的lock不允许连续多次调用。

class A{
public:void test1(){lock_guard g(mtx);}void test2(){lock_guard g(mtx);test1();}
private:recursive_mutex mtx;
};

17.11.5 带超时的互斥量std::timed_mutx和std::recursive_timed_mutex

std::timed_mutx带超时功能的独占互斥锁;std::recursive_timed_mutex带超时功能的递归的独占互斥锁。

std::timed_mutx mtx;
for(int i=0; i<1000; i++){chrono::milliseconds timeout(100);if(mtx.try_lock_for(timeout)){//if(mtx.try_lock_until(chrono::steady_clock::now() + timeout)){//....mtx.unlock();}else{this_thread::sleep_for(chrono::milliseconds(300));}
}

17.12 补充知识、线程池浅谈、数量谈与总结

17.12.1 知识点补充

  1. 虚假唤醒

wait线程醒来后没有实际可供处理的数据,叫虚假唤醒。
比如,push_back一条数据,调用多次notify_one,或者多个线程取数据,总有个线程唤醒后,但队列中没有数据可处理。

一般while替换if。

#include 
#include 
#include 
#include 
#include 
#include 
using namespace std;#define N 5
class A
{
public:void inMsgRecvQueue(int n){for (int i = 0; i < N; i++){{unique_lock g(m);msgRecvQueue.push_back(i + 1);// g.unlock();}this_thread::sleep_for(chrono::milliseconds(2));cond.notify_all();}//生产结束,消息队列放入0unique_lock g(m);for (int i = 0; i < n; i++)msgRecvQueue.push_back(0);g.unlock();cond.notify_all();cout << "inMsgRecvQueue over" << endl;}void outMsgRecvQueue(){while (true){unique_lock g(m);//cond.wait(g, [&]{ return !msgRecvQueue.empty(); });while(msgRecvQueue.empty())cond.wait(g);int command = msgRecvQueue.front();msgRecvQueue.pop_front();g.unlock();if (command == 0) //消息队列放入0,跳出循环break;cout << "do something" << endl;this_thread::sleep_for(chrono::milliseconds(1));}cout << "outMsgRecvQueue over" << endl;}private:list msgRecvQueue;mutex m;condition_variable cond;
};int main()
{{A a;int n = 5;vector threads(n);for (int i = 0; i < n; i++)threads[i] = thread(&A::outMsgRecvQueue, std::ref(a));thread inObj(&A::inMsgRecvQueue, std::ref(a), n);inObj.join();for (int i = 0; i < n; i++)threads[i].join();}cout << "main over" << endl;return 0;
}
  1. atomic的进一步理解

atomic禁用拷贝构造函数和复制赋值运算符。

atomic atm1;
atm1 = 0;atomic atm3;
//atm3 = atm1;//erroratomic atm5(atm1.load());
atm5.store(12);
atm5 = 12;

17.12.2 线程池浅谈

程序中偶尔达成某种条件时就创建一个线程,不够稳定。
线程池将一堆线程放在一起,进行统一的管理调度。

17.12.3 线程创建数量谈

一般2000各左右线程就是极限,一个进程线程数不要超过500,200以内最好。

相关内容

热门资讯

古代皇帝骂人会讲脏话吗 他们都... 还不知道:古代皇帝骂人会讲脏话吗的读者,下面趣历史小编就为大家带来详细介绍,接着往下看吧~随着科技的...
古代圣旨那么多 为什么现在却很... 还不知道:古代圣旨为什么现在很少能看到了的读者,下面趣历史小编就为大家带来详细介绍,接着往下看吧~皇...
古代接旨到底是什么样 真的要全... 还不知道:古代接旨到底是什么样的读者,下面趣历史小编就为大家带来详细介绍,接着往下看吧~清宫戏里面,...
雍正打击三儿子弘时真的是为了他... 还不知道:雍正打击三儿子弘时是为了什么的读者,下面趣历史小编就为大家带来详细介绍,接着往下看吧~雍正...
如何认定协助组织卖淫罪的情节严... 如果您想第一时间收到我的更新,请点开文章标题下面的蓝色字体的“刑事专业律师何忠民”,再点击右上角,然...