并发库
Published on
线程
C++11引入了与操作系统无关的std::thread类,可以使用统一的方式创建线程。C++20提供了std::jthread类,在std::thread的基础上增加了自动合并与外部请求终止的功能。
auto func = []()
{
for (int i = 0; i < 10; ++i)
std::cout << i << " ";
std::cout << std::endl;
};
std::thread t(func); // 创建一个线程并开始执行
if (t.joinable()) // 检查线程可否被合并
t.join(); // 阻塞等待线程结束
if (t.joinable())
t.detach(); // 将线程和线程对象分离,无法再与线程交互
互斥
使用互斥避免多个线程同时访问共享资源,这可以避免数据竞争,并提供线程间的同步支持。
互斥量
C++并发库中提供以下互斥量:
- mutex:基本互斥量
- timed_mutex:带时限互斥量
- recursive_mutex:能被同一线程递归锁定的互斥量
- recursive_timed_mutex:带时限且能被同一线程递归锁定的互斥量
- shared_mutex:共享互斥量
- shared_timed_mutex:带时限共享互斥量
std::mutex mut; // 互斥量
std::vector<int> nums; // 共享变量
auto func = [&](int k)
{
mut.lock(); // 对互斥量加锁
nums.push_back(k); // 访问共享变量
mut.unlock(); // 解锁互斥量
};
// 生成5个线程,它们会访问同一个共享变量
std::thread threads[5];
for (int i = 0; i < 5; ++i)
threads[i] = std::thread(func, i);
for (auto &th : threads)
th.join();
互斥量管理
直接使用互斥量时,容易由于编码失误导致一直持有互斥量。互斥量封装器(锁)使用RAII的方式对互斥量进行封装,从而可以自动对互斥量加锁/解锁。并发库中提供以下锁:
- lock_guard:严格基于作用域的封装器
- scoped_lock:用于多个互斥量的免死锁封装器
- unique_lock:可移动的封装器
- shared_lock:可移动的共享封装器
std::mutex mut; // 互斥量
std::vector<int> nums; // 共享变量
auto func = [&](int k)
{
std::lock_guard<std::mutex> lock(mut); // 使用lock_guard对互斥量加锁
nums.push_back(k); // 访问共享变量
}; // 退出作用域时lock_guard会自动解锁互斥量
// 生成5个线程,它们会访问同一个共享变量
std::thread threads[5];
for (int i = 0; i < 5; ++i)
threads[i] = std::thread(func, i);
for (auto &th : threads)
th.join();
单次调用
使用std::call_once函数模板可以保证某一函数在多线程环境中只被调用一次。
std::once_flag onceflag;
void do_once()
{
std::call_once(onceflag, []() { std::cout << "call once" << std::endl; });
}
int main()
{
std::thread threads[5];
for (int i = 0; i < 5; ++i)
threads[i] = std::thread(do_once);
for (auto &th : threads)
th.join();
}
条件变量
条件变量是允许多个线程相互交流的同步原语。它允许一定量的线程等待(可以定时)另一线程的唤醒,然后再继续。条件变量始终关联到一个互斥量。
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
void worker_thread()
{
// 等待直至main()发送数据
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [] { return ready; });
// 等待后,本线程占有锁
std::cout << "Worker thread is processing data\n";
data += " after processing";
// 发送数据回main()
processed = true;
std::cout << "Worker thread signals data processing completed\n";
// 通知前手动解锁,以避免等待线程刚被唤醒就阻塞(细节见notify_one)
lk.unlock();
cv.notify_one();
}
int main()
{
std::thread worker(worker_thread);
data = "Example data";
// 发送数据到worker线程
{
std::lock_guard<std::mutex> lk(m);
ready = true;
std::cout << "main() signals data ready for processing\n";
}
cv.notify_one();
// 等候worker
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [] { return processed; });
}
std::cout << "Back in main(), data = " << data << '\n';
worker.join();
}
原子操作
如果多个线程共享一个变量,即使对于int等简单类型,在访问前也需要加锁以避免数据竞争。使用std::atomic可以将变量进行封装,使对变量的操作变为原子操作。在执行原子操作时无需加锁,这使得代码更加简洁与安全。
// 普通类型计数器
struct OriginCounter
{
int count; // 使用普通类型
std::mutex mut; // 需要互斥量
void add()
{
std::lock_guard<std::mutex> lock(mut); // 加锁
++count; // 对变量进行操作
}
void sub()
{
std::lock_guard<std::mutex> lock(mut);
--count;
}
int get()
{
std::lock_guard<std::mutex> lock(mut);
return count;
}
};
// 原子类型计数器
struct NewCounter
{
std::atomic<int> count; // 使用原子类型
void add()
{
++count; // 直接对变量进行操作,无需加锁
count.store(++count); // 或使用store函数操作
}
void sub()
{
--count;
count.store(--count);
}
int get()
{
return count.load();
}
};
信号量
信号量是一种轻量同步元件,能控制对共享资源的访问,它允许同时有最多max个访问者。并发库提供以下两种信号量:
- std::counting_semaphore:实现非负计数的信号量
- std::binary_semaphore:仅拥有两个状态的信号量(实际上是最大访问量为1的std::counting_semaphore)
// 全局二元信号量实例
// 设置对象计数为零
// 对象在未被发信状态
std::binary_semaphore smphSignal(0);
void ThreadProc()
{
// 通过尝试减少信号量的计数等待来自主程序的信号
smphSignal.acquire();
// 此调用阻塞直至信号量的计数被从主程序增加
std::cout << "[thread] Got the signal" << std::endl;
// 等待3秒以模仿某种线程正在进行的工作
std::this_thread::sleep_for(3s);
std::cout << "[thread] Send the signal\n";
// 对主程序回复发信
smphSignal.release();
}
int main()
{
// 创建某个背景工作线程,它将长期存在
std::jthread thrWorker(ThreadProc);
std::cout << "[main] Send the signal\n";
// 通过增加信号量的计数对工作线程发信以开始工作
smphSignal.release();
// release()后随acquire()会阻止工作线程获取信号量,所以添加延迟
std::this_thread::sleep_for(50ms);
// 通过试图减少信号量的计数等待直至工作线程完成工作
smphSignal.acquire();
std::cout << "[main] Got the signal\n";
}
锁存器与屏障
锁存器与屏障是线程协调机制,允许任何数量的线程阻塞直至期待数量的线程到达该屏障。
- std::latch:单次使用的线程屏障
- std::barrier:可复用的线程屏障
int main()
{
const auto workers = {"anil", "busara", "carl"};
auto on_completion = []() noexcept
{
static auto phase = "... done\nCleaning up...\n";
std::cout << phase;
phase = "... done\n";
};
std::barrier sync_point(std::ssize(workers), on_completion);
auto work = [&](std::string name)
{
std::string product = " " + name + " worked\n";
std::cout << product;
sync_point.arrive_and_wait();
product = " " + name + " cleaned\n";
std::cout << product;
sync_point.arrive_and_wait();
};
std::cout << "Starting...\n";
std::vector<std::thread> threads;
for (auto const &worker : workers)
{
threads.emplace_back(work, worker);
}
for (auto &thread : threads)
{
thread.join();
}
}
异步
在使用上述同步机制时,如果要在线程间传递数据,则用户需要编写一些与业务逻辑无关的代码。为了使用户可以专注于业务逻辑,减少不必要的工作,C++并发库提供了一些工具来获取异步任务(即在单独的线程中启动的函数)的返回值,并捕捉其所抛出的异常。这些工具包括:
- std::future:用于访问异步任务的结果
- std::promise:手动发送值或异常至std::future对象
- std::packaged_task:包装任何可调用目标(函数、lambda表达式、bind表达式或其他函数对象),将其返回值或所抛异常发送至std::future对象
- std::async:异步地运行函数,将其返回值或所抛异常发送至std::future对象
其中,std::future用于接收数据,其它三个类型用于发送数据。使用流程为:
- 通过std::async、std::packaged_task或std::promise创建异步任务,同时提供一个std::future对象给创建者
- 异步任务完成后,通过std::future向创建者返回值
- 创建者可以查询、等待或从std::future提取值(若异步任务未完成,则这些方法可能阻塞)
// 使用std::promise
std::promise<int> p; // 创建promise
std::future<int> f1 = p.get_future(); // 获取future
std::thread([&p]{ p.set_value_at_thread_exit(1); }).detach(); // 在线程上运行
// 使用std::packaged_task
std::packaged_task<int()> task([](){ return 2; }); // 包装函数
std::future<int> f2 = task.get_future(); // 获取future
std::thread(std::move(task)).detach(); // 在线程上运行
// 使用std::async
std::future<int> f3 = std::async(std::launch::async, [](){ return 3; }); // 执行异步任务,获取future
std::cout << "Waiting..." << std::flush; // 等待三个线程运行完成
f1.wait();
f2.wait();
f3.wait();
std::cout << "Done!\nResults are: " // 打印结果
<< f1.get() << ' ' << f2.get() << ' ' << f3.get() << '\n';