Абстрактный

В части 1 этой серии мы узнали об асинхронном программировании и примитивах программирования, таких как обратные вызовы и фьючерсы. Futures / Promises предоставили программистам возможность обмениваться данными между потоками, не беспокоясь о низкоуровневых примитивах синхронизации потоков.

Параллелизм на основе задач

«Параллелизм на основе задач» относится к более высокому уровню абстракции, когда программист управляет «задачами» - фрагментами работы, которую необходимо выполнить, в то время как библиотека (или язык) представляет API для запуска этих задач. Затем задача библиотеки - запустить потоки, убедиться, что их не слишком мало или не слишком много, обеспечить разумную балансировку нагрузки и т. Д. К лучшему или худшему, это дает программисту меньший контроль над системой на низком уровне, но также дает более высокоуровневые, более удобные и безопасные API для работы.

int accumulate_ret(int* data, size_t count) {
  return std::accumulate(data, data + count, 0);
}
void use_worker_in_std_async() {
  std::vector<int> v{1, 2, 3, 4, 5, 6, 7, 8};
  future<int> fut = async(
      std::launch::async, accumulate_ret, v.data(), v.size());
  cout << "Result " << fut.get() << endl;
}

В приведенном выше примере std :: async создает новый поток, вызывает функцию в контексте, переносит результаты в будущее и скрывает обещание. Это намного проще, чем запускать поток самостоятельно и присоединяться к потокам, чтобы убедиться, что функциональность реализована.

Политика std :: launch :: async передается явно для асинхронного запуска функции. std :: async принимает тип возвращаемого значения функции и возвращает его завернутым в std :: future, что является еще одной удобной абстракцией. В приведенном выше коде ожидание завершения вычислительного потока происходит, когда мы вызываем get () в будущем.

Теперь та же операция с использованием std :: async и фьючерсов:

using int_futures = std::vector<std::future<int>>;
int_futures launch_async(std::vector<int>& v) {
  int_futures futures;
  futures.push_back(async(std::launch::async, accumulate_ret,
                          v.data(), v.size() / 2));
  futures.push_back(async(std::launch::async, accumulate_ret,
                          v.data() + v.size() / 2, v.size() / 2));
  return futures;
}
...
{
  // Usage
  std::vector<int> v{1, 2, 3, 4, 5, 6, 7, 8};
  int_futures futs = launch_async(v);
  cout << "Result : " << futs[0].get() + futs[1].get() << endl;
}

И снова код чище и лаконичнее. Объединение дескриптора потока с ожидаемым результатом имеет больше смысла.

Проблема с асинхронным программированием C ++

Асинхронная функция C ++ имеет политику запуска «отложенный», которая откладывает выполнение задачи до тех пор, пока не будет вызвано ожидание / получение в будущем. Но когда вызывается функция ожидания с периодом времени, отложенная опция не будет выполнять функцию.

Политика запуска асинхронной функции C ++ по умолчанию - «отложенный | async ’, что означает, что компилятор решает, запускать ли задачу в новом потоке или отложить для ленивого выполнения. Это непростое поведение, и найти ошибки будет сложнее. Более того, результаты могут быть другими, когда новый поток выполняет задачу в случае локального хранилища потока. Когда используются блокировки, поведение также может быть проблематичным.

Скотт Майерс в своей книге «Эффективный современный C ++» рекомендует следующую оболочку для запуска задач:

template <typename F, typename... Ts>
inline auto reallyAsync(F&& f, Ts&&... params) {
    return std::async(std::launch::async, 
                      std::forward<F>(f),
                      std::forward<Ts>(params)...);
}

Используйте это вместо необработанных вызовов std :: async, чтобы гарантировать, что задачи всегда запускаются в новых потоках, чтобы мы могли рассуждать о нашей программе более детерминированно.

Авторы GCC также осознали это, и переключили политику запуска libstdc ++ по умолчанию на std :: launch :: async в середине 2015 года.

Асинхронная функция C ++ всегда запускает новый поток, если предоставленная политика запуска является асинхронной. Нет ограничения потоков, пула потоков для кэширования потоков и хорошей балансировки нагрузки между ними.

Шаблон «Исполнитель» помогает решить проблему с пулом потоков и планировщиком. Boost имеет реализацию как для пула потоков, так и для планировщика.

Счетчик файлов

Давайте посмотрим на счетчик синхронизируемой версии файлов в заданном каталоге.

size_t count_files_sync(const path &src)
{ 
  try 
  {  
    if (is_directory(src)) 
    {   
      size_t count = 0;   
      for (directory_entry& item : directory_iterator(src)) 
      {    
        count += count_files_sync(item.path());   
      }    
      return count;  
    }  
    else if (is_regular_file(src)) 
    {                  
      return 1;  
    } 
  } 
  catch (const std::exception& ex) 
  { 
  }  
  return 0;
}

Асинхронная версия вышеуказанной программы

size_t count_files_async(const path &src)
{ 
  try 
  {  
    if (is_directory(src)) 
    {   
      std::list<future<size_t>> results;   
      for (directory_entry& item : directory_iterator(src)) 
      {    
        results.push_back(async(boost::launch::async, count_files_async, item.path()));   
      }    
      wait_for_all(results.begin(), results.end());   
      size_t count = 0;   
      for (auto &f : results)   
      {    
        count += f.get();   
      }    
      return count;  
    }  
    else if (is_regular_file(src))  
    {   
      return (size_t)1;  
    } 
  } 
  catch (const std::exception& ex) { }  
  return (size_t)0;
}

Вышеупомянутая асинхронная версия программы пытается обрабатывать каждый подкаталог в данном каталоге в отдельном потоке. Поскольку async запускает новый поток для каждой задачи из-за политики запуска async, если каталог содержит 1000 подкаталогов, то будет порождено 1000 потоков, которые, в свою очередь, запускают больше потоков при обработке подкаталогов. Это приводит к тому, что слишком много потоков и никакая задача не перемещается дальше из-за ожидания.

Даже с Executor и пулом потоков эту проблему нельзя решить, поскольку одна задача зависит от другой, и ожидание не подходит для масштабирования. Это решение отлично работает для каталога с минимальным количеством файлов.

Истинная асинхронная функция без ожидания необходима в качестве решения для этого.

future<size_t> count_files_pure_async(const path &src)
{ 
  try 
  {  
    if (is_directory(src)) 
    {   
      std::list<future<size_t>> results;   
      for (directory_entry& item : directory_iterator(src)) 
      {    
        results.push_back(count_files_pure_async(item.path()));   
      }    
      auto resFuture = when_all(results.begin(), results.end());                      
      return resFuture.then(boost::launch::deferred,
                [](auto results) 
                {    
                  auto rs = results.get();    
                  size_t count = 0;    
                  for (auto &f : rs)    
                  {     
                    count += f.get();    
                  }     
                  return count;   
                });  
    }  
    else if (is_regular_file(src))  
    {   
      return make_ready_future((size_t)1);  
    } 
  } 
  catch (const std::exception& ex) { }  
  return make_ready_future((size_t)0);
}

Вышеупомянутое решение не использовало wait, а использовало when_all для добавления обратного вызова для обработки после завершения всех параллельных заданий.

Совместное распорядок

Вышеупомянутая программа работает хорошо. Эта асинхронная версия программы отличается от версии для синхронизации.

Совместная процедура делает асинхронную версию программы похожей на синхронизируемую с небольшими вариациями, чтобы сделать ее асинхронной.

Совместная программа - это функции, которые можно приостановить и продолжить. Разница между обычными функциями и совместной подпрограммой заключается в том, что обычная функция не может приостанавливаться и возобновляться, как совместная процедура, а возвращаются после завершения функции. Вызов co_await приостанавливает выполнение текущей функции и возвращает вызов вызывающей стороне после сохранения кадра стека текущей функции, чтобы ее можно было возобновить позже.

Программа счетчика файлов, использующая подпрограмму, выглядит так:

future<size_t> count_files_pure_async(const path &src)
{ 
  try 
  {  
    if (is_directory(src)) 
    {   
      size_t count = 0;   
      for (directory_entry& item : directory_iterator(src)) 
      {    
        count += co_await count_files_pure_async(item.path());   
      }    
      co_return count;  
    }  
    else if (is_regular_file(src))  
    {   
      co_return ((size_t)1);  
    } 
  } 
  catch (const std::exception& ex) 
  { 
  }  
  co_return ((size_t)0);
}

Проблема с сопрограммой

Coroutine очень удобна для чтения и очень похожа на синхронный API. Без должной осторожности не удастся добиться действительно хорошего параллелизма. Например, в приведенной выше функции при первом вызове co_await управление возвращается вызывающей стороне до тех пор, пока функция не будет завершена. Но подсчет нескольких файлов для нескольких подкаталогов может выполняться параллельно без возврата управления вызывающей стороне для первого co_await. Для этого необходимо использовать набор фьючерсов вместе с wait_all.

Пример позволяет увидеть следующий код

string getHello()
{ 
  cout << "getting Hello " << this_thread::get_id() << endl;
  this_thread::sleep_for(10s);
  return "Hello";
}
string getWorld()
{
  cout << "getting World " << this_thread::get_id() << endl;
  this_thread::sleep_for(15s);
  return "World";
} 
future<string> getHelloWorld()
{ 
  auto hello = co_await async(launch::async, getHello); 
  auto world = co_await async(launch::async, getWorld);
  return hello + " " + world;
} 
future<string> getHelloWorldEx()
{ 
  auto hello = async(launch::async, getHello); 
  auto world = async(launch::async, getWorld); 
  auto h = co_await hello; 
  auto w = co_await world;
  return h + " " + w;
}

В функции getHelloWorld функции getHello и getWorld будут выполняться синхронно, но getHellowWorldEx обе функции будут выполняться параллельно. Тщательное программирование позволит избежать последовательного выполнения.

Вывод

В этой статье мы узнали, как можно добиться параллелизма на основе задач с помощью функции std :: async, чередующей несколько задач. Проблема с функцией std :: async также обсуждается без пула потоков и планировщика задач. Затем мы обсудили сценарий узкого места для масштабирования, когда задачи ожидают завершения других задач. Примитивы, такие как when_all или when_any, могут быть использованы для устранения этого узкого места. Мы обсудили совместную процедуру и то, как это помогает писать асинхронную программу, очень похожую на программу синхронизации.

использованная литература

  1. Https://eli.thegreenplace.net/2016/the-promises-and-challenges-of-stdasync-task-based-parallelism-in-c11/
  2. Https://bartoszmilewski.com/2011/10/10/async-tasks-in-c11-not-quite-there-yet/
  3. Код доступен в https://github.com/maharajan1/async-prog
  4. Дальнейшая статья о реактивном программировании находится здесь.