Обещание — будущий канал связи
Оба метода передачи данных в поток, которые мы обсуждали до сих пор, полезны при создании потока: мы можем либо передавать аргументы в функцию потока, используя шаблоны с переменным числом аргументов, либо мы можем использовать лямбду для захвата аргументов по значению или по ссылке. Следующий пример снова иллюстрирует использование этих методов:
#include <iostream> #include <thread> void printMessage(std::string message) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); // simulate work std::cout << "Thread 1: " << message << std::endl; } int main() { // define message std::string message = "My Message"; // start thread using variadic templates std::thread t1(printMessage, message); // start thread using a Lambda std::thread t2([message] { std::this_thread::sleep_for(std::chrono::milliseconds(10)); // simulate work std::cout << "Thread 2: " << message << std::endl; }); // thread barrier t1.join(); t2.join(); return 0; }
Недостатком этих двух подходов является то, что информация передается от родительского потока (main
) к рабочим потокам (t1
и t2
). В этом разделе мы хотим рассмотреть способ передачи данных в обратном направлении — то есть из рабочих потоков обратно в родительский поток.
Для этого потоки должны придерживаться строгого протокола синхронизации. В стандарте C++ есть такой механизм, который мы можем использовать для этой цели. Этот механизм действует как одноразовый канал между потоками. Передающий конец канала называется «обещанием», а принимающий конец — «будущим».
В стандарте C++ шаблон класса std::promise предоставляет удобный способ хранения значения или исключения, которое будет получено асинхронно позднее через объект std::future
. Каждый объект std::promise
предназначен для использования только один раз.
В следующем примере мы хотим объявить обещание, которое позволяет передавать строку между двумя потоками и изменять ее в процессе.
#include <iostream> #include <thread> #include <future> void modifyMessage(std::promise<std::string> && prms, std::string message) { std::this_thread::sleep_for(std::chrono::milliseconds(4000)); // simulate work std::string modifiedMessage = message + " has been modified"; prms.set_value(modifiedMessage); } int main() { // define message std::string messageToThread = "My Message"; // create promise and future std::promise<std::string> prms; std::future<std::string> ftr = prms.get_future(); // start thread and pass promise as argument std::thread t(modifyMessage, std::move(prms), messageToThread); // print original message to console std::cout << "Original message from main(): " << messageToThread << std::endl; // retrieve modified message via future and print to console std::string messageFromThread = ftr.get(); std::cout << "Modified message from thread(): " << messageFromThread << std::endl; // thread barrier t.join(); return 0; }
После определения сообщения мы должны создать подходящее обещание, которое может принимать строковый объект. Чтобы получить соответствующее будущее, нам нужно вызвать метод get_future()
для промиса. Обещание и будущее — это два типа канала связи, которые мы хотим использовать для передачи строки между потоками. Канал связи, настроенный таким образом, может передавать только строку.
Теперь мы можем создать поток, который принимает функцию, и мы передадим ему обещание в качестве аргумента, а также сообщение, которое нужно изменить. Обещания невозможно скопировать, потому что концепция обещание-будущее представляет собой двухточечный канал связи для одноразового использования. Следовательно, мы должны передать промис функции потока, используя std::move
. Затем поток во время своего выполнения будет использовать обещание для передачи обратно измененного сообщения.
Функция потока принимает обещание как ссылку на rvalue в соответствии с семантикой перемещения. После ожидания в течение нескольких секунд сообщение модифицируется, и для промиса вызывается метод set_value()
.
Вернувшись в основной поток, после запуска потока исходное сообщение выводится на консоль. Затем мы начинаем слушать на другом конце канала связи, вызывая функцию get()
на будущем. Этот метод будет блокироваться до тех пор, пока данные не будут доступны, что происходит, как только set_value вызывается для обещания (из потока). Если результат является перемещаемым (как в случае с std::string
), он будет перемещен, в противном случае он будет скопирован. После получения данных (со значительной задержкой) измененное сообщение выводится на консоль.
Также возможно, что рабочее значение вызывает set_value для промиса до того, как get()
будет вызвано для будущего. В этом случае get()
возвращается немедленно без каких-либо задержек. После того, как get()
был вызван один раз, future больше нельзя использовать. Это имеет смысл, так как нормальный режим обмена данными между обещанием и будущим работает с std::move
— и в этом случае данные больше не доступны в канале после первого вызова get()
. Если get()
вызывается во второй раз, генерируется исключение.
Использование ожидания ()
В некоторых ситуациях может быть интересно отделить ожидание содержимого от фактического извлечения. Фьючерсы позволяют нам сделать это с помощью функции wait()
. Этот метод будет блокироваться, пока будущее не будет готово. Как только он возвращается, гарантируется, что данные доступны, и мы можем использовать get()
, чтобы получить их без промедления.
В дополнение к ожиданию стандарт C++ также предлагает метод wait_for
, который принимает в качестве входных данных продолжительность времени, а также ожидает получения результата. Метод wait_for()
будет блокироваться до тех пор, пока не истечет указанное время ожидания или пока не станет доступен результат — в зависимости от того, что наступит раньше. Возвращаемое значение определяет состояние результата.
#include <iostream> #include <thread> #include <future> #include <cmath> void computeSqrt(std::promise<double> &&prms, double input) { std::this_thread::sleep_for(std::chrono::milliseconds(2000)); // simulate work double output = sqrt(input); prms.set_value(output); } int main() { // define input data double inputData = 42.0; // create promise and future std::promise<double> prms; std::future<double> ftr = prms.get_future(); // start thread and pass promise as argument std::thread t(computeSqrt, std::move(prms), inputData); // Student task STARTS here // wait for result to become available auto status = ftr.wait_for(std::chrono::milliseconds(1000)); if (status == std::future_status::ready) // result is ready { std::cout << "Result = " << ftr.get() << std::endl; } // timeout has expired or function has not yet been started else if (status == std::future_status::timeout || status == std::future_status::deferred) { std::cout << "Result unavailable" << std::endl; } // Student task ENDS here // thread barrier t.join(); return 0; }
Прохождение исключений
Канал связи с перспективой на будущее также может использоваться для передачи исключений. Для этого рабочий поток просто устанавливает исключение, а не значение в промисе. В родительском потоке исключение выбрасывается повторно после вызова get()
в будущем.
Давайте посмотрим на следующий пример, чтобы увидеть, как работает этот механизм:
#include <iostream> #include <thread> #include <future> #include <cmath> #include <memory> void divideByNumber(std::promise<double> &&prms, double num, double denom) { std::this_thread::sleep_for(std::chrono::milliseconds(500)); // simulate work try { if (denom == 0) throw std::runtime_error("Exception from thread: Division by zero!"); else prms.set_value(num / denom); } catch (...) { prms.set_exception(std::current_exception()); } } int main() { // create promise and future std::promise<double> prms; std::future<double> ftr = prms.get_future(); // start thread and pass promise as argument double num = 42.0, denom = 0.0; std::thread t(divideByNumber, std::move(prms), num, denom); // retrieve result within try-catch-block try { double result = ftr.get(); std::cout << "Result = " << result << std::endl; } catch (std::runtime_error e) { std::cout << e.what() << std::endl; } // thread barrier t.join(); return 0; }