Понимание примера возобновляемых функций в предложении N3650 для C++1y

Рассмотрим следующий пример, взятый из N3650:

int cnt = 0;
do {
   cnt = await streamR.read(512, buf);
   if (cnt == 0)
      break;
   cnt = await streamW.write(cnt, buf);
} while (cnt > 0);

Я, наверное, что-то упускаю, но если я хорошо понял async и await, то какой смысл показывать полезность двух конструкций на приведенном выше примере, когда эффекты эквивалентны написанию:

int cnt = 0;
do {
   cnt = streamR.read(512, buf).get();
   if (cnt == 0)
      break;
   cnt = streamW.write(cnt, buf).get();
} while (cnt > 0);

где вызовы read().get() и write().get() синхронны?


person Martin    schedule 20.08.2013    source источник
comment
Я думаю, было бы понятнее, если бы вы создали полный пример, включающий объявление и вызов возобновляемой функции.   -  person dyp    schedule 20.08.2013
comment
Вот пример использования ноябрьской CTP-реализации Visual C++: из приложения c Windows Store"> stackoverflow.com/questions/19309508/   -  person Raman Sharma    schedule 21.12.2013


Ответы (5)


Ключевое слово await не эквивалентно вызову get для будущего. Вы можете посмотреть на это примерно так, предположим, вы начинаете с этого:

future<T> complex_function()
{
     do_some_stuff();
     future<Result> x = await some_async_operation();
     return do_some_other_stuff(x);
}

Это функционально более или менее похоже на

future<T> complex_function()
{
     do_some_stuff();
     return some_async_operation().then([=](future<Result> x) {
         return do_some_other_stuff(x);
     });
}

Обратите внимание, более или менее, поскольку есть некоторые последствия для управления ресурсами, переменные, созданные в do_some_stuff, не должны быть скопированы для выполнения do_some_other_stuff, как это будет делать лямбда-версия.

Второй вариант делает более понятным, что произойдет при вызове.

  1. do_some_stuff() будет вызываться синхронно при вызове complex_function.
  2. some_async_operation вызывается асинхронно и приводит к будущему. Точный момент выполнения этой операции зависит от вашей фактической реализации асинхронного вызова, это может быть немедленно, когда вы используете потоки, это может быть всякий раз, когда вызывается .get(), когда вы используете отложенное выполнение.
  3. Мы не выполняем do_some_other_stuff сразу, а привязываем его к будущему, полученному на шаге 2. Это означает, что его можно выполнить, как только будет готов результат из some_async_operation, но не раньше. Кроме того, его момент выполнения определяется временем выполнения. Если реализация будет просто обертывать предложение then, это означает, что она унаследует политику исполнителя/запуска родительского будущего (согласно N3558).
  4. Функция возвращает последнее будущее, которое представляет конечный результат. Обратите внимание, что это НЕОБХОДИМО в будущем, так как часть тела функции выполняется асинхронно.
person KillianDS    schedule 20.08.2013

Более полный пример (надеюсь, правильный):

future<void> forwardMsgs(istream& streamR, ostream& streamW) async
{
    char buf[512];
    int cnt = 0;
    do {
       cnt = await streamR.read(512, buf);
       if (cnt == 0)
          break;
       cnt = await streamW.write(cnt, buf);
    } while (cnt > 0);
}

future<void> fut = forwardMsgs(myStreamR, myStreamW);

/* do something */

fut.get();

Важный момент (цитата из проекта):

После приостановки возобновляемая функция может быть возобновлена ​​логикой планирования среды выполнения и в конечном итоге завершит свою логику, после чего она выполнит оператор возврата (явный или неявный) и установит значение результата функции в заполнителе.

и:

Возобновляемая функция может продолжать выполнение в другом потоке после возобновления после приостановки ее выполнения.

То есть поток, который первоначально вызвал forwardMsgs, может вернуться в любой из точек приостановки. Если да, то во время строки /* do something */ код внутри строки forwardMsgs может выполняться другим потоком, даже если функция была вызвана "синхронно".


Этот пример очень похож на

future<void> fut = std::async(forwardMsgs, myStreamR, myStreamW);

/* do something */

fut.get();

Разница в том, что возобновляемая функция может выполняться разными потоками: другой поток может возобновить выполнение (возобновляемой функции) после каждой точки возобновления/приостановки.

person dyp    schedule 20.08.2013
comment
Возобновляемая функция может продолжить выполнение в другом потоке после возобновления после приостановки ее выполнения. Это подозрительно похоже на что-то из Haskell/Go/Rust: среда выполнения языка заставляет асинхронный ввод-вывод выглядеть синхронным с точки зрения пользователя самостоятельно просматривать и мультиплексировать потоки выполнения. - person Matthieu M.; 20.08.2013
comment
@MatthieuM. за исключением того, что это не (обязательно) среда выполнения языка. Вы можете использовать его с любой формой асинхронной системы планирования, построенной на основе фьючерсов и обещаний. - person KillianDS; 20.08.2013
comment
@KillianDS: ну, это включает в себя логику планирования среды выполнения, есть ли способ настроить это или это предусмотрено реализацией? Я не хотел предполагать, что это было ограничено вводом-выводом, просто мне казалось, что это зеленые потоки для C++, такие как они существуют в Haskell/Go/Rust (и, возможно, в других языках). - person Matthieu M.; 20.08.2013
comment
@MatthieuM. если я правильно понимаю, await будет работать с любым вызовом, который возвращает будущее. Будь то std::async, какой-нибудь отложенный реактор, пул потоков или гринтреды. У меня нет впечатления, что это само по себе связано с зелеными потоками. - person KillianDS; 20.08.2013
comment
@KillianDS: Тот факт, что выполнение может начаться в одном потоке и быть перемещенным языковой средой выполнения в другой, вполне соответствует определению зеленых потоков; поскольку вы больше не контролируете поток ОС, в котором выполняется ваш код. - person Matthieu M.; 20.08.2013
comment
@MatthieuM. затем я использую другое определение greenthreads (виртуализированные/симулированные потоки пользовательского пространства iso фактические потоки ОС). Но у вас все еще есть контроль. Фактически вы говорите себе: «выполните оставшуюся часть этой функции в контексте этого асинхронного вызова». Решает не время выполнения, а вы. - person KillianDS; 20.08.2013
comment
@KillianDS: мне не ясно, имеете ли вы здесь фактический контроль: Возобновляемая функция может продолжать выполнение в другом потоке после возобновления после приостановки ее выполнения. кажется, подразумевает, что когда асинхронная операция завершается (возможно, системный вызов), то выполнение может возобновиться в потоке, который вы не выбрали явно. - person Matthieu M.; 21.08.2013
comment
@MatthieuM. Извините, вы правы, я пропустил, что часть реализации, основанная на then, по-видимому, просто пример, а не ссылка. Если нет необходимости обязательно обертывать then, планирование для конкретной среды выполнения действительно возможно. Позор, однако, было бы более разумно дать ему те же или подобные ограничения, что и then, мне не нравится, когда среда выполнения делает такие вещи для меня (или, по крайней мере, не дает мне возможность переопределить это). - person KillianDS; 22.08.2013

Я думаю, что идея в том, что вызовы streamR.read() и streamW.write() являются асинхронными операциями ввода-вывода и возвращают фьючерсы, которые автоматически ожидаются выражениями await.

Таким образом, эквивалентная синхронная версия должна была бы вызвать future::get() для получения результатов, например.

int cnt = 0;
do {
   cnt = streamR.read(512, buf).get();
   if (cnt == 0)
      break;
   cnt = streamW.write(cnt, buf).get();
} while (cnt > 0);

Вы правильно заметили, что здесь нет параллелизма. Однако в контексте возобновляемой функции await делает поведение отличным от приведенного выше фрагмента. При достижении await функция вернет future, поэтому вызывающая функция может продолжать работу без блокировки, даже если возобновляемая функция заблокирована на await в ожидании какого-либо другого результата (например, в этом случае вызовы read() или write() для завершения .) Возобновляемая функция может возобновить работу в асинхронном режиме, поэтому результат становится доступным в фоновом режиме, пока вызывающий объект делает что-то еще.

person Jonathan Wakely    schedule 20.08.2013
comment
которые автоматически ожидаются выражениями ожидания. Это меня немного смутило. Вы имеете в виду автоматически, как в если/когда это требуется (например, путем вызова .get() для возвращаемого future возобновляемой функции)? - person dyp; 20.08.2013
comment
Я сделал вопрос более ясным, добавив get() после вызовов. Благодарю. - person Martin; 20.08.2013
comment
@DyP, я полагаю, что возобновляемая функция может возобновить выполнение спонтанно (в другом потоке), а не только тогда, когда .get() вызывается для возвращаемого будущего. Я имел в виду, что выполнение возобновляемой функции не может продолжаться после оператора await, пока не дождется своего операнда. Я отредактировал свой ответ, поэтому надеюсь, что он понятнее - person Jonathan Wakely; 20.08.2013

Вот правильный перевод примера функции, чтобы не использовать ожидание:

struct Copy$StackFrame {
  promise<void> $result;
  input_stream& streamR;
  output_stream& streamW;
  int cnt;
  char buf[512];
};
using Copy$StackPtr = std::shared_ptr<Copy$StackFrame>;

future<void> Copy(input_stream& streamR, output_stream& streamW) {
  Copy$StackPtr $stack{ new Copy$StackFrame{ {}, streamR, streamW, 0 } };
  future<int> f$1 = $stack->streamR.read(512, stack->buf);
  f$1.then([$stack](future<int> f) { Copy$Cont1($stack, std::move(f)); });
  return $stack->$result.get_future();
}

void Copy$Cont1(Copy$StackPtr $stack, future<int> f$1) {
  try {
    $stack->cnt = f$1.get();
    if ($stack->cnt == 0) {
      // break;
      $stack->$result.set_value();
      return;
    }
    future<int> f$2 = $stack->streamW.write($stack->cnt, $stack->buf);
    f$2.then([$stack](future<int> f) { Copy$Cont2($stack, std::move(f)); });
  } catch (...) {
    $stack->$result.set_exception(std::current_exception());
  }
}

void Copy$Cont2(Copy$StackPtr $stack, future<int> f$2) {
  try {
    $stack->cnt = f$2.get();
    // while (cnt > 0)
    if (cnt <= 0) {
      $stack->$result.set_value();
      return;
    }
    future<int> f$1 = $stack->streamR.read(512, stack->buf);
    f$1.then([$stack](future<int> f) { Copy$Cont1($stack, std::move(f)); });
  } catch (...) {
    $stack->$result.set_exception(std::current_exception());
  }
}

Как видите, преобразование компилятора здесь довольно сложное. Ключевым моментом здесь является то, что, в отличие от версии get(), исходная версия Copy возвращает свое будущее, как только был сделан первый асинхронный вызов.

person Sebastian Redl    schedule 20.08.2013

У меня та же проблема со значением разницы между этими двумя примерами кода. Давайте перепишем их немного, чтобы быть более полными.

    // Having two functions
    future<void> f (istream&streamR, ostream&streamW) async
    {  int cnt = 0;
       do {
          cnt = await streamR.read(512, buf);
          if (cnt == 0)
             break;
          cnt = await streamW.write(cnt, buf);
       } while (cnt > 0);
    }
    void g(istream&streamR, ostream&streamW)
    {  int cnt = 0;
       do {
          cnt = streamR.read(512, buf).get();
          if (cnt == 0)
             break;
          cnt = streamW.write(cnt, buf).get();
       } while (cnt > 0);
    }
    // what is the difference between
    auto a = f(streamR, streamW);
    // and 
    auto b = async(g, streamR, streamW);

Вам все еще нужно по крайней мере три стопки. В обоих случаях основной поток не блокируется. Является ли это предположением, что await будет реализовано компилятором более эффективно, чем future‹>:get()?. Теперь можно использовать тот, что без await.

Спасибо, Адам Зелински.

person Adam Zielinski    schedule 03.02.2014