Пишите параллельный код

В этой статье мы поговорим о том, как создавать параллельные программы, сочетающие выборку, горутины и каналы в Golang.

Я бы рекомендовал сначала прочитать эти две статьи, чтобы ознакомиться с концепциями параллелизма, каналов и горутин.

Выбирать

Из документации Go Tour:

«Утверждение select позволяет горутине ожидать нескольких операций связи.

select блокируется до тех пор, пока не сможет запуститься один из его кейсов, затем он выполняет этот кейс. Он выбирает один случайным образом, если готовы несколько».

Ответ сервера API

Мы собираемся исследовать, как мы можем использовать select для получения ответа от самого быстрого вызова API. Давайте углубимся в код, чтобы понять select и его мощные функции.

Вышеупомянутая реализация сосредоточена на том, чтобы выделить, как выбор будет ждать, пока не запустится один из его случаев.

В этом примере важно понимать разные части, поэтому давайте рассмотрим их одну за другой.

Прежде чем мы рассмотрим логику выбора, давайте рассмотрим, как выполняются вызовы API.

Структура Function представляет один вызов API, ее атрибуты — это функция f, которая принимает канал типа News, обратите внимание, как сигнатура этой функции уже обеспечивает обработку канала как канала send-only, второй атрибут — это канал типа News. введите News, после выполнения вызова API и анализа ответа этот канал будет использоваться для отправки результатов.

Newsstruct — это объект для хранения статей и их источника.

В строке 43 мы инициализируем срез Function двумя элементами, первый имеет функцию googleNews и использует канал google, а второй использует функцию freeNews и использует канал free.

Поскольку оба вызова API будут получать новости, каналы одного типа, но по одному для каждой функции.

В строках 69 и 102 у нас есть реализации этих двух API. Каждый из них делает HTTP-запрос к своим соответствующим URL-адресам и анализирует ответ, после чего новости отправляются по соответствующим каналам.

Давайте теперь сосредоточимся на методе quickestApiResponse. Цель этого метода — установить переменную article в ответ от самого быстрого API. В строке 54 каждая функция выполняется путем вызова метода Run. Этот метод запускает новую горутину для функции и передает канал. Важно отметить, что эти вызовы API должны выполняться в отдельной горутине, потому что мы не хотим запускать их последовательно.

Затем выбор будет ждать, пока канал google или free отправит ответ. Как только любой из вызовов API отправит ответ по соответствующему каналу, выбор выполнит код в этом случае и проигнорирует другой. Это эффективно настроит статьи на ответ от самого быстрого вызова API.

Давайте запустим программу, чтобы увидеть вывод:

FreeNewsApi бежал быстрее!.

Эта логика может быть применена ко многим другим вариантам использования, позволяя программе запускать несколько горутин, использовать каналы для связи и использовать выбор для их ожидания.

Еще одна вещь, которую мы можем реализовать в этом примере, — это принудительно установить тайм-аут, если вызовы API занимают больше лимита, мы просто оставим статьи пустыми. В приведенном ниже коде это достигается путем добавления еще одного случая к выбору.

time.After возвращает канал типа time.Time, и он отправит текущее время по истечении указанного времени. Обратите внимание, что здесь мы не присваиваем значение этого канала переменной, потому что нас не волнуют данные, которые будет отправлять канал, нас интересует только получение сигнала. Если мы заснем на три секунды в обоих API, мы увидим, что выполняется случай тайм-аута, а два других случая игнорируются.

Контекст с тайм-аутом

Контекст почти всегда присутствует в программе, и мы можем использовать объект контекста, чтобы установить время ожидания для определенных ресурсоемких задач. В следующем примере показано, как использовать context.WithTimeout для ограничения времени, в течение которого программа ожидает ответа от ресурсоемкой задачи.

В приведенном выше примере в качестве родительского контекста используется context.Background(), но в более реалистичных условиях контекст уже будет присутствовать. Контекст с тайм-аутом возвращает контекст, который отправит сигнал через канал ctx.Done по истечении указанного времени.

В строке 13 в отдельной горутине запускается затратная задача, в качестве параметров передаются контекст и сигнальный канал. Дорогостоящая задача приостанавливается на 6 секунд, чтобы имитировать задержку, но время контекста превышает 5 секунд.

Выбор имеет два случая: ожидание запуска ctx.Done или отправки дорогостоящей задачей сигнала, указывающего на ее завершение.

В этом случае запуск этого примера приведет к следующему выводу:

Expensive task took too long to complete

Запуск повторяющихся процессов

Давайте посмотрим, как мы можем использовать select для запуска повторяющегося процесса. Для этой программы у нас будет следующий сценарий:

Программа должна позволить нам передать любую функцию в качестве повторяющегося процесса, когда этот процесс должен начать выполняться, и интервал времени между каждым запуском.

Ниже у нас есть исходный код, давайте посмотрим:

Приведенный выше код отражает задачу, которую мы хотим запустить. У нас есть две основные функции collectNewUsersNotifications и handlePendingUsersNotifications. Первый предназначен для сбора всех новых пользовательских уведомлений, идеальной реализацией было бы то, что эта функция ищет непрочитанные уведомления в базе данных, но для этого примера мы моделируем получение случайных уведомлений для определенных пользователей.

Уведомления создаются с использованием структуры Notification только с двумя полями: одно для содержимого и одно для идентификатора пользователя.

Функция сбора использует тип PendingUserNotifications для хранения уведомлений. Этот тип представляет собой карту, где ключ представляет собой целое число, представляющее идентификатор пользователя, а значение представляет собой часть Notification.

После того, как мы соберем все уведомления, мы хотим использовать функцию handlePendingUserNotifications для перебора уведомлений и запуска функции обработчика для каждого из них. После обработки уведомлений каждого пользователя они удаляются с карты. В этом случае мы будем использовать обработчик sendUserBatchNotificationsEmail. Его цель — отправить пользователю электронное письмо со всеми ожидающими уведомлениями, чтобы он мог их просмотреть.

Теперь давайте сосредоточимся на том, как выполнять эту задачу повторяющимся образом, используя select. Как я упоминал ранее, мы должны учитывать следующее:

  • Разрешить прохождение интервала времени
  • Разрешить передачу времени начала процесса
  • Разрешить вызывающему абоненту отменить/остановить повторяющийся процесс, когда он захочет

В приведенном ниже коде показано, как этого добиться:

Мы ввели новую структуру для представления повторяющегося процесса RecurringProcess. Эта структура содержит следующие поля:

  • name — Имя процесса
  • interval — Интервал времени между каждым запуском
  • startTime — Время начала процесса
  • handler — Функция-обработчик для вызова при каждом запуске
  • stop — Канал для остановки процесса

В функции pendingNotificationsProcess мы инициализируем новый повторяющийся процесс и уведомления в строках 30 и 31 соответственно. Функция-обработчик, которую мы будем использовать, содержит функции collectNewUsersNotifications и handlePendingUsersNotifications внутри. Обратите внимание, что мы передаем процесс handlePendingUsersNotifications, потому что это необходимо для остановки процесса.

Мы также указали интервал и время начала.

Затем мы вызываем createRecurringProcess, эта функция создает повторяющийся процесс и также запускает его. Давайте сосредоточимся на строке 88, где мы используем горутину для запуска процесса.

В строке 40 мы блокируем основную горутину, читая из канала остановки, что означает, что основная горутина будет заблокирована до тех пор, пока сообщение не будет отправлено в этот канал.

Давайте взглянем на функцию Start в строке 93, которая содержит всю логику для запуска повторяющегося процесса.

Эта функция использует переменную startTicker для запуска повторяющегося процесса с использованием времени начала. Если время начала находится в прошлом, процесс начнется немедленно.

time.NewTimer отправит текущее время на свой канал, когда указанная продолжительность пройдет, и это позволит нам начать процесс. Вот почему у нас есть первый случай выбора, ожидающего получения сигнала каналом.

У нас также есть в строке 95 переменная ticker, которая является time.Ticker. Тикер в ходу будет отправлять тики по своему каналу с заданным интервалом. Как только канал startTicker.C отправит сигнал, мы назначаем новый тикер с интервалом переменной ticker в строке 106, а также вызываем функцию-обработчик.

После этого ticker начнет получать тики во втором случае выбора, и каждый раз, когда он их получает, также будет вызываться функция-обработчик.

В последнем случае выбора мы ждем, пока не будет отправлен сигнал, чтобы остановить процесс, просто вернувшись.

Обратите внимание, что select находится внутри бесконечного цикла for. Это потому, что мы хотим продолжать цикл до тех пор, пока один из случаев явно не прервет цикл. Каждый раз, когда мы получаем тик, будет выполняться второй случай, а затем он снова войдет в тот же цикл, где выбор снова будет ждать выполнения некоторых своих случаев.

Чтобы остановить процесс, мы добавили некоторую логику в строку 55, мы считаем количество уведомлений, и если в какой-либо момент не было ожидающих уведомлений, программа отменяет процесс. Функция Cancel закрывает стоп-канал, и на этом программа завершается.

Давайте запустим программу, чтобы увидеть, как она работает:

Отлично, программа работает как положено. Это всего лишь пример того, как запустить повторяющийся процесс. Это может быть базовый код для реализации чего-то более сложного. Вы можете создавать сложные программы с помощью select.

Заключение

Поначалу создание параллельных программ может быть сложной задачей, особенно если вы изо всех сил пытаетесь понять, как работают горутины, каналы и выбор.

Я надеюсь, что с этой статьей вы почувствуете себя менее запутанными, и вы нашли несколько вариантов использования, где вы можете использовать select.

Спасибо, что прочитали, и следите за новостями.