Похоже, есть спрос на руководство по параллельному конвейеру Rust а-ля https://blog.golang.org/pipelines, так что давайте попробуем.

Пример полного кода на https://github.com/gterzian/pipeline/blob/master/src/main.rs

Примечание. Когда я пишу «поделиться» ниже, я имею в виду это простым английским языком, как «сделать доступным». Фактически, совместное использование полагается на семантику перемещения Rust, а не на общие ссылки.

Общий конвейер

Давайте возьмем приведенный выше пример Go в качестве отправной точки (по крайней мере, простую версию в этой статье), которая включает в себя следующее:

  1. generate числа.
  2. square их, используя несколько рабочих.
  3. merge результаты от различных рабочих.

Некоторые «проблемы» трубопровода

В статье Go дополнительно подчеркивается важная проблема, которую необходимо решить: этапы не завершаются, когда должны, что приводит к утечке ресурсов.

Статья о Go посвящена особенностям разновидности каналов Go (см. Обзор различий в предыдущей статье). Поэтому вместо того, чтобы создавать что-то максимально приближенное к этому примеру, мы сосредоточимся на идиоматическом конвейере Rust, который также имеет перспективу вечного зависания на сцене, но по другим причинам.

Выбранные нами «примитивы» параллелизма

Наш конвейер в Rust будет включать в себя использование некоторых довольно примитивных инструментов, во-первых, что-то, называемое потоками, а также наиболее скучную реализацию канала, стандартную lib std::sync::mpsc.

Эти инструменты кажутся достаточно хорошими, чтобы создавать сложные модели одновременной работы, придерживаясь совместного использования Sender «половины нашего канала различными способами ...

Для более конкретных нужд можно поискать ящики, такие как crossbeam.

Вы хороши настолько, насколько хороши ваши идиомы

Как мы можем написать наиболее «естественную» форму параллельных конвейеров в Rust? Что мы можем определить как «идиоматика» и кто мы такие, чтобы решать, что такое «идиоматика»? Что ж, я соглашусь на следующее:

Идиоматический код неявно использует стандартные функции языка и / или стандартной библиотеки (или ящик, который так широко используется, что тоже считается стандартным). Это «умно», но только потому, что полностью полагается на хорошо известные функции, а не на приспособления, изготовленные на заказ. В случае хитроумных приспособлений, изготовленных по индивидуальному заказу, это будет как можно более «тупым» и явным.

Я собираюсь пойти на большой репутационный риск, пытаясь перечислить несколько хорошо известных функций Rust:

  1. черта Iterator.
  2. for петель.
  3. iter метод mpsc::Receiver, и особенно реализация черты IntoIterator.
  4. Черта Drop.

Можем ли мы реализовать неблокирующий и корректно завершающий параллельный конвейер, полагаясь только на эти концепции? Можно ли считать такой подход к конвейерам «идиоматическим», потому что он опирается на функции стандартного языка (или, скорее, предоставляемые библиотекой)? Давайте посмотрим…

Вечно висящий конвейер в Rust

Давайте начнем с реализации, которая действительно будет зависать вечно, а затем исправим ее.

Сначала познакомимся с нашими компонентами:

Обязательство: создавайте каналы и делитесь ими

Настройка конвейера представлена ​​следующими этапами:

Основной поток создает и совместно использует следующие каналы:

  1. Результаты чан. Отправитель используется на заключительном этапе «слияния», чтобы отправить окончательные результаты обратно в основной поток.
  2. Слить чан. Отправитель делится с «квадратными» рабочими, чтобы отправить квадратные числа на этап «слияния».
  3. Ген чан. Отправитель совместно с этапом «создания» для отправки сгенерированных номеров.
  4. «Квадратные рабочие» - это просто список отправителей, по которому мы можем циклически распределить работу между ними.

Очередь: начни общаться

Общение начинается с:

  1. Получать сгенерированные номера через «ген чан»,
  2. распределите работу, перебирая «square_workers».

Наконец, мы:

  1. Начните получать через «results chan», ожидая окончательных результатов от стадии «merge».
  2. В фоновом режиме у каждого рабочего есть отправитель на этап «слияния», поэтому каждое возведенное в квадрат число отправляется для «слияния» через этот канал, минуя основной поток (см. Фрагмент кода «квадратного» компонента выше).

Это выглядит великолепно, единственная проблема в том, что мы застряли здесь на части «бесконечного лета», поскольку этот трубопровод будет работать вечно ...

Как «сбросить» наши каналы

Как мы можем остановить наш постоянно повторяющийся конвейер? Для этого нам нужно понимать, когда итератор по каналу вернет None.

Примечание об итерации в Rust и каналах: вызов next для итератора, не имеющего никаких значений, вернет None, а также вызовет остановку цикла for.

В случае канала None будет возвращено только тогда, когда все Sender соответствующего Receiver будут отброшены.

Итак, ответ таков: чтобы остановить один этап в нашем конвейере, нам нужно Drop всех отправителей, соответствующих получателю этого этапа.

Итак, вы видите, что наш конвейер естественным образом отключится, когда каждый этап будет выполнен, итеративно по получателю, он отбросит отправителей для следующего этапа, в результате на следующем этапе его итерация будет остановлена, и так далее ...

Ой, на самом деле этого не происходит. Наша очередь рабочих-отправителей остается в области видимости в основном потоке, даже после того, как часть разветвления завершена, следовательно, рабочие будут продолжать итерацию на своих приемниках и никогда не отключатся ...

Престиж: области применения

Давайте исправим указанную выше проблему и введем дополнительную область видимости, в результате чего очередь рабочего-отправителя исчезнет.

Тщательная печать показывает, что действительно, когда все их отправители опустили здесь, когда square_workers выходит за пределы области видимости, рабочие действительно прекращают итерацию.

Однако теперь, кажется, продолжает повторяться merge ...

Как это может быть, если рабочие остановили свою итерацию, а отправитель merge, который они держат, выходит из области видимости, а сам merge должен прекратить повторение, не так ли?

Двигаемся дальше…

Проблема в том, что мы забыли, что каждый из двух воркеров владеет клоном отправителя слияния, а исходный все еще находится в основной области видимости, поэтому он не падает, и merge застревает в повторении навсегда .

Исправление? просто замените один из клонов фактическим ходом merge_chan, например, this:

Теперь, когда merge_chan, отбрасывается, merge прекращает итерацию, в результате чего results_chan отправитель удаляется, в результате чего наш основной поток получает None, и весь наш конвейер корректно завершается.

Конец

К счастью, это была настоящая поездка, ответ на вопрос «как реализовать конвейер в Rust?» кажется таким: «просто перебирайте получателей и обязательно отбрасывайте отправителей, когда вы» покончим с ними ». По сути, когда вышестоящий этап отбрасывает отправителя (-ов) на нижележащий, это приведет к тому, что нижележащий прекратит итерацию на своем получателе и позволит ему отбросить отправителей для следующего этапа, что приведет к постепенному завершению работы.

Конечно, что не распространяется на случай, когда нижележащий этап остановился до того, как получил все значения, отправленные вышестоящим? Но разве нас это волнует? Пока мы гарантируем, что отправители отброшены и весь конвейер отключен, любые данные, оставшиеся в буфере наших неограниченных каналов, тоже будут отброшены ...

И если вам действительно нужен следующий этап, чтобы иметь возможность сигнализировать вышестоящему о прекращении производства значений, могли бы вы реализовать такую ​​дополнительную линию связи, поделившись своим отправителем (ями)?

Продолжение следует…



Rust: параллелизм без сожалений
Rust предлагает обещание« бесстрашного параллелизма
и реализует его за счет безопасности памяти. Но эта безопасность не… medium.com »