Если вы пришли с Java, вы, вероятно, ассоциируете асинхронность с потоками. А это значит, что вы имели дело с общим изменяемым состоянием. Вы потратили бесчисленные часы на поиск тупиков и условий гонки. Вы проявили осторожность при изменении общего состояния с помощью примитивов блокировки, таких как synchronized. Возможно, вы даже использовали функции блокировки более высокого порядка, такие как Semaphores и Latches.

На этом этапе вы можете видеть, что параллелизм - это сложная задача. Это действительно трудно. Когда вы берете однопоточный фрагмент кода и делаете его параллельным, вы по своей сути вносите огромную сложность.

Если вы использовали Kotlin, вы, вероятно, слышали о сопрограммах. Сопрограммы не новы. Они существуют с 60-х / 70-х годов. Они являются частью другой модели параллелизма, известной как коммуникационные последовательные процессы (CSP) ». Модель параллелизма Kotlin состоит из двух примитивов: Coroutines и Channels. Эти примитивы параллелизма упрощают рассуждения о параллелизме и повышают тестируемость. И ИМО, они также являются одной из самых захватывающих частей изучения языка.

Эта статья знакомит вас с концепцией сопрограмм и каналов с помощью иллюстраций.

ПРИМЕЧАНИЕ. На момент написания статьи каналы находятся в экспериментальной стадии.

Однопоточная кофейня

Я воспользуюсь аналогией с заказом капучино в кафе, чтобы объяснить сопрограммы и каналы. Начнем с того, что один бариста обслуживает заказы.
Бариста:

  1. Принимает заказ
  2. Перемалывает кофейные зерна (30 секунд… это очень медленная кофемолка)
  3. Делает рюмку эспрессо (20 секунд)
  4. Выпаривает молоко (10 секунд)
  5. Сочетает приготовленное на пару молоко с рюмкой эспрессо (5 секунд… для необычного латте-арта)
  6. Подает капучино

Это похоже на однопоточное приложение - один бариста выполняет всю работу последовательно. Мы можем моделировать каждый шаг как функцию в программе. А поскольку это однопоточная программа, текущая функция должна завершиться перед переходом к следующей функции. Вот как выглядит программа (попробуй):

Результат этого выглядит так:

Концептуально эта программа очень проста. Об этом легко рассуждать и понимать. Но это неэффективно. Что происходит, когда кофейня становится популярной и мы нанимаем еще двух сотрудников. Как наша текущая концептуальная модель позволяет трем сотрудникам работать вместе, но независимо? Как мы масштабируем нашу программу. Как программа может использовать преимущества нескольких потоков для приготовления капучино. Параллелизм становится важной частью решения.

Параллельная кофейня

Предположим, по аналогии с кофейней, мы наняли еще одного бариста и кассира. А пока предположим, что у нас есть две кофемолки, а наша эспрессо-машина может сделать две порции одновременно. Сделаем еще одну оптимизацию. Бариста может сделать рюмку эспрессо и одновременно приготовить молоко. Когда оба будут выполнены, бариста может объединить их вместе.

Вот что у нас сейчас есть:

  1. Кассир принимает новый заказ. Кассир ждет, пока один из бариста не начнет обрабатывать этот заказ, прежде чем принимать новые заказы.
  2. Доступный бариста примет заказ и перемолёт кофейные зерна (30 секунд).
  3. Поместите молотые кофейные зерна в кофемашину и сделайте рюмку эспрессо (20 секунд).
  4. Пока готовится эспрессо, бариста готовит молоко на пару (10 секунд).
  5. Когда порция эспрессо и приготовленное на пару молоко будут готовы, бариста объединит их для приготовления капучино (5 секунд).
  6. Подавать капучино

Теперь у нас есть система, которая намного эффективнее. Есть несколько частей программы, которые могут выполняться одновременно.

Но для поддержки такого поведения нам понадобится способ создать кассира и двух бариста, которые могли бы работать независимо. Нам нужно, чтобы бариста поговорили с кассиром. И нам понадобится способ, чтобы бариста могли управлять кофемашиной эспрессо (думайте об эспрессо-машине как об общем ресурсе), не вступая в конфликт друг с другом.

Потоки и сопрограммы

В нашем примере выше мы можем представить бариста и кассира как сопрограммы. Но что такое сопрограммы. Чтобы ответить на этот вопрос, давайте сначала обновим, что такое потоки.

Параллелизм - это не параллелизм

Потоки позволяют выполнять единицы работы одновременно. Каждый поток Java выделяется в пространстве пользователя, но отображается в поток ядра. Это означает, что операционная система управляет этими потоками. Операционная система планирует отрезок времени для запуска каждого потока. Если поток не выполняется в этом окне, операционная система прерывает поток и переключается на другой поток. Это известно как упреждающее планирование. На одноядерном процессоре одновременно может выполняться только один поток. Быстрое упреждающее планирование потоков операционной системой - это то, что позволяет независимым единицам работы выполняться одновременно. Сейчас большинство современных телефонов имеют многоядерные процессоры. Это допускает параллелизм. Поток A может выполняться на ядре ЦП 1, в то время как поток B может выполняться на ядре ЦП 2. Необходимо сделать важное различие - параллелизм - это не параллелизм. И последнее, что касается ниток, это то, что они дорогие. На JVM вы можете ожидать, что каждый поток будет иметь размер около 1 МБ. У каждого потока есть свой стек. И, наконец, стоимость планирования потоков, переключения контекста и недействительности кэша ЦП.

Концептуально сопрограммы похожи на потоки. Они выполняют единицы работы одновременно. Но в отличие от потоков, сопрограммы не обязательно привязаны к какому-либо конкретному потоку. Сопрограмма может начать выполнение в одном потоке, приостановить выполнение и возобновить выполнение в другом потоке. Операционная система не управляет сопрограммами. На уровне пользовательского пространства они управляются средой выполнения Kotlin. Это означает, что сопрограмме не выделяется отрезок времени для выполнения единицы работы. Это также означает отсутствие накладных расходов на планировщик. Вместо этого сопрограммы выполняют совместную многозадачность. Когда одна сопрограмма достигает точки приостановки, Kotlin Runtime найдет другую сопрограмму для возобновления. Вы можете думать об этом как о мультиплексировании нескольких сопрограмм в одном потоке. У сопрограмм небольшой объем памяти - несколько десятков байтов. Это дает вам очень высокий уровень параллелизма с очень небольшими накладными расходами.

Давайте обновим наш первый пример, чтобы две сопрограммы обрабатывали список заказов одновременно (попробуйте).

Запустить новую сопрограмму так же просто, как вызвать launch. В приведенном выше примере две сопрограммы начинаются с main. Сопрограммы должны быть связаны с областью сопрограмм. Это гарантирует, что сопрограммы будут очищены без необходимости явно управлять жизненным циклом сопрограммы. В нашем примере мы запускаем сопрограммы внутри области, определенной runBlocking. Это означает, что функция main не завершится, пока не завершатся две дочерние сопрограммы (barista-1 и barista-2).

Совет. Если вы пробуете это на Android, ознакомьтесь с Структурированным параллелизмом и CoroutineScope.

Второе, что мы сделали, - это добавили модификатор suspend к функции makeCoffee. Если вы проследите каждый метод внутри этой функции, вы заметите, что у них также есть модификатор suspend, примененный к их объявлениям функций. И если вы посмотрите на эти функции, то заметите, что все они вызывают delay вместо Thread.sleep. Функция delay переводит сопрограмму в состояние приостановки на некоторый период времени, не блокируя поток, в котором она выполняется. Это означает, что среда выполнения Kotlin может найти другую сопрограмму для возобновления работы в этом потоке. Эта концепция позволяет сопрограммам использовать потоки с высокой степенью эффективности. То, чего никогда не сможет достичь планировщик потоков операционной системы.

Совет. Попробуйте удалить suspend ключевое слово. Вы получите полезное сообщение об ошибке 😉.

Результат этого выглядит так:

Отлично! Теперь у нас есть два бариста, которые одновременно готовят кофе. И они оба работают в одном потоке! Но как заставить двух бариста разговаривать друг с другом? Вот тут-то и нужны каналы.

Совет: указав диспетчер, вы можете изменить пул потоков, в котором назначена выполнение сопрограммы:

launch(Dispatchers.Default + CoroutineName(“barista-1”)) {
    makeCoffee(ordersChannel)
}

каналы

Вы можете думать о канале как о трубе между двумя сопрограммами. Этот канал облегчает передачу информации между этими двумя сопрограммами. Одна сопрограмма может отправлять информацию по этому каналу. Другая сопрограмма будет ждать получения информации. Эта концепция связи между сопрограммами отличается от потоков.

Не общайтесь, разделяя память; вместо этого поделитесь памятью, общаясь.

Когда вы пишете программное обеспечение, которое включает координацию с блокирующими ресурсами (сеть, база данных, кеш и т. Д.) Или ресурсоемкие операции, вы перекладываете это на потоки. Итак, теперь у вас есть вещи, которые разделяются между потоками. Чтобы безопасно поделиться чем-либо, вы полагаетесь на блокировку ресурса или памяти, чтобы два потока не могли читать или писать в них одновременно. Обычно потоки обмениваются данными - через разделяемую память. Но это также вызывает всевозможные проблемы, такие как условия гонки, взаимоблокировки и т. Д.

Каналы продвигают другой взгляд на общение: не общайтесь, делясь воспоминаниями, делитесь общением.

Давайте обновим наш пример, чтобы использовать канал для обмена информацией об обработке заказов между кассиром и двумя бариста (попробуйте).

Прежде чем мы углубимся в код, давайте займемся концептуальным пониманием изменений. Ниже представлена ​​визуализация того, что делает приведенный выше код.

Существуют три сопрограммы (кассир, бариста 1 и бариста 2), работающие независимо и выполняющие определенные единицы работы. Кассир связывается с двумя бариста через канал. Кассир принимает заказ, размещает его на канале и ждет, пока один из двух бариста примет заказ. Так сопрограммы синхронизируются друг с другом. Как только бариста закончит готовить кофе, он синхронизируется с кассиром для обработки следующего заказа. Что делать, если заказов еще нет? Два бариста приостановят выполнение и будут ждать, пока на канал поступит приказ.

Первое, что нам нужно сделать, это создать канал:

val ordersChannel = Channel<Menu>()

Далее нам нужно отправить заказы по этому каналу. Отправка по каналу - это приостанавливаемая операция, и ее нужно вызывать из сопрограммы. Для этого мы создаем новую сопрограмму Cashier.

launch { // launches the cashier coroutine
    for (o in orders) { ordersChannel.send(o) }
    ordersChannel.close() // don't forget to close channels when you're done with them!
}

Теперь у кассира есть возможность безопасно отправлять заказы на приготовление кофе. Следующее, что нам нужно сделать, это обновить логику для двух бариста, которые будут использовать этот канал.

Функция makeCoffee теперь принимает канал вместо списка. Функция будет перебирать канал, как и в случае со списком. Когда на канале ничего нет, функция приостанавливает выполнение.

private suspend fun makeCoffee(ordersChannel: Channel<Menu>) {
    ...
}

Закрытие канала похоже на конечное событие. Это сигнализирует функциям, считывающим из этого канала, что больше нечего обрабатывать. Это завершает цикл внутри makeCoffee и позволяет завершить работу сопрограммы. Что, если мы никогда не закроем канал? Это означает, что обе сопрограммы Barista будут в неопределенном состоянии приостановки (ожидая, что что-то поступит на канал). И поскольку две сопрограммы принадлежат области runBlocking, функция main никогда не завершится.

Вот как выглядит результат:

Обратите внимание, как оба бариста одновременно обрабатывают разные заказы. Также обратите внимание, что две сопрограммы выполняются в одном потоке (основном потоке). Отлично! Теперь у нас есть возможность для двух наших бариста одновременно обрабатывать заказы и общаться с кассиром.

Свойства каналов

Каналы образуют фундаментальный компонент для связи между сопрограммами. Channel реализует интерфейс SendChannel и ReceiveChannel. Давайте посмотрим на некоторые свойства каналов.

Приостановка выполнения при отправке или получении

Отправка по каналу или получение из канала может приостановить выполнение. Мы видели выше, когда кассир отправляет заказ по каналу, сопрограмма приостанавливает выполнение до тех пор, пока другая сопрограмма не сможет получить из этого канала. Точно так же мы видели, что бариста приостанавливает выполнение при получении из канала до тех пор, пока на канале не станет доступен заказ.

Типы канальных буферов

Каналы предлагают гибкость с точки зрения передачи сообщений между сопрограммами.

Свидание (без буферизации)
Это тип буфера канала по умолчанию. У него нет буфера. Вот почему сопрограмма приостанавливается до тех пор, пока и получающая, и отправляющая сопрограммы не соберутся вместе для передачи данных.
val channel = Channel<Menu>(capacity = Channel.RENDEZVOUS)

Conflated
Это создает канал с фиксированным размером буфера 1. Если сопрограмма-получатель не успевает за производителем, производитель перезаписывает последний элемент в буфере. Когда принимающая сопрограмма готова принять следующее значение, она получает последнее значение, отправленное сопрограммой-производителем. Это также означает, что сопрограмма производителя не приостанавливает выполнение при отправке на канал. Принимающая сопрограмма по-прежнему будет приостанавливать выполнение, пока что-то не станет доступным на канале.
val channel = Channel<Menu>(capacity = Channel.CONFLATED)

Буферизованный
В этом режиме создается канал с буфером фиксированного размера. Буфер поддерживается массивом. Производящая сопрограмма будет приостановлена ​​при отправке, если буфер заполнен. Принимающая сопрограмма будет приостановлена, если буфер пуст.
val channel = Channel<Menu>(capacity = 10)

Без ограничений
В этом режиме канал создается с неограниченным буфером. Буфер поддерживается LinkedList. Предметы, произведенные слишком быстро, без ограничений буферизируются. Если буфер не опустошен, элементы продолжают накапливаться до тех пор, пока память не будет исчерпана. Это приводит к OutOfMemoryException. Производящая сопрограмма никогда не приостанавливает отправку на канал. Но если буфер пуст, принимающая сопрограмма будет приостановлена.
val channel = Channel<Menu>(capacity = Channel.UNLIMITED)

Строители сопрограмм

Нам пришлось связать канал с сопрограммой, чтобы отправлять или получать от них. Но мы можем использовать конструкторы сопрограмм, чтобы упростить создание сопрограммы и канала.

Произвести
Будет создан новый ReceiveChannel. Внутри ProducerScope он запускает сопрограмму для отправки значений по каналу. Когда нечего отправлять, канал неявно закрывается, а ресурс сопрограммы освобождается.
Мы можем упростить создание нашего orderChannel в приведенном выше примере, чтобы он выглядел так:

Актер
Подобно продюсеру, создается новый SendChannel. Внутри он запускает сопрограмму в ActorScope, используемую для получения значений по каналу. Вы можете думать об этом как о раскрытии почтового ящика и внутренней обработке этих элементов в контексте сопрограммы Actor.

Совет. В отличие от produce построителя сопрограмм, вам нужно будет явно останавливать актера, когда он больше не нужен. Обязательно позвоните actor.close()

Обратное давление

Если вы пришли из мира RxJava, то вы, вероятно, знакомы с концепцией и важностью противодавления. Что замечательно в каналах, так это то, что в них встроено противодавление. Противодавление распространяется вверх по потоку на основе различных используемых режимов буферизации каналов.

Добавление эспрессо-машины

Наша реализация кофейни в настоящее время поддерживает двух бариста, готовящих кофе. Но концептуально это похоже на использование двух экземпляров кофемашины эспрессо. Как создать кофемашину для приготовления эспрессо, которую смогут использовать два бариста?

Бариста в настоящее время также выполняют каждый шаг последовательно. Есть возможность это оптимизировать. Как мы можем изменить нашу программу, чтобы бариста могли готовить молоко на пару, делая порцию эспрессо?

Наша кофемашина эспрессо имеет две паровые трубки и два портафильтра. На входе в паровую трубку подается молоко, а на выходе - молоко, приготовленное на пару. На входе в портафильтр поступают молотые кофейные зерна, а на выходе - порция эспрессо.

Один из подходов состоит в том, чтобы смоделировать эти две функции как приостановленные. Таким образом, вызывающий абонент предоставит входные данные (тип молока или тип молотых кофейных зерен) и будет ожидать выхода (приготовленное на пару молоко или порция эспрессо). Но как насчет внутреннего устройства кофемашины эспрессо? Как кофемашина ограничивает количество порций, приготовленных одновременно?

Что, если бы мы построили два канала, один для портафильтра и один для паровой трубки, с фиксированным размером буфера 2. Затем мы могли бы отправить входной сигнал в соответствующий канал. Использование канала - хороший способ общения. Но установка размера буфера на 2 не означает, что мы делаем две порции эспрессо одновременно. Скорее, это означает, что у нас может быть до двух ожидающих запросов на порцию эспрессо при получении одного.

Что, если мы создадим канал для каждого портафильтра. И аналогично создаем канал для каждой паровой трубки. Это ближе к тому, что мы хотим. Мы можем запустить сопрограмму для каждого портафильтра и связать каждый портфильтр с каналом. Это хороший кандидат в актера.

Теперь нам просто нужен способ выбрать портафильтр для отправки. А если используются оба портафильтра, мы должны приостановить работу до тех пор, пока один из них не станет доступным. То же самое и с двумя трубками пара.

Концептуально это то, что мы пытаемся сделать:

Мы хотим, чтобы первая сопрограмма отправляла «синие» данные либо сопрограмме 2, либо сопрограмме 3, которая когда-либо становится доступной первой.

Выбор каналов

Нам нужен способ выбрать канал для отправки (или получения). Kotlin предоставляет именно это с помощью выражения select. Он работает очень похоже на инструкцию switch, но для каналов. select выбирает первый готовый канал. Под готовностью мы подразумеваем, что это может быть первый канал, который готов к отправке или готов к получению. Выражение select приостанавливается, если ни один из каналов не готов.

Вот как теперь выглядит обновленный код кофемашины:

Поскольку обе функции практически одинаковы, мы сосредоточимся на pullEspressoShot. Функция выбирает два канала портафильтра для отправки. Но нам нужен способ передать результат от актора портафильтра обратно в оператор select. Поэтому мы создаем канал и передаем его вместе с запросом в портафильтр. Как только запрос отправлен в канал, мы ждем ответа и доставляем результат. Реализация портафильтра отправляет результат по предоставленному каналу, а затем закрывает канал.

Но мы еще не закончили. Мы создали четыре канала, два для паропровода и два для портафильтров. Эти каналы также связаны с сопрограммами. Мы создали их как актеров. Значит, мы должны закрыть актеров. Мы вводим для этого функцию shutdown.

Взгляните на комплектную кофемашину здесь.

Полная картина

Теперь у нас есть способ разделить Эспрессо-машину между сопрограммами. Давайте обновим программу, чтобы воспользоваться этим. Мы также можем сделать рюмку эспрессо и одновременно приготовить молоко на пару (попробуй).

Мы создаем экземпляр кофемашины Espresso Machine и передаем его функции makeCoffee. Внутри функции makeCoffee мы запрашиваем порцию эспрессо и приготовленное на пару молоко из кофемашины. Но мы хотим сделать и то, и другое асинхронно. Обернув вызов в блок async, мы запускаем сопрограмму и получаем Deferred. Мы можем вызвать await на Deferred, чтобы получить фактическое значение.

Целью этой статьи было объяснить основы работы с каналами и сопрограммами. Но есть важная информация как о каналах, так и о сопрограммах, которые могут вызвать у вас проблемы. Каналы и сопрограммы - не серебряная пуля, позволяющая избежать знакомых проблем параллелизма. По-прежнему возможны тупиковые ситуации, утечки памяти и гонки данных. Но эти концепции, безусловно, упрощают рассуждения о параллелизме.

Мы рассмотрели основы сопрограмм и каналов. Мы взяли простую последовательную программу и превратили ее в параллельную. Мы рассмотрели несколько различных шаблонов обмена данными и взаимодействия между сопрограммами.

Кредиты

Спасибо Хоакиму Вергесу за просмотр этого сообщения и прекрасный отзыв.

дальнейшее чтение

Структурированный параллелизм от Романа Елизарова
Отличный пост, объясняющий важность структурированного параллелизма с сопрограммами.

Документы Kotlin: каналы
Документы Kotlin описывают несколько способов использования каналов.

Тупики в неиерархической CSP от Романа Елизарова
Каналы и сопрограммы - отличный способ рассуждать о параллелизме, но они не предотвращают взаимоблокировки. Это отличный пост, который проведет вас через настоящую проблему и подводные камни.

GopherCon 2018: переосмысление классических шаблонов параллелизма Брайана С. Миллса
Доклад о шаблонах параллелизма с использованием примитивов параллелизма go (горутины и каналы).