Go, разработанный Google, задумывался как язык, использующий преимущества современных компьютеров (и кластеров компьютеров) для более быстрого выполнения параллельного и асинхронного кода с простой семантикой. Однако многие из первоначальных идей и шаблонов были усовершенствованы, упрощены и улучшены.

В разделах Основы параллельных вычислений в Go и Дополнительная информация о каналах Go, параллелизме и параллелизме» мы рассмотрели некоторые основные понятия параллелизма и параллелизма в Go. Однако в этот раз мы рассмотрим более продвинутый, но более простой для написания производительный код Go.

Группа ожидания

Группа ожидания — это асинхронная концепция, которая позволяет нам ждать выполнения определенных условий, часто используемых в качестве барьерного типа синхронизации. Барьеры позволяют нам синхронизировать несколько одновременных операций в определенных точках, что очень важно. Давайте посмотрим на некоторый код.

Предположим, мы хотим загрузить определенное количество URL-адресов одновременно и что-то с ними сделать после того, как все они будут загружены. Следующая функция content загружает содержимое URL-адреса и возвращает его. Если что-то не удается, возвращается ошибка. Реализация была опущена.

func content(url string) (string, error) {...}

Затем у нас может быть набор URL-адресов, с которых мы хотим загрузить контент.

type result struct {
  content string 
  err     error
}

func processURLs(urls []string) error {
  wg := sync.WaitGroup{}
  
  results := make([]result, len(urls))
  for i, url := range urls {
    wg.Add(1)
    go func() {
      defer wg.Done()
  
      c, err := content(url) // no need to cp i, url after Go 1.21
      results[i] = result{content: c, err: err}
    }()
  }
   
  wg.Wait() // all tasks must finish in order to continue from this point on. 
  
  for _, result := range results {
    if result.err != nil {
      return result.err
    }
  }

  for _, result := range results {
    writeContentToFile(result.content)
  }
  
  return nil // no errors and processURLs ended correctly.
 }

В приведенном выше коде мы запускаем горутину, по сути, параллельную задачу, которая будет загружать один URL-адрес. Результат каждой операции, независимо от того, является ли это ошибкой или фактическим содержимым, затем сохраняется в results.

Каждый раз, когда мы запускаем задачу, мы увеличиваем wait group, и каждый раз, когда задача завершается, мы уменьшаем wait group. В то же время wg.Wait() блокируется до тех пор, пока все wait group is 0 не будут выполнены. По сути, wg.Wait() действует как барьер для синхронизации всех задач в этот момент.

Это проще, чем иметь каналы (как в предыдущих сообщениях), а затем отправлять сигналы по этим каналам и синхронизировать их. WaitGroup обеспечивает простоту без использования каналов или других примитивов.

Одно предостережение заключается в том, что WaitGroup не следует копировать, поэтому, если вам нужно где-то его сохранить, он должен использовать указатель на него. Однако подумайте дважды, прежде чем сделать это.

Группа ошибок

Использовать WaitGroup уже легче и проще, чем использовать каналы для такого рода обработки. Однако он не предлагает простого способа управления потенциальной ошибкой во время параллельных вычислений.

В приведенном выше примере мы сами справились с ошибкой, сохранив ее в структуре result, а затем позаботившись о ней во время обработки результатов.

errorgroup.Group, с другой стороны, предлагает способ сделать это, поэтому нам не нужно выполнять дополнительные шаги.

Давайте посмотрим на тот же код, но на этот раз с использованием ErrorGroup.

func processURLs(urls []string) error {
  g := errgroup.Group{}

  results := make([]string, len(urls)) 
  for i, url := range urls {
    g.Go(func() error {
      c, err := content(url) // no need to cp i, url after Go 1.21
      if err == nil {
        results[i] = c
      }
  
      return err
    })
  }

  if err := g.Wait(); err != nil {
    return err
  }

  for _, c := range results {
    writeContentToFile(c)
  }

  return nil 
}

В этой версии processURLs мы видим реализацию, аналогичную той, что была у нас раньше. Однако на этот раз мы используем errorgroup.Group для запуска параллельных задач, и в случае возникновения ошибки errorgroup.Group позаботится об отмене ожидающих задач, а Wait вернет первую возникшую ошибку. Это более чистый и безопасный способ обработки всей обработки без необходимости вручную обрабатывать ошибки или вручную отслеживать запущенные задачи.

Теперь в большинстве случаев errorgroup.Group используется вместе с context.Context, чтобы можно было с благодарностью обрабатывать тайм-ауты, отмены, крайние сроки и т. д.

Давайте улучшим предыдущий код, используя context.Context.

func processURLs(ctx context.Context, urls []string) error {
  g, ctx := errgroup.WithContext(ctx)

  results := make([]string, len(urls)) 
  for i, url := range urls {
    g.Go(func() error {
      c, err := content(url) // no need to cp i, url after Go 1.21
      if err == nil {
        results[i] = c
      }
  
      return err
    })
  }

  if err := g.Wait(); err != nil {
    return err
  }

  for _, c := range results {
    writeContentToFile(c)
  }

  return nil 
}

Обратите внимание, что на этот раз мы передаем context.Context при создании errorgroup.Group. Если родительский context.Context отменяется, задачи errorgroup.Group отменяются, и ошибка, возвращаемая Wait, будет такой же, как ошибка, выдаваемая отменой.

Например, предположим, что мы хотим выполнить предыдущую обработку, но у нас есть только определенное количество времени для ее завершения, скажем, 5 минут. Следующий код показывает, как это будет работать.

func processURLs(ctx context.Context, urls []string) error {
  ctx, cancel := context.WithDeadline(time.Now().Add(5 * time.Minute))
  defer cancel()

  g, ctx := errgroup.WithContext(ctx)

  results := make([]string, len(urls)) 
  for i, url := range urls {
    g.Go(func() error {
      c, err := content(url) // no need to cp i, url after Go 1.21
      if err == nil {
        results[i] = c
      }
  
      return err
    })
  }

  if err := g.Wait(); err != nil {
    return err
  }

  for _, c := range results {
    writeContentToFile(c)
  }

  return nil 
}

Обратите внимание, что мы в основном создаем новый context.Context с определенным сроком, а затем используем его для создания errorgroup.Group. Если крайний срок наступит до того, как все задачи загрузят соответствующий контент, errorgroup.Group отменит задачи, а Wait вернет ошибку, указывающую, что крайний срок достигнут.

Чтобы сделать это с помощью каналов, у нас будет for {} с select, который проверяет выполнение задач и крайний срок с time.After(...). Не так сложно реализовать, но еще труднее понять. Использование errorgroup.Group более естественно, и нам не нужно иметь дело с синхронизацией по каналам, комбинируя это с другими примитивами, что затрудняет структурирование кода и анализ.

Интересный трюк

Представьте, что мы хотим загрузить и обработать URL-адреса в течение срока действия запроса к нашему серверу. Обычно context.Context будет иметь некоторый тайм-аут, определяемый клиентом нашего сервиса, скажем, 5 минут. Теперь нам нужно загрузить и обработать URL-адреса до того, как истечет 5 минут, иначе родительский context.Context истечет время ожидания, что приведет к тайм-ауту всего запроса.

Мы могли бы установить крайний срок, используемый для загрузки URL-адресов, равный крайнему сроку родительского context.Context минус некоторое время, скажем, 30 секунд. Это гарантирует, что мы никогда не превысим родительский крайний срок context.Context.

func processURLs(ctx context.Context, urls []string) error {
  deadline, set := ctx.Deadline()

  if set {
    deadline = deadline.Add(-30 * time.Second)
  }

  ctx, cancel := context.WithDeadline(ctx, deadline)
  defer cancel()
 
  g, ctx := errgroup.WithContext(ctx)

...
<same as before> 

Обратите внимание, что крайний срок установлен на 30 секунд до достижения родительского context.Context.

Счастливого безопасного и производительного кодирования.

Спасибо, что дочитали до конца. Пожалуйста, следите за автором и этой публикацией. Посетите Stackademic, чтобы узнать больше о том, как мы демократизируем бесплатное обучение программированию по всему миру.