Wang's blog

并发库

Published on

线程

C++11引入了与操作系统无关的std::thread类,可以使用统一的方式创建线程。C++20提供了std::jthread类,在std::thread的基础上增加了自动合并与外部请求终止的功能。

auto func = []()
{
    for (int i = 0; i < 10; ++i)
        std::cout << i << " ";
    std::cout << std::endl;
};

std::thread t(func);    // 创建一个线程并开始执行
if (t.joinable())       // 检查线程可否被合并
    t.join();           // 阻塞等待线程结束
if (t.joinable())
    t.detach();         // 将线程和线程对象分离,无法再与线程交互

互斥

使用互斥避免多个线程同时访问共享资源,这可以避免数据竞争,并提供线程间的同步支持。

互斥量

C++并发库中提供以下互斥量:

  • mutex:基本互斥量
  • timed_mutex:带时限互斥量
  • recursive_mutex:能被同一线程递归锁定的互斥量
  • recursive_timed_mutex:带时限且能被同一线程递归锁定的互斥量
  • shared_mutex:共享互斥量
  • shared_timed_mutex:带时限共享互斥量
std::mutex mut;        // 互斥量
std::vector<int> nums; // 共享变量

auto func = [&](int k)
{
    mut.lock();        // 对互斥量加锁
    nums.push_back(k); // 访问共享变量
    mut.unlock();      // 解锁互斥量
};

// 生成5个线程,它们会访问同一个共享变量
std::thread threads[5];
for (int i = 0; i < 5; ++i)
    threads[i] = std::thread(func, i);
for (auto &th : threads)
    th.join();

互斥量管理

直接使用互斥量时,容易由于编码失误导致一直持有互斥量。互斥量封装器(锁)使用RAII的方式对互斥量进行封装,从而可以自动对互斥量加锁/解锁。并发库中提供以下锁:

  • lock_guard:严格基于作用域的封装器
  • scoped_lock:用于多个互斥量的免死锁封装器
  • unique_lock:可移动的封装器
  • shared_lock:可移动的共享封装器
std::mutex mut;                             // 互斥量
std::vector<int> nums;                      // 共享变量

auto func = [&](int k)
{
    std::lock_guard<std::mutex> lock(mut);  // 使用lock_guard对互斥量加锁
    nums.push_back(k);                      // 访问共享变量
};                                          // 退出作用域时lock_guard会自动解锁互斥量

// 生成5个线程,它们会访问同一个共享变量
std::thread threads[5];
for (int i = 0; i < 5; ++i)
    threads[i] = std::thread(func, i);
for (auto &th : threads)
    th.join();

单次调用

使用std::call_once函数模板可以保证某一函数在多线程环境中只被调用一次。

std::once_flag onceflag;

void do_once()
{
    std::call_once(onceflag, []() { std::cout << "call once" << std::endl; });
}

int main()
{
    std::thread threads[5];
    for (int i = 0; i < 5; ++i)
        threads[i] = std::thread(do_once);
    for (auto &th : threads)
        th.join();
}

条件变量

条件变量是允许多个线程相互交流的同步原语。它允许一定量的线程等待(可以定时)另一线程的唤醒,然后再继续。条件变量始终关联到一个互斥量。

std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;

void worker_thread()
{
    // 等待直至main()发送数据
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, [] { return ready; });

    // 等待后,本线程占有锁
    std::cout << "Worker thread is processing data\n";
    data += " after processing";

    // 发送数据回main()
    processed = true;
    std::cout << "Worker thread signals data processing completed\n";

    // 通知前手动解锁,以避免等待线程刚被唤醒就阻塞(细节见notify_one)
    lk.unlock();
    cv.notify_one();
}

int main()
{
    std::thread worker(worker_thread);

    data = "Example data";
    // 发送数据到worker线程
    {
        std::lock_guard<std::mutex> lk(m);
        ready = true;
        std::cout << "main() signals data ready for processing\n";
    }
    cv.notify_one();

    // 等候worker
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [] { return processed; });
    }
    std::cout << "Back in main(), data = " << data << '\n';

    worker.join();
}

原子操作

如果多个线程共享一个变量,即使对于int等简单类型,在访问前也需要加锁以避免数据竞争。使用std::atomic可以将变量进行封装,使对变量的操作变为原子操作。在执行原子操作时无需加锁,这使得代码更加简洁与安全。

// 普通类型计数器
struct OriginCounter
{
    int count;                                  // 使用普通类型
    std::mutex mut;                             // 需要互斥量
    void add()
    {
        std::lock_guard<std::mutex> lock(mut);  // 加锁
        ++count;                                // 对变量进行操作
    }

    void sub()
    {
        std::lock_guard<std::mutex> lock(mut);
        --count;
    }

    int get()
    {
        std::lock_guard<std::mutex> lock(mut);
        return count;
    }
};

// 原子类型计数器
struct NewCounter
{
    std::atomic<int> count;                     // 使用原子类型
    void add()
    {
        ++count;                                // 直接对变量进行操作,无需加锁
        count.store(++count);                   // 或使用store函数操作
    }

    void sub()
    {
        --count;
        count.store(--count);
    }

    int get()
    {
        return count.load();
    }
};

信号量

信号量是一种轻量同步元件,能控制对共享资源的访问,它允许同时有最多max个访问者。并发库提供以下两种信号量:

  • std::counting_semaphore:实现非负计数的信号量
  • std::binary_semaphore:仅拥有两个状态的信号量(实际上是最大访问量为1的std::counting_semaphore)
// 全局二元信号量实例
// 设置对象计数为零
// 对象在未被发信状态
std::binary_semaphore smphSignal(0);

void ThreadProc()
{
    // 通过尝试减少信号量的计数等待来自主程序的信号
    smphSignal.acquire();

    // 此调用阻塞直至信号量的计数被从主程序增加
    std::cout << "[thread] Got the signal" << std::endl;

    // 等待3秒以模仿某种线程正在进行的工作
    std::this_thread::sleep_for(3s);

    std::cout << "[thread] Send the signal\n";

    // 对主程序回复发信
    smphSignal.release();
}

int main()
{
    // 创建某个背景工作线程,它将长期存在
    std::jthread thrWorker(ThreadProc);

    std::cout << "[main] Send the signal\n";

    // 通过增加信号量的计数对工作线程发信以开始工作
    smphSignal.release();

    // release()后随acquire()会阻止工作线程获取信号量,所以添加延迟
    std::this_thread::sleep_for(50ms);

    // 通过试图减少信号量的计数等待直至工作线程完成工作
    smphSignal.acquire();

    std::cout << "[main] Got the signal\n";
}

锁存器与屏障

锁存器与屏障是线程协调机制,允许任何数量的线程阻塞直至期待数量的线程到达该屏障。

  • std::latch:单次使用的线程屏障
  • std::barrier:可复用的线程屏障
int main()
{
    const auto workers = {"anil", "busara", "carl"};

    auto on_completion = []() noexcept
    {
        static auto phase = "... done\nCleaning up...\n";
        std::cout << phase;
        phase = "... done\n";
    };
    std::barrier sync_point(std::ssize(workers), on_completion);

    auto work = [&](std::string name)
    {
        std::string product = "  " + name + " worked\n";
        std::cout << product;
        sync_point.arrive_and_wait();

        product = "  " + name + " cleaned\n";
        std::cout << product;
        sync_point.arrive_and_wait();
    };

    std::cout << "Starting...\n";
    std::vector<std::thread> threads;
    for (auto const &worker : workers)
    {
        threads.emplace_back(work, worker);
    }
    for (auto &thread : threads)
    {
        thread.join();
    }
}

异步

在使用上述同步机制时,如果要在线程间传递数据,则用户需要编写一些与业务逻辑无关的代码。为了使用户可以专注于业务逻辑,减少不必要的工作,C++并发库提供了一些工具来获取异步任务(即在单独的线程中启动的函数)的返回值,并捕捉其所抛出的异常。这些工具包括:

  • std::future:用于访问异步任务的结果
  • std::promise:手动发送值或异常至std::future对象
  • std::packaged_task:包装任何可调用目标(函数、lambda表达式、bind表达式或其他函数对象),将其返回值或所抛异常发送至std::future对象
  • std::async:异步地运行函数,将其返回值或所抛异常发送至std::future对象

其中,std::future用于接收数据,其它三个类型用于发送数据。使用流程为:

  • 通过std::async、std::packaged_task或std::promise创建异步任务,同时提供一个std::future对象给创建者
  • 异步任务完成后,通过std::future向创建者返回值
  • 创建者可以查询、等待或从std::future提取值(若异步任务未完成,则这些方法可能阻塞)
// 使用std::promise
std::promise<int> p;                                                        // 创建promise
std::future<int> f1 = p.get_future();                                       // 获取future
std::thread([&p]{ p.set_value_at_thread_exit(1); }).detach();               // 在线程上运行

// 使用std::packaged_task
std::packaged_task<int()> task([](){ return 2; });                          // 包装函数
std::future<int> f2 = task.get_future();                                    // 获取future
std::thread(std::move(task)).detach();                                      // 在线程上运行

// 使用std::async
std::future<int> f3 = std::async(std::launch::async, [](){ return 3; });    // 执行异步任务,获取future

std::cout << "Waiting..." << std::flush;                                    // 等待三个线程运行完成
f1.wait();
f2.wait();
f3.wait();
std::cout << "Done!\nResults are: "                                         // 打印结果
            << f1.get() << ' ' << f2.get() << ' ' << f3.get() << '\n';