Go, разработанный Google, задумывался как язык, использующий преимущества современных компьютеров (и кластеров компьютеров) для более быстрого выполнения параллельного и асинхронного кода с простой семантикой. Однако многие из первоначальных идей и шаблонов были усовершенствованы, упрощены и улучшены.
В разделах Основы параллельных вычислений в Go и Дополнительная информация о каналах Go, параллелизме и параллелизме» мы рассмотрели некоторые основные понятия параллелизма и параллелизма в Go. Однако в этот раз мы рассмотрим более продвинутый, но более простой для написания производительный код Go.
Группа ожидания
Группа ожидания — это асинхронная концепция, которая позволяет нам ждать выполнения определенных условий, часто используемых в качестве барьерного типа синхронизации. Барьеры позволяют нам синхронизировать несколько одновременных операций в определенных точках, что очень важно. Давайте посмотрим на некоторый код.
Предположим, мы хотим загрузить определенное количество URL-адресов одновременно и что-то с ними сделать после того, как все они будут загружены. Следующая функция content
загружает содержимое URL-адреса и возвращает его. Если что-то не удается, возвращается ошибка. Реализация была опущена.
func content(url string) (string, error) {...}
Затем у нас может быть набор URL-адресов, с которых мы хотим загрузить контент.
type result struct { content string err error } func processURLs(urls []string) error { wg := sync.WaitGroup{} results := make([]result, len(urls)) for i, url := range urls { wg.Add(1) go func() { defer wg.Done() c, err := content(url) // no need to cp i, url after Go 1.21 results[i] = result{content: c, err: err} }() } wg.Wait() // all tasks must finish in order to continue from this point on. for _, result := range results { if result.err != nil { return result.err } } for _, result := range results { writeContentToFile(result.content) } return nil // no errors and processURLs ended correctly. }
В приведенном выше коде мы запускаем горутину, по сути, параллельную задачу, которая будет загружать один URL-адрес. Результат каждой операции, независимо от того, является ли это ошибкой или фактическим содержимым, затем сохраняется в results
.
Каждый раз, когда мы запускаем задачу, мы увеличиваем wait group
, и каждый раз, когда задача завершается, мы уменьшаем wait group
. В то же время wg.Wait()
блокируется до тех пор, пока все wait group is 0
не будут выполнены. По сути, wg.Wait()
действует как барьер для синхронизации всех задач в этот момент.
Это проще, чем иметь каналы (как в предыдущих сообщениях), а затем отправлять сигналы по этим каналам и синхронизировать их. WaitGroup
обеспечивает простоту без использования каналов или других примитивов.
Одно предостережение заключается в том, что WaitGroup
не следует копировать, поэтому, если вам нужно где-то его сохранить, он должен использовать указатель на него. Однако подумайте дважды, прежде чем сделать это.
Группа ошибок
Использовать WaitGroup
уже легче и проще, чем использовать каналы для такого рода обработки. Однако он не предлагает простого способа управления потенциальной ошибкой во время параллельных вычислений.
В приведенном выше примере мы сами справились с ошибкой, сохранив ее в структуре result
, а затем позаботившись о ней во время обработки результатов.
errorgroup.Group
, с другой стороны, предлагает способ сделать это, поэтому нам не нужно выполнять дополнительные шаги.
Давайте посмотрим на тот же код, но на этот раз с использованием ErrorGroup
.
func processURLs(urls []string) error { g := errgroup.Group{} results := make([]string, len(urls)) for i, url := range urls { g.Go(func() error { c, err := content(url) // no need to cp i, url after Go 1.21 if err == nil { results[i] = c } return err }) } if err := g.Wait(); err != nil { return err } for _, c := range results { writeContentToFile(c) } return nil }
В этой версии processURLs
мы видим реализацию, аналогичную той, что была у нас раньше. Однако на этот раз мы используем errorgroup.Group
для запуска параллельных задач, и в случае возникновения ошибки errorgroup.Group
позаботится об отмене ожидающих задач, а Wait
вернет первую возникшую ошибку. Это более чистый и безопасный способ обработки всей обработки без необходимости вручную обрабатывать ошибки или вручную отслеживать запущенные задачи.
Теперь в большинстве случаев errorgroup.Group
используется вместе с context.Context
, чтобы можно было с благодарностью обрабатывать тайм-ауты, отмены, крайние сроки и т. д.
Давайте улучшим предыдущий код, используя context.Context.
func processURLs(ctx context.Context, urls []string) error { g, ctx := errgroup.WithContext(ctx) results := make([]string, len(urls)) for i, url := range urls { g.Go(func() error { c, err := content(url) // no need to cp i, url after Go 1.21 if err == nil { results[i] = c } return err }) } if err := g.Wait(); err != nil { return err } for _, c := range results { writeContentToFile(c) } return nil }
Обратите внимание, что на этот раз мы передаем context.Context
при создании errorgroup.Group
. Если родительский context.Context
отменяется, задачи errorgroup.Group
отменяются, и ошибка, возвращаемая Wait
, будет такой же, как ошибка, выдаваемая отменой.
Например, предположим, что мы хотим выполнить предыдущую обработку, но у нас есть только определенное количество времени для ее завершения, скажем, 5 минут. Следующий код показывает, как это будет работать.
func processURLs(ctx context.Context, urls []string) error { ctx, cancel := context.WithDeadline(time.Now().Add(5 * time.Minute)) defer cancel() g, ctx := errgroup.WithContext(ctx) results := make([]string, len(urls)) for i, url := range urls { g.Go(func() error { c, err := content(url) // no need to cp i, url after Go 1.21 if err == nil { results[i] = c } return err }) } if err := g.Wait(); err != nil { return err } for _, c := range results { writeContentToFile(c) } return nil }
Обратите внимание, что мы в основном создаем новый context.Context
с определенным сроком, а затем используем его для создания errorgroup.Group
. Если крайний срок наступит до того, как все задачи загрузят соответствующий контент, errorgroup.Group
отменит задачи, а Wait
вернет ошибку, указывающую, что крайний срок достигнут.
Чтобы сделать это с помощью каналов, у нас будет for {}
с select
, который проверяет выполнение задач и крайний срок с time.After(...)
. Не так сложно реализовать, но еще труднее понять. Использование errorgroup.Group
более естественно, и нам не нужно иметь дело с синхронизацией по каналам, комбинируя это с другими примитивами, что затрудняет структурирование кода и анализ.
Интересный трюк
Представьте, что мы хотим загрузить и обработать URL-адреса в течение срока действия запроса к нашему серверу. Обычно context.Context
будет иметь некоторый тайм-аут, определяемый клиентом нашего сервиса, скажем, 5 минут. Теперь нам нужно загрузить и обработать URL-адреса до того, как истечет 5 минут, иначе родительский context.Context
истечет время ожидания, что приведет к тайм-ауту всего запроса.
Мы могли бы установить крайний срок, используемый для загрузки URL-адресов, равный крайнему сроку родительского context.Context
минус некоторое время, скажем, 30 секунд. Это гарантирует, что мы никогда не превысим родительский крайний срок context.Context
.
func processURLs(ctx context.Context, urls []string) error { deadline, set := ctx.Deadline() if set { deadline = deadline.Add(-30 * time.Second) } ctx, cancel := context.WithDeadline(ctx, deadline) defer cancel() g, ctx := errgroup.WithContext(ctx) ... <same as before>
Обратите внимание, что крайний срок установлен на 30 секунд до достижения родительского context.Context
.
Счастливого безопасного и производительного кодирования.
Спасибо, что дочитали до конца. Пожалуйста, следите за автором и этой публикацией. Посетите Stackademic, чтобы узнать больше о том, как мы демократизируем бесплатное обучение программированию по всему миру.