Обещание — будущий канал связи

Оба метода передачи данных в поток, которые мы обсуждали до сих пор, полезны при создании потока: мы можем либо передавать аргументы в функцию потока, используя шаблоны с переменным числом аргументов, либо мы можем использовать лямбду для захвата аргументов по значению или по ссылке. Следующий пример снова иллюстрирует использование этих методов:

#include <iostream>
#include <thread>

void printMessage(std::string message)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(10)); // simulate work
    std::cout << "Thread 1: " << message << std::endl;
}

int main()
{
    // define message
    std::string message = "My Message";

    // start thread using variadic templates
    std::thread t1(printMessage, message);

    // start thread using a Lambda
    std::thread t2([message] {
        std::this_thread::sleep_for(std::chrono::milliseconds(10)); // simulate work
        std::cout << "Thread 2: " << message << std::endl;
    });

    // thread barrier
    t1.join();
    t2.join();

    return 0;
}

Недостатком этих двух подходов является то, что информация передается от родительского потока (main) к рабочим потокам (t1 и t2). В этом разделе мы хотим рассмотреть способ передачи данных в обратном направлении — то есть из рабочих потоков обратно в родительский поток.

Для этого потоки должны придерживаться строгого протокола синхронизации. В стандарте C++ есть такой механизм, который мы можем использовать для этой цели. Этот механизм действует как одноразовый канал между потоками. Передающий конец канала называется «обещанием», а принимающий конец — «будущим».

В стандарте C++ шаблон класса std::promise предоставляет удобный способ хранения значения или исключения, которое будет получено асинхронно позднее через объект std::future. Каждый объект std::promise предназначен для использования только один раз.

В следующем примере мы хотим объявить обещание, которое позволяет передавать строку между двумя потоками и изменять ее в процессе.

#include <iostream>
#include <thread>
#include <future>

void modifyMessage(std::promise<std::string> && prms, std::string message)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(4000)); // simulate work
    std::string modifiedMessage = message + " has been modified"; 
    prms.set_value(modifiedMessage);
}

int main()
{
    // define message
    std::string messageToThread = "My Message";

    // create promise and future
    std::promise<std::string> prms;
    std::future<std::string> ftr = prms.get_future();

    // start thread and pass promise as argument
    std::thread t(modifyMessage, std::move(prms), messageToThread);

    // print original message to console
    std::cout << "Original message from main(): " << messageToThread << std::endl;

    // retrieve modified message via future and print to console
    std::string messageFromThread = ftr.get();
    std::cout << "Modified message from thread(): " << messageFromThread << std::endl;

    // thread barrier
    t.join();

    return 0;
}

После определения сообщения мы должны создать подходящее обещание, которое может принимать строковый объект. Чтобы получить соответствующее будущее, нам нужно вызвать метод get_future() для промиса. Обещание и будущее — это два типа канала связи, которые мы хотим использовать для передачи строки между потоками. Канал связи, настроенный таким образом, может передавать только строку.

Теперь мы можем создать поток, который принимает функцию, и мы передадим ему обещание в качестве аргумента, а также сообщение, которое нужно изменить. Обещания невозможно скопировать, потому что концепция обещание-будущее представляет собой двухточечный канал связи для одноразового использования. Следовательно, мы должны передать промис функции потока, используя std::move. Затем поток во время своего выполнения будет использовать обещание для передачи обратно измененного сообщения.

Функция потока принимает обещание как ссылку на rvalue в соответствии с семантикой перемещения. После ожидания в течение нескольких секунд сообщение модифицируется, и для промиса вызывается метод set_value().

Вернувшись в основной поток, после запуска потока исходное сообщение выводится на консоль. Затем мы начинаем слушать на другом конце канала связи, вызывая функцию get() на будущем. Этот метод будет блокироваться до тех пор, пока данные не будут доступны, что происходит, как только set_value вызывается для обещания (из потока). Если результат является перемещаемым (как в случае с std::string), он будет перемещен, в противном случае он будет скопирован. После получения данных (со значительной задержкой) измененное сообщение выводится на консоль.

Также возможно, что рабочее значение вызывает set_value для промиса до того, как get() будет вызвано для будущего. В этом случае get() возвращается немедленно без каких-либо задержек. После того, как get() был вызван один раз, future больше нельзя использовать. Это имеет смысл, так как нормальный режим обмена данными между обещанием и будущим работает с std::move — и в этом случае данные больше не доступны в канале после первого вызова get(). Если get() вызывается во второй раз, генерируется исключение.

Использование ожидания ()

В некоторых ситуациях может быть интересно отделить ожидание содержимого от фактического извлечения. Фьючерсы позволяют нам сделать это с помощью функции wait(). Этот метод будет блокироваться, пока будущее не будет готово. Как только он возвращается, гарантируется, что данные доступны, и мы можем использовать get(), чтобы получить их без промедления.

В дополнение к ожиданию стандарт C++ также предлагает метод wait_for, который принимает в качестве входных данных продолжительность времени, а также ожидает получения результата. Метод wait_for() будет блокироваться до тех пор, пока не истечет указанное время ожидания или пока не станет доступен результат — в зависимости от того, что наступит раньше. Возвращаемое значение определяет состояние результата.

#include <iostream>
#include <thread>
#include <future>
#include <cmath>

void computeSqrt(std::promise<double> &&prms, double input)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(2000)); // simulate work
    double output = sqrt(input);
    prms.set_value(output);
}

int main()
{
    // define input data
    double inputData = 42.0;

    // create promise and future
    std::promise<double> prms;
    std::future<double> ftr = prms.get_future();

    // start thread and pass promise as argument
    std::thread t(computeSqrt, std::move(prms), inputData);

// Student task STARTS here
    // wait for result to become available
    auto status = ftr.wait_for(std::chrono::milliseconds(1000));
    if (status == std::future_status::ready) // result is ready
    {
        std::cout << "Result = " << ftr.get() << std::endl;
    }

    //  timeout has expired or function has not yet been started
    else if (status == std::future_status::timeout || status == std::future_status::deferred)
    {
        std::cout << "Result unavailable" << std::endl;
    }
// Student task ENDS here    

    // thread barrier
    t.join();

    return 0;
}

Прохождение исключений

Канал связи с перспективой на будущее также может использоваться для передачи исключений. Для этого рабочий поток просто устанавливает исключение, а не значение в промисе. В родительском потоке исключение выбрасывается повторно после вызова get() в будущем.

Давайте посмотрим на следующий пример, чтобы увидеть, как работает этот механизм:

#include <iostream>
#include <thread>
#include <future>
#include <cmath>
#include <memory>

void divideByNumber(std::promise<double> &&prms, double num, double denom)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(500)); // simulate work
    try
    {
        if (denom == 0)
            throw std::runtime_error("Exception from thread: Division by zero!");
        else
            prms.set_value(num / denom);
    }
    catch (...)
    {
        prms.set_exception(std::current_exception());
    }
}

int main()
{
    // create promise and future
    std::promise<double> prms;
    std::future<double> ftr = prms.get_future();

    // start thread and pass promise as argument
    double num = 42.0, denom = 0.0;
    std::thread t(divideByNumber, std::move(prms), num, denom);

    // retrieve result within try-catch-block
    try
    {
        double result = ftr.get();
        std::cout << "Result = " << result << std::endl;
    }
    catch (std::runtime_error e)
    {
        std::cout << e.what() << std::endl;
    }

    // thread barrier
    t.join();

    return 0;
}