Ежедневный бит (е) C++ # 20, Переменные условия: std::condition_variable и std::condition_variable_any

std::condition_variable и std::condition_variable_any оборачивают логику потока, ожидающего предварительного условия.

Нередко потоку не просто нужно получить блокировку для продолжения работы; также должны быть соблюдены определенные условия. Например, входы должны быть готовы.

Переменные условия принимают полученную блокировку и условие и будут:

  1. снять блокировку и приостановить нить
  2. когда поток пробуждается, условная переменная попытается повторно получить блокировку; если это не удается, поток снова приостанавливается, и мы повторяем 2.
  3. если блокировка успешно получена, условие проверяется; если это все еще ложно, мы повторяем 1.
  4. если условие истинно, вызов wait() условной переменной завершается, и теперь поток удерживает блокировку, а проверенное условие гарантируется истинным

Помимо блокировки условия, условные переменные также поддерживают пробуждение одного или всех потоков, ожидающих условия, с помощью методов notify_one() и notify_all().

#include <thread>
#include <mutex>
#include <condition_variable>
#include <syncstream>
#include <iostream>
using namespace std::chrono_literals;

struct Resource {
    bool full = false;    
    std::mutex mux;
    // Note that std::condition_variable only works with
    // std::unique_lock<std::mutex>, for other combinations
    // use std::condition_variable_any which may be less efficient.
    std::condition_variable cond;
    void produce() {
        {
        std::unique_lock lock(mux);
        // wait until the condition is true
        // 1. the lock is released
        // 2. when the thread is woken up, the lock is reacquired 
        //    and the condition checked
        // 3. if the condition is still not true, the lock 
        //    is rereleased, and we go to step 2.
        // 4. if the condition is true, the wait() call finishes
        cond.wait(lock, [this]{ return !full; });
        std::osyncstream(std::cout) << 
          "Filling the resource and notifying the consumer.\n";
        full = true;
        std::this_thread::sleep_for(200ms);
        }
        // wake up one thread waiting on this condition variable
        // note that we already released our lock, otherwise
        // the notified thread would wake up and fail to acquire
        // the lock and suspend itself again
        cond.notify_one();
    }
    void consume() {
        {
        std::unique_lock lock(mux);
        // same as above, but with opposite semantics
        cond.wait(lock, [this]{ return full; });
        std::osyncstream(std::cout) << 
          "Consuming the resource and notifying the producer.\n";
        full = false;
        std::this_thread::sleep_for(200ms);
        }
        cond.notify_one();
    }
};

int main() {
    Resource resource;
    auto t1 = std::jthread([&resource](std::stop_token token){
        while (!token.stop_requested())
            resource.produce();
    });
    auto t2 = std::jthread([&resource](std::stop_token token){
        while (!token.stop_requested())
            resource.consume();
    });
    std::this_thread::sleep_for(2s);
    t1.request_stop();
    t2.request_stop();
    // Note: using request_stop here is unsafe.

    // If we removed the sleep_for, the t2 thread could 
    // run an entire loop before it notices the stop request 
    // and considering that t1 no longer runs, the blocking
    // condition would never be fulfilled.

    // This can be prevented by using condition_variable_any, 
    // which supports a stop token, or a timeout.
}

Откройте этот пример в Compiler Explorer.