Публикации по теме 'reactive-streams'


Реактивные потоки в JavaScript с RSocket Flowable
Реактивные потоки в JavaScript с RSocket Flowable Потоковая передача сообщений, обратное давление, отмена и асинхронное программирование на JavaScript с rsocket-flowable. При программировании асинхронных операций разработчики JavaScript обычно по крайней мере знакомы с обещаниями, async / await и обратными вызовами. Однако новые асинхронные модели, такие как ReactiveX ( RxJS в JavaScript), не так популярны. В этой статье мы углубляемся в нечто похожее, но немного отличающееся от..

Вопросы по теме 'reactive-streams'

Как получить подписчика и издателя из транслируемого потока Akka?
У меня возникают проблемы с удалением издателей и подписчиков из моих потоков при использовании более сложных графиков. Моя цель - предоставить API издателей и подписчиков и запустить потоковую передачу Akka внутри. Вот моя первая попытка, которая...
1388 просмотров
schedule 07.03.2023

Источник на основе издателя не выводит элементы
Я сделал источник для потока Akka на основе ReactiveStreams Publisher следующим образом: object FlickrSource { val apiKey = Play.current.configuration.getString("flickr.apikey") val flickrUserId =...
90 просмотров
schedule 08.10.2022

В akka-stream как создать неупорядоченный источник из коллекции фьючерсов
Мне нужно создать akka.stream.scaladsl.Source[T, Unit] из коллекции Future[T] . Например, имея коллекцию фьючерсов, возвращающих целые числа, val f1: Future[Int] = ??? val f2: Future[Int] = ??? val fN: Future[Int] = ??? val futures =...
3306 просмотров

Scala Akka Stream: как пройти через последовательность
Я пытаюсь обернуть некоторые блокирующие вызовы в Future . Тип возвращаемого значения — Seq[User] , где User — это case class . Следующее просто не будет компилироваться с жалобами на наличие различных перегруженных версий. Какие-либо...
5303 просмотров
schedule 14.08.2022

Выполнение по запросу горячего Observable
Приведем холодный пример: Observable<Integer> cold = Observable.create(subscriber -> { try { for (int i = 0; i <= 42; i++) { // avoid doing unnecessary work if (!subscriber.isUnsubscribed()) { break; }...
197 просмотров

После выпуска спецификации Reactive Streams 1.0 станет ли реактивной спецификация jdbc?
Я изучал и использовал программирование реактивных потоков с потоками akka, я пытался найти любые библиотеки для async-jdbc-driver или reactive-jdbc-driver в течение 2 лет, и я обнаружил, что slick 3.0 или rxjava-jdbc-driver предоставляют async jdbc...
1640 просмотров

Как скопировать некоторые записи из таблицы A в таблицу B с помощью slick-streaming и akka-streaming
Есть две таблицы TableA и TableB . Мне нужно скопировать некоторые записи из TableA в TableB . Я использую slick-3.0 и использую следующим образом: import akka.stream._ import akka.stream.scaladsl._ ... //{{ READ DATA FROM TABLE A val...
96 просмотров
schedule 27.07.2022

Может ли подписчик выступать в роли издателя?
С точки зрения реактивных потоков, есть издатель, и у него может быть столько же подписчиков. Но предположим, что подписчик получает сообщение от Publisher. Теперь этот подписчик (скажем, Subs1) изменяет/модифицирует сообщение и передает его...
387 просмотров

Моно против потока в реактивном потоке
Согласно документации: Flux - это поток, который может испускать 0..N элементов: Flux<String> fl = Flux.just("a", "b", "c"); Mono - это поток из 0..1 элементов: Mono<String> mn = Mono.just("hello"); И так...
30757 просмотров

Как реализовать Hot Stream в реактивном программировании
Согласно парадигме реактивного потока, В настоящее время мы сосредоточились в основном на холодных потоках. Это статические потоки фиксированной длины, с которыми легко работать. Более реалистичным вариантом использования реактивного может...
987 просмотров

В чем концептуальная разница между приемником и подписчиком в проектных потоках «реактор» или «акка»?
Понятия приемник и подписчик мне кажутся похожими. Кроме того, я не вижу, чтобы концепция приемника была явно определена в спецификации реактивных потоков.
3755 просмотров

Зачем нужен тип Mono в Reactor 3?
Reactor 3 имеет 2 основных типа данных, оба из которых являются издателями реактивных потоков. reactor.core.publisher.Mono<T> reactor.core.publisher.Flux<T> Я понимаю разницу между Mono как потоком из 0 или 1 элементов и...
283 просмотров

Как я могу узнать, завершился ли Observable с ошибкой или без ошибки?
Мне нужно выполнить некоторый код, когда Observable будет завершен, в зависимости от того, был ли он завершен с ошибкой или без. У меня есть этот код: const obs = getMyObservable().pipe(finalize(() => { //here })); Как видите, я...
3519 просмотров
schedule 11.01.2024

Поток Spring Reactive не работает с обратным прокси-сервером Netflix Zuul
Мы создали 7 микросервисов для нашего проекта, и все они доступны через zuul proxy. Один из микросервисов (Live-dashboard) содержит поток реактивной загрузки Spring для обновления панели управления в реальном времени с использованием (события,...
465 просмотров

Project Reactor: doOnNext (или другие doOnXXX) асинхронно
Есть ли какой-нибудь метод, подобный doOnNext, но асинхронный? Например, мне нужно сделать длинную регистрацию (уведомление отправлено по электронной почте) для определенного элемента. Scheduler myParallel = Schedulers.newParallel("my-parallel",...
3333 просмотров

Как с помощью Akka Streams узнать, что источник завершен?
У меня есть Alpakka Elasticsearch Sink , который я храню между запросами. Когда я получаю запрос, я создаю Source из HTTP-запроса и превращаю его в Source Elasticsearch WriteMessage , а затем запускаю его с...
564 просмотров

Нехватка памяти при загрузке большого количества записей из базы данных
Я использую slick в Akka Streams для загрузки большого количества записей (~ 2M) из базы данных (postgresql) и записи их в файл S3. Тем не менее, я заметил, что мой код ниже работает для записей около ~ 50 тыс., Но не работает для чего-либо выше...
209 просмотров

Project Reactor: мне нужен процессор?
Я пытаюсь создать структуру конвейера поверх Reactor. На каждом этапе (не считая первого и последнего) у нас есть задачи, которые преобразуют объект (то есть строку по ее длине или URL-адрес ее HTML-содержимого и т. Д.). Вот пример: Вы...
767 просмотров

Как написать клиент RSocket на JavaScript
Я пытаюсь реализовать сервер RSocket на Java и клиент на JavaScript, но не могу вызвать ни один из методов в своем бэкэнде. Java-сервер public final class RawServer { public static void main(String[] args) {...
3116 просмотров

Spring Reactive Stream - неожиданное отключение
Мы используем Spring Cloud Reactive Streams с RabbitMQ. Spring Reactive Stream, похоже, подтверждает сообщение, как только удаляет его из очереди. Таким образом, любые ошибки, необработанные исключения, которые происходят во время обработки...
155 просмотров