В последнее время я работал над масштабированием алгоритма архитектуры Master-Slave и, учитывая количество шаблонов, которые я использую, я решил написать серию сообщений в блоге о параллелизме Ruby и его шаблонах.

В стандартную библиотеку Ruby встроены примитивы параллелизма. Сегодня мы начнем с основных блоков, а позже перейдем к более сложным паттернам. Стандартная библиотека Ruby thread содержит объекты Thread и Queue, а также многие другие, которые мы увидим в других сообщениях.

Сегодня мы начнем с простого дизайна, в котором производитель ставит в очередь задания для обработки потребителем. Производитель может быть веб-сервером, ставящим в очередь уведомления по электронной почте для отправки, основным потоком в сканере, который ставит в очередь URL-адреса для компонентов загрузчика… список может быть длинным.

Первый инструмент, который нам понадобится, это очередь. Очередь является потокобезопасной, что означает, что мы можем создавать множество шаблонов параллелизма только с ней.

Мы будем использовать эту очередь для связи между производителем и потребителем.

Затем мы можем конкретизировать базового производителя, который будет работать в цикле, создавать некоторые задания и помещать их в очередь, которую мы только что создали.

Мы реализуем производителя как поток, чтобы мы могли переместить управление программой на следующий блок.

Queue имеет множество псевдонимов для вставки и удаления элементов: здесь мы просто использовали << для вставки, но мы могли бы использовать enq или push. Точно так же для удаления элементов мы можем использовать shift, deq или pop. На момент написания этого поста я лично предпочитал оператор << для вставки сообщений, поскольку он графически выразительнее, а затем либо pop, либо deq для извлечения из очереди. На мой взгляд, enq и deq очень похожи внешне, чтобы использоваться вместе (обычно я предпочитаю использовать разные термины, чтобы усилить контраст), а push и pop немного вводят в заблуждение, так как часто используются для поведения стека. .

Продолжим наше путешествие в сторону потребителя. Теперь нам нужно реализовать потребителя, который будет извлекать следующее задание из очереди и выполнять с ним некоторую работу.

Производитель и потребитель не зависят друг от друга. Мы могли бы полностью изменить дизайн потребителя, не затрагивая производителя, и наоборот. Это потому, что мы установили протокол связи между двумя компонентами.

Если бы мы запустили нашу программу прямо сейчас, мы бы увидели, что она сразу же завершается, несмотря на то, что мы определили два потока. Это потому, что мы не сказали основному потоку ждать завершения других потоков. Это похоже на запуск команды Unix с & в конце, которая запустит команду в фоновом режиме. Мы исправим это, запустив метод Thread#join для каждого созданного потока.

Запустив нашу программу, мы увидим, что все работает правильно, за исключением того факта, что процесс никогда не заканчивается — этим мы займемся позже.

Если вы заметили ранее, я намеренно добавил sleep 1 для производителя, чтобы имитировать некоторую работу на его стороне, и sleep 2 для потребителя, чтобы имитировать длительную работу. Короче говоря, я сделал потребителя медленнее производителя. Наблюдая за выводом, мы видим, что к тому времени, когда мы обработаем задание № 6, производитель уже поставил в очередь 10 заданий.

Если производитель будет постоянно создавать рабочие места такими темпами, потребитель никогда не сможет его догнать. В производственном приложении это было бы неприемлемо. Но нас заботит производительность, поэтому мы решили развернуть еще одного потребителя, который извлекает задания из той же очереди, чтобы оба потребителя могли не отставать от производителя.

Мы немного модифицируем потребительский блок, чтобы он стал массивом потоков. Приятным побочным эффектом является то, что у нас также есть переменная, которой мы можем управлять, чтобы увеличивать/уменьшать количество потребителей.

Теперь, запустив скрипт, мы заметим, что производитель и потребитель работают с одинаковой скоростью, потому что один потребитель получает новое задание, а другой все еще обрабатывает предыдущее задание.

Стратегия отступления

На данный момент наша программа будет работать вечно, и это нецелесообразно тестировать. Если бы мы писали тесты для этой программы, у нас, вероятно, был бы производитель, ставящий в очередь конечное число заданий, а затем мы утверждали бы, что их обрабатывает потребитель.

Давайте изменим производителя, чтобы поставить в очередь только конечное число заданий.

Если мы запустим программу, мы увидим, что потребители загружают задания, но затем возникает необычная ошибка:

О чем эта ошибка?

Среда выполнения Ruby поняла, что есть 2 других потока, готовых извлечь из очереди (и находящихся в состоянии ожидания), но ни один другой поток не поставит в очередь новые задания, потому что поток-потребитель завершил работу после постановки в очередь 5 заданий.

Что нам нужно, так это механизм, сообщающий потребителям, что дальнейшие задания не будут ставиться в очередь и что они могут безопасно выйти.

Мы можем поставить в очередь символ :done в качестве сигнала для потребителя, который его прочитает. Поскольку у нас есть 2 потребителя, мы делаем это дважды.

На стороне потребителя мы должны проверить, является ли задание особым случаем :done, и, если это так, прервать цикл.

С этим небольшим изменением наш скрипт теперь может корректно завершаться, потому что больше нет зависающих потоков.

EDIT: Спасибо Jeremy за указание на то, что начиная с Ruby 2.3.0 мы теперь можем использовать Queue#close в качестве стратегии выхода.

Независимо от того, есть ли у нас производитель, который ставит в очередь конечное число сообщений или работает в цикле и создает сообщения вечно, наличие стратегии выхода для потребителей всегда является хорошей практикой. Например, мы могли бы перехватить сигнал Ctrl-C и вместо немедленного выхода мы могли бы уведомить потребителей о том, что мы закрываем очередь и больше не будет добавлено никакой работы, а затем выполнить демонтаж или очистку для корректного выхода.

Первоначально опубликовано на hspazio.github.io.