После некоторых экспериментов с горутинами и сопрограммами Kotlin я искал реализацию сопрограмм Java. Но решения, которые я смог найти, не являются чистой Java, потому что они либо используют библиотеки, отличные от Java, через JNI, либо полагаются на специальные инструменты, такие как манипулирование байт-кодом специальными Java-агентами, которые необходимо запускать параллельно с приложением.

Возникает вопрос: можно ли реализовать сопрограммы на чистой Java? Для ответа полезно взглянуть на два основных свойства сопрограмм:

  1. Сопрограммы часто называют «облегченными» потоками, то есть они не работают в собственном выделенном системном потоке, а вместо этого совместно используют существующие потоки - отсюда co -программы. Кооперативно означает, что разработчик сопрограмм несет ответственность за то, чтобы они не занимали общий поток больше, чем необходимо. Явная поддержка выполнения совместно используемых потоков была добавлена ​​в Java 7 с помощью ForkJoinTask и улучшена в Java 8 с помощью CompletableFuture.
  2. Сопрограммы могут быть приостановлены на неопределенное время, например при ожидании ввода-вывода или ожидания других сопрограмм. Во время приостановки сопрограммы вообще не занимают никаких вычислительных ресурсов и возобновляют выполнение только тогда, когда ожидаемый ресурс становится доступным. Приостановка - это часть, для которой языки с функциями сопрограмм имеют специальные ключевые слова и поддержку компилятора и где некоторые из существующих библиотек изменяют байт-код исполняемой программы. Но возможно ли на чистой Java без таких ухищрений?

Краткий ответ на этот вопрос: да. В этой статье представлена ​​чистая реализация сопрограмм на Java, доступная как Open Source on GitHub по лицензии Apache 2.0. Он использует функции, доступные начиная с Java 8, чтобы сделать объявление и выполнение сопрограмм как можно проще.

Текущая версия 0.9.0 означает, что код все еще считается экспериментальным и еще не использовался в производственной среде. Следует ожидать, что некоторые ошибки все еще будут. Любые отзывы об API и функциональных возможностях приветствуются и помогут скоро выпустить версию 1.0. Библиотека имеет только одну зависимость от другого нашего проекта, который обеспечивает общие базовые функции.

Часть выполнения сопрограмм в этой структуре следует модели структурированного параллелизма и во многом вдохновлена ​​заслуживающим внимания эссе Примечания о структурированном параллелизме, или: инструкция Go, признанная вредной Натаниэль Дж. Смит. Я настоятельно рекомендую это всем, кто участвует в программировании параллельного кода и / или хочет использовать этот фреймворк. Это также повлияло на последнюю версию сопрограмм Kotlin. Вкратце, в нем говорится, что параллельный код, который выполняется сам по себе без какого-либо контроля со стороны кода приложения, является проблематичным, и его следует избегать.

Объявление сопрограмм

Поскольку в Java явно отсутствуют собственные механизмы для объявления приостановок, единственный способ реализовать приостановку сопрограмм - использовать API. К счастью, функции функционального программирования, представленные в Java 8, в первую очередь лямбда-выражения и ссылки на методы, обеспечивают краткий синтаксис для объявления фрагментов исполняемого кода. Вместе с возможностью статического импорта представленный фреймворк позволяет простое объявление сопрограмм.

Сопрограммы определены как экземпляры класса Coroutine. Новая сопрограмма создается либо путем вызова конструктора, либо путем вызова статического фабричного метода Coroutine.first(). Последний рекомендуется, потому что он является частью свободного интерфейса для объявления сопрограмм. Его дополнением является метод экземпляра Coroutine.then(), который расширяет сопрограмму дополнительными функциями.

Эти методы ожидают в качестве аргумента экземпляр абстрактного класса CoroutineStep. Фреймворк уже содержит несколько поэтапных реализаций, которые охватывают наиболее распространенные применения. Самый общий шаг - CodeExecution. Как и все этапы фреймворка, он предоставляет статические фабричные методы, которые дополняют свободный интерфейс сопрограмм. Вызывая эти методы с лямбда-выражениями или ссылками на методы, сопрограмма может быть расширена произвольным кодом. Вот простой пример такого объявления:

Это показывает, что сопрограммы обычно имеют тип ввода и вывода, аналогичный интерфейсу Java Function. Следовательно, сопрограммы могут быть вызваны с определенным входным значением для обработки и могут возвращать результат вычисления.

Важной особенностью сопрограмм является их неизменность. После определения их последовательность шагов больше не может быть изменена. Метод экземпляра then всегда возвращает новый экземпляр сопрограммы, содержащий дополнительный шаг. Это позволяет создавать шаблоны сопрограмм, которые можно использовать в качестве строительных блоков для новых сопрограмм, либо в качестве отправной точки для добавленной функциональности, либо используя их в качестве подпрограмм. Из-за их неизменности их также можно использовать в объявлениях констант.

Сопрограмма может иметь произвольное количество шагов. Чем больше шагов, тем больше точек приостановки у сопрограммы. Каждая из этих точек на короткое время прерывает выполнение, давая возможность другому параллельному коду запуститься. С другой стороны, у каждой приостановки есть некоторые накладные расходы на управление. Необходимо определить, какая степень детализации является правильной для конкретного решения.

Фреймворк включает несколько других шагов сопрограмм, которые обеспечивают дополнительную функциональность, особенно шаги, реализующие приостановку сопрограмм. Но прежде чем мы рассмотрим это, давайте сначала посмотрим, как выполняются сопрограммы.

Выполнение сопрограмм

В первой итерации фреймворка сопрограммы просто начали выполняться в каком-то объединенном фоновом потоке. Но после прочтения Примечания по структурированному параллелизму код был изменен, чтобы следовать этому шаблону и всегда выполнять сопрограммы в четко определенной среде.

CoroutineScope

Эта среда, в которой должны запускаться все сопрограммы, CoroutineScope. Основным свойством области видимости является то, что она будет блокировать вызывающий поток, если все сопрограммы, которые были запущены в ней, не завершились успешно, путем отмены или с ошибкой. И если в какой-либо сопрограмме произошла ошибка, область видимости также выдаст исключение вместо того, чтобы просто продолжить текущий поток. Это предотвращает потерю отслеживания выполнения сопрограмм и условий их ошибок.

Кроме того, область видимости дает сопрограммам доступ к данным конфигурации и предоставляет им общее место для обмена состоянием. Хотя последнее является сложной темой, поскольку может потребовать синхронизированного доступа, поскольку разные сопрограммы обычно работают в разных потоках.

Область сопрограммы не может быть создана напрямую. Вместо этого он предоставляет статические методы, которые создают новую область видимости и выполняют некоторый предоставленный приложением код, который, в свою очередь, запускает сопрограммы в этой области. Затем область видимости блокирует вызывающий поток до тех пор, пока код и все сопрограммы, которые были запущены им, не завершатся. Если во время выполнения возникает ошибка, область действия выдаст исключение.

В следующем примере показано выполнение области действия, при которой параллельно запускается миллион сопрограмм. Конечно, количество сопрограмм, которые на самом деле будут работать параллельно, во многом зависит от среды выполнения и особенно от количества доступных ядер процессора.

Если бы после вызова launch (т.е. после закрытия });) был какой-либо код, он не запустился бы до завершения всех сопрограмм, потому что, как объяснялось, область видимости будет ожидать завершения всех своих дочерних элементов.

В этом примере параллельно запускается один миллион экземпляров сопрограммы. По умолчанию они выполняются в общем пуле потоков среды выполнения Java. Если вместо этого вы попытаетесь запустить такое количество отдельных потоков, есть большая вероятность, что это вызовет ошибку нехватки памяти. С меньшим количеством потоков будут работать, но, вероятно, намного медленнее, чем решение сопрограммы (на моей машине примерно в 20 раз).

Одношаговая сопрограмма в этом примере не имеет более глубокой цели, чем занять несколько циклов обработки путем вычисления квадратных корней значений от 1 до 10 (класс Range является членом базовой библиотеки coroutines зависит от проекта). Переменная сопрограммы объявляется с универсальными типами с подстановочными знаками, потому что входные и выходные данные в этом примере не нужны. Его также можно было объявить как ‹Void, Void›, потому что это фактический тип вышеуказанного объявления.

Метод launch выполняет выполнение области действия. Аргумент - это функциональный интерфейс с методом, который ожидает в качестве аргумента область сопрограммы и ничего не возвращает. Помимо типичных функциональных интерфейсов Java, этому разрешено генерировать произвольные исключения. Это позволяет коду области действия приложения оставить обработку ошибок коду за пределами области, поскольку область будет повторно генерировать любые исключения, которые могут возникнуть.

В большинстве случаев код для области видимости, вероятно, будет представлен в виде лямбда-выражения, как в примерах. Затем сопрограммы можно запускать изнутри, вызывая их метод runAsync с областью действия в качестве аргумента и, при необходимости, входным значением (которое не используется в приведенном выше примере). Как сказано в названии метода, сопрограмма выполняется асинхронно в другом потоке, чем код области запуска.

Scope Futures

Иногда может быть желательно иметь область видимости, которая дает результат, когда все сопрограммы завершены. Этого можно достичь, запустив область действия с помощью метода productions вместо launch. В то время как последний имеет недействительный результат и блоки, generate возвращается напрямую с экземпляром интерфейса Java Future. Затем этот экземпляр можно использовать для ожидания завершения области и запроса ее результата.

Чтобы получить результат из области видимости, необходимо предоставить соответствующую функцию в качестве первого аргумента фабричного метода productions. В следующем примере эта функция называется TEXT, которая на самом деле является типом отношения из структуры ObjectRelations. Использование отношений в сопрограммах описано в конце этой статьи.

Продолжение

Когда сопрограмма запускается, возвращается экземпляр Continuation, как показано в примере кода ниже. Продолжение имеет тот же общий тип, что и тип вывода связанной сопрограммы, и служит двум целям:

  1. Обеспечьте общее состояние для всех шагов, которые выполняются в сопрограмме.
  2. Разрешить начальному коду управлять выполнением сопрограммы и получать доступ к ее результату после завершения.

Все шаги, которые выполняются в сопрограмме, получают текущее продолжение в качестве аргумента. Например, упомянутый ранее класс CodeExecution имеет альтернативные фабричные методы, которые принимают функции с дополнительным аргументом продолжения. Затем шаг может использовать продолжение для получения или установки значений состояния, запроса данных конфигурации или доступа к области, в которой он выполняется.

Продолжения всегда локальны для выполнения одной сопрограммы. Следовательно, к нему можно получить доступ по шагам без синхронизации, потому что шаги сопрограммы всегда выполняются последовательно и, следовательно, не могут получить доступ к продолжению одновременно.

Код приложения может использовать возвращенный объект продолжения для запроса состояния выполнения сопрограммы, ожидания ее завершения или отмены. Продолжение также обеспечивает доступ к результату после успешного завершения или к ошибкам, которые могли произойти. Начальный код не должен изменять какое-либо состояние продолжения, пока сопрограмма все еще работает, потому что это может вызвать проблемы одновременного изменения.

Самый простой способ получить доступ к результату сопрограммы - запросить метод getResult продолжения, как показано в примере выше. Этот метод будет блокировать вызывающий поток до тех пор, пока не будет доступен результат, или вызовет исключение, если сопрограмма завершилась неудачно или была отменена.

Чтобы дождаться завершения сопрограммы, вместо этого можно использовать метод await. Это не вернет результат и не вызовет исключение при ошибках. После этого любая обработка ошибок должна выполняться вызывающим кодом. Если такая явная обработка ошибок выполняется, приложение должно уведомить продолжение, вызвав его метод errorHandled. Это очистит ошибку в области видимости, которая в противном случае вызовет исключение при завершении.

Продолжение также можно использовать для отмены выполнения, если оно еще не завершено, вызвав метод cancel. Отмена будет автоматически проверяться между выполнениями шага, но ее также можно запросить из реализации шага, запросив isCancelled в текущем продолжении.

CoroutineContext

Как упоминалось ранее, по умолчанию сопрограммы выполняются в общем пуле потоков среды выполнения Java (ForkJoinPool.commonPool()). Но пул потоков и другие параметры выполнения фактически предоставляются CoroutineContext. Цель контекста - предоставить сопрограммам в одной или нескольких областях среду выполнения. Его также можно использовать для совместного использования конфигурации и синхронизированных ресурсов между областями.

Фреймворк имеет контекст по умолчанию, который можно запрашивать и изменять с помощью статических методов глобального класса Coroutines. Но также можно создавать и настраивать контексты только для определенных исполнений, если это необходимо. Контекст, отличный от контекста по умолчанию, может быть предоставлен при запуске новой области видимости, которая затем будет использовать контекст для всех запускаемых сопрограмм. Но для многих случаев использования контекста по умолчанию должно быть достаточно.

Подвеска и каналы

В показанных до сих пор примерах одновременно выполнялся только некоторый простой код, чего также можно было достичь с помощью обычных потоков. Но важной особенностью сопрограмм является возможность полностью приостановить их выполнение в ожидании какого-либо условия. В приостановленном состоянии сопрограмма не будет потреблять или блокировать какие-либо ресурсы обработки (например, потоки) и просто бездействовать, пока условие не будет выполнено.

Типичным случаем приостановки является связь между сопрограммами через так называемые каналы (не путать с java.nio.Channel интерфейсом). Каналы представляют собой структуры данных, подобные очереди, с фиксированной емкостью, в которую можно записывать или читать данные. Сопрограмма, которая пытается читать из пустого канала или записывать в полный канал, будет приостановлена ​​до тех пор, пока данные или емкость не станут доступными.

Фреймворк сопрограмм поддерживает приостановку шагов в целом, а также каналов в частности. Каналы обозначаются ChannelId, основная цель которого - определить тип данных канала с помощью параметра универсального типа. В следующем коде показан пример связи по каналу. Он снова использует статический импорт для всех ключевых элементов для краткой записи.

Этот код сначала определяет идентификатор канала и две сопрограммы, одна для отправки строки в канал, а другая для получения данных от него. Это достигается путем добавления шагов сопрограммы ChannelSend и ChannelReceive (через их статические фабричные методы) с идентификатором канала в качестве параметра конфигурации.

Если канал не существует при первом доступе к нему, он будет создан автоматически с емкостью 1. Если требуется более высокая емкость, канал необходимо создать заранее с помощью createChannel, либо в области действия, либо в контексте.

Затем сопрограммы выполняются в запущенной области видимости. Вызов Threads.sleep(1000) в этом примере служит только для демонстрации приостановки принимающей сопрограммы (Threads - это класс из зависимости, который также выполняет обработку исключений). При запуске сопрограммы receive канал с идентификатором testChannel будет создан автоматически с емкостью 1 и изначально будет пустым. Следовательно, сопрограмма приостанавливается до тех пор, пока данные не станут доступными в канале.

Затем код "засыпает" 1000 миллисекунд перед запуском сопрограммы send. Поскольку в канале доступна емкость, отправка будет выполнена немедленно (в противном случае отправка будет приостановлена). И теперь данные стали доступны в канале, поэтому сопрограмма получения будет возобновлена ​​и обработает эти данные. Наконец, результат обработки можно получить из продолжения.

По умолчанию каналы создаются в области сопрограммы. В качестве альтернативы каналы могут быть созданы в контексте, тем самым позволяя сопрограммам в различных областях взаимодействовать через каналы. В этом случае канал необходимо создать заранее, вызвав createChannel в контексте с желаемым идентификатором канала и емкостью.

Каналы - очень мощный механизм для связи между сопрограммами. Например, простые логические каналы с емкостью по умолчанию 1 могут использоваться как семафор для обмена сигналами между сопрограммами. И более крупные каналы со сложным типом данных могут использоваться для передачи данных между различными этапами обработки, которые выполняются одновременно. Ни в том, ни в другом случае нет необходимости выполнять сложную синхронизацию, как в традиционном параллельном коде. Отправка или получение из канала в сопрограмме достаточно для достижения синхронизации.

Выбор

Выбор - это концепция сопрограммы, которая часто связана с получением из каналов (а иногда и ограничивается). Языки с поддержкой сопрограмм обычно имеют для этой цели оператор select. Запрос приостанавливается на нескольких каналах для приема и возобновляется, как только данные становятся доступными в одном из каналов.

В этой структуре выбор реализован как шаг приостановки сопрограммы. Он расширяет концепцию, позволяя выбирать из произвольного количества сопрограмм, а не только из каналов. В приведенном ниже примере показан (надуманный) пример, в котором выбор состоит из приема одного канала и двух гипотетических сопрограмм, которые также приостанавливаются до тех пор, пока не станут доступны данные. При выполнении сопрограмма selectFirst возобновит выполнение, как только возобновится одна из сопрограмм, выбранных из. Остальные сопрограммы будут отменены.

Обычно выбор выполняется для набора приостанавливающих сопрограмм. Если бы набор содержал не приостанавливающую сопрограмму, выбор никогда не приостанавливался бы, потому что эта сопрограмма была бы выбрана немедленно. Но эту черту также можно использовать для добавления поведения по умолчанию к сопрограмме выбора: выберите значение приостановки или, если оно недоступно, используйте значение по умолчанию. В таком случае, как правило, необходимо иметь некоторую окружающую управляющую структуру, например цикл для повторения выбора до возобновления приостановки. Другой вариант - указать перед путем по умолчанию задержку приостановки, тем самым преобразовав ее в саму сопрограмму приостановки.

Коллекция

Фреймворк также расширяет концепцию выбора до коллекции. Если вы замените select на collect и или на и в приведенном выше примере, вы получите эквивалентный код для коллекция нескольких сопрограмм. Хотя выбор продолжается, если становится доступной первая приостановка, сбор будет возобновлен только после возобновления всех приостановок. Следовательно, в то время как выбор продолжается с одним значением, сбор создает набор всех значений для дальнейшей обработки на последующих этапах.

НИО

Очевидный вариант использования приостановки - ожидание ввода-вывода. Библиотека сопрограмм содержит несколько шагов приостановки, которые построены на асинхронных API-интерфейсах пакета Java New IO. Они позволяют отправлять и получать в и из сокетов, читать или записывать файлы, а также прослушивать серверные сокеты. Их использование аналогично шагам каналов. Эти шаги также могут служить примерами того, как создавать новые шаги приостановки на основе существующих асинхронных API.

Подпрограммы, условия, циклы и т. Д.

Поскольку сопрограммы необходимо объявлять через API сопрограмм, существующие конструкции Java, такие как условные выражения и выражения цикла, не могут использоваться для приостановки. Фреймворк предоставляет специальные шаги сопрограммы для этих и некоторых других общих целей. Кроме того, всегда можно легко создать подкласс CoroutineStep для создания новых шагов, если существующих реализаций недостаточно.

Подпрограммы

Любая существующая сопрограмма может быть вызвана как подпрограмма другой сопрограммы, используя шаг CallSubroutine. Это позволяет составлять новые сопрограммы из существующих и, следовательно, создавать библиотеки функций сопрограмм в качестве основы для таких композиций.

Условное исполнение

Иногда может потребоваться выполнение различных шагов (включая подпрограммы) в зависимости от результата определенного условия. Этого можно достичь с помощью шага Condition, который предоставляет несколько вариантов выполнения различных шагов в зависимости от оценки предиката:

Петли

Вместо цикла в коде, который выполняется шагом, может быть лучшим выбором повторить выполнение определенного шага или подпрограммы на основе логического условия, потому что это позволяет фреймворку приостанавливаться на каждом цикле цикла:

Итерация

Для общего случая перебора коллекции или другого объекта, реализующего интерфейс java.lang.Iterable, можно использовать шаг Iteration. Он применяет еще один шаг сопрограммы к каждому элементу коллекции и может дополнительно собирать результаты в новую коллекцию. Как и в случае с циклами, сопрограмма приостанавливается между каждым циклом итерации. В следующем примере повторяется первый пример выполнения сопрограммы и используется шаг итерации для обработки каждого элемента в диапазоне:

Отложенное выполнение

При использовании выделенных потоков приложение может ждать определенное время, вызывая Thread.sleep. Но этого метода следует избегать при работе с пулами потоков, потому что разрешение спящего режима потока из пула заблокирует его для любого другого использования. Кроме того, с сопрограммами желательно приостановить выполнение, пока сопрограмма ничего не обрабатывает.

Поэтому фреймворк предоставляет специальный шаг Delay, в котором есть несколько заводских методов, которые приостанавливают сопрограмму на определенное время. Это можно использовать как любой другой шаг, и реализация, стоящая за ним, должна быть достаточной для большинства случаев использования. В противном случае его можно заменить, установив другой планировщик для таких отложенных выполнений в контексте.

ObjectRelations

Как уже упоминалось, проект сопрограмм имеет одну зависимость - это проект ObjectRelations. Помимо некоторых базовых функций (таких как классы Range и Threads, использованные в предыдущих примерах), эта библиотека реализует новую концепцию разработки. Эта концепция позволяет общее моделирование отношений между объектами в объектно-ориентированном программировании. Вкратце, вы можете думать об отношениях как о типизированных и интеллектуальных ссылках на объекты, которые могут быть установлены для произвольных объектов. И они сами могут иметь отношения, обеспечивая простой и универсальный способ применения метаданных. Подробное введение можно найти на сайте документации.

Библиотеку сопрограмм можно использовать без дополнительных знаний о структуре ObjectRelations. Но применение отношений может сделать концепцию еще более универсальной. Например, отношения могут использоваться для произвольного общего состояния и данных конфигурации в продолжениях, областях и контекстах. Они также используются самими сопрограммами для этих и других целей, таких как регистрация слушателей событий сопрограмм или автоматическое закрытие ресурсов.

Заключение

Мы надеемся, что эта статья дает подходящее введение в фреймворк сопрограмм и его базовое использование. Напоминаем, что это первый выпуск, поэтому он может содержать некоторые ошибки и несоответствия. Любые отзывы и вопросы приветствуются, как и вклад в проект.