Есть ли элегантный способ обработки потока кусками?

Мой точный сценарий - вставка данных в базу данных партиями, поэтому я хочу накапливать объекты DOM, а затем каждые 1000, очищать их.

Я реализовал это, поместив в аккумулятор код для определения заполнения, а затем сброса, но это кажется неправильным - управление сбросом должно исходить от вызывающей стороны.

Я мог бы преобразовать поток в список, а затем использовать подсписок итеративно, но это тоже кажется неуклюжим.

Есть ли удобный способ выполнять действия через каждые n элементов, а затем продолжать поток, обрабатывая поток только один раз?


person Bohemian♦    schedule 20.12.2014    source источник
comment
Для аналогичного варианта использования я сделал это: at=master" rel="nofollow noreferrer">bitbucket.org/assylias/bigblue-utils/src/ — это не совсем то, о чем вы просите.   -  person assylias    schedule 21.12.2014


Ответы (8)


Элегантность в глазах смотрящего. Если вы не возражаете против использования функции с отслеживанием состояния в groupingBy, вы можете сделать это:

AtomicInteger counter = new AtomicInteger();

stream.collect(groupingBy(x->counter.getAndIncrement()/chunkSize))
    .values()
    .forEach(database::flushChunk);

Это не выиграет никаких очков производительности или использования памяти по сравнению с вашим исходным решением, потому что оно все равно материализует весь поток, прежде чем что-либо делать.

Если вы хотите избежать материализации списка, потоковый API вам не поможет. Вам нужно будет получить итератор или разделитель потока и сделать что-то вроде этого:

Spliterator<Integer> split = stream.spliterator();
int chunkSize = 1000;

while(true) {
    List<Integer> chunk = new ArrayList<>(size);
    for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++){};
    if (chunk.isEmpty()) break;
    database.flushChunk(chunk);
}
person Misha    schedule 22.12.2014
comment
Хорошее решение для разветвителя! Вариант с collect/groupingBy завершает поток, и это не лучший вариант для больших потоков. - person Krzysztof Tomaszewski; 26.09.2019

Если у вас есть зависимость от гуавы в вашем проекте, вы можете сделать это:

StreamSupport.stream(Iterables.partition(simpleList, 1000).spliterator(), false).forEach(...);

См. https://google.github.io/guava/releases/23.0/api/docs/com/google/common/collect/Lists.html#partition-java.util.List-int-

person user2814648    schedule 16.05.2018
comment
Это решение разбивает список, а не поток. Полезно, но не то, что спросил @Bohemian. - person AlikElzin-kilaka; 28.11.2018
comment
@AlikElzin-kilaka, но вы можете создать поток с помощью iterable (baeldung.com/java-iterable -в поток). - person mgcation; 22.04.2021

Вы можете создать поток фрагментов (List<T>) из потока элементов и заданного размера фрагмента,

  • группировка элементов по индексу чанка (индекс элемента/размер чанка)
  • упорядочение фрагментов по их индексу
  • сокращение карты только до их упорядоченных элементов

Код:

public static <T> Stream<List<T>> chunked(Stream<T> stream, int chunkSize) {
    AtomicInteger index = new AtomicInteger(0);

    return stream.collect(Collectors.groupingBy(x -> index.getAndIncrement() / chunkSize))
            .entrySet().stream()
            .sorted(Map.Entry.comparingByKey()).map(Map.Entry::getValue);
}

Пример использования:

Stream<Integer> stream = IntStream.range(0, 100).mapToObj(Integer::valueOf);
Stream<List<Integer>> chunked = chunked(stream, 8);
chunked.forEach(chunk -> System.out.println("Chunk: " + chunk));

Выход:

Chunk: [0, 1, 2, 3, 4, 5, 6, 7]
Chunk: [8, 9, 10, 11, 12, 13, 14, 15]
Chunk: [16, 17, 18, 19, 20, 21, 22, 23]
Chunk: [24, 25, 26, 27, 28, 29, 30, 31]
Chunk: [32, 33, 34, 35, 36, 37, 38, 39]
Chunk: [40, 41, 42, 43, 44, 45, 46, 47]
Chunk: [48, 49, 50, 51, 52, 53, 54, 55]
Chunk: [56, 57, 58, 59, 60, 61, 62, 63]
Chunk: [64, 65, 66, 67, 68, 69, 70, 71]
Chunk: [72, 73, 74, 75, 76, 77, 78, 79]
Chunk: [80, 81, 82, 83, 84, 85, 86, 87]
Chunk: [88, 89, 90, 91, 92, 93, 94, 95]
Chunk: [96, 97, 98, 99]
person Peter Walser    schedule 17.05.2018
comment
Спасибо, воспользовался вашим решением. Я удалил сортировку, которая не нужна в моем случае. - person user1708042; 19.11.2020
comment
Очень хорошее решение - person Kingshuk Mukherjee; 18.02.2021
comment
Это решение будет считывать весь поток в карту перед обработкой фрагментов, а не создавать фрагменты в середине потока. Это может быть не то, что вы хотели бы/ожидали, особенно для больших потоков, которые, вероятно, являются самым большим вариантом использования для обработки фрагментов. - person Markus Rohlof; 23.05.2021
comment
@MarkusRohlof да, вы абсолютно правы. Я только что попытался придумать решение для больших (и потенциально бесконечных) потоков, но обнаружил, что оно выглядит точно так же, как предложенное dmitryvim, поэтому я действительно могу порекомендовать его решение. - person Peter Walser; 24.05.2021

Большинство приведенных выше ответов не используют преимущества потока, такие как экономия памяти. Вы можете попробовать использовать итератор для решения проблемы

Stream<List<T>> chunk(Stream<T> stream, int size) {
  Iterator<T> iterator = stream.iterator();
  Iterator<List<T>> listIterator = new Iterator<>() {

    public boolean hasNext() {
      return iterator.hasNext();
    }

    public List<T> next() {
      List<T> result = new ArrayList<>(size);
      for (int i = 0; i < size && iterator.hasNext(); i++) {
        result.add(iterator.next());
      }
      return result;
    }
  };
  return StreamSupport.stream(((Iterable<List<T>>) () -> listIterator).spliterator(), false);
}
person dmitryvim    schedule 03.12.2019
comment
Очень хорошее решение, +1. Только одно улучшение: вы можете захотеть вернуть поток как return StreamSupport.stream(Spliterators.spliteratorUnknownSize(listIterator, Spliterator.ORDERED), false);. - person Peter Walser; 24.05.2021

Решение с использованием библиотеки StreamEx будет выглядеть так:

Stream<Integer> stream = IntStream.iterate(0, i -> i + 1).boxed().limit(15);
AtomicInteger counter = new AtomicInteger(0);
int chunkSize = 4;

StreamEx.of(stream)
        .groupRuns((prev, next) -> counter.incrementAndGet() % chunkSize != 0)
        .forEach(chunk -> System.out.println(chunk));

Выход:

[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9, 10, 11]
[12, 13, 14]

groupRuns принимает предикат, который решает, должны ли 2 элемента быть в одной группе.

Он создает группу, как только находит первый элемент, который ей не принадлежит.

person Nazarii Bardiuk    schedule 25.07.2016
comment
Это не работает для одной записи. Например, целочисленный поток просто [1] не удастся. - person wild_nothing; 04.08.2017
comment
Поток одного элемента работает для меня. Какую ошибку вы видите? Не могли бы вы опубликовать код, который вы пробовали? - person Nazarii Bardiuk; 04.08.2017
comment
Счетчик возвращает неверное значение в случае одной записи. - person wild_nothing; 06.08.2017
comment
Я считаю, что в случае одной записи groupRuns() никогда не вызывается, поскольку она ожидает две записи. Есть ли решение, если поток возвращает только один результат? IncrementAndGet на счетчике в вашем примере никогда не срабатывает и возвращает 0, если размер фрагмента равен 1. - person wild_nothing; 12.08.2017
comment
Группировка с размером фрагмента 1 создает поток списков размера 1. Я считаю, что это ожидаемое поведение. Можете ли вы объяснить, чего вы пытаетесь достичь и какая у вас проблема? Вероятно, с новым вопросом о переполнении стека - трудно делиться кодом в комментариях - person Nazarii Bardiuk; 12.08.2017
comment
Поднято: stackoverflow.com/questions/45649990/ - person wild_nothing; 12.08.2017
comment
Это хорошо, но только если исходный поток является последовательным, что, я думаю, необходимо для исходного OP. К сожалению, в моем случае у меня есть источник параллельного потока и ничего не работает, % chunkSize != 0 создает куски chunkSize среднего размера total/chunkSize. - person zakmck; 23.11.2019

Похоже, нет, потому что создание чанков означает сокращение потока, а сокращение означает завершение. Если вам нужно поддерживать природу потока и обрабатывать фрагменты без сбора всех данных, вот мой код (не работает для параллельных потоков):

private static <T> BinaryOperator<List<T>> processChunks(Consumer<List<T>> consumer, int chunkSize) {
    return (data, element) -> {
        if (data.size() < chunkSize) {
            data.addAll(element);
            return data;
        } else {
            consumer.accept(data);
            return element; // in fact it's new data list
        }
    };
}

private static <T> Function<T, List<T>> createList(int chunkSize) {
    AtomicInteger limiter = new AtomicInteger(0);
    return element -> {
        limiter.incrementAndGet();
        if (limiter.get() == 1) {
            ArrayList<T> list = new ArrayList<>(chunkSize);
            list.add(element);
            return list;
        } else if (limiter.get() == chunkSize) {
            limiter.set(0);
        }
        return Collections.singletonList(element);
    };
}

и как использовать

Consumer<List<Integer>> chunkProcessor = (list) -> list.forEach(System.out::println);

    int chunkSize = 3;

    Stream.generate(StrTokenizer::getInt).limit(13)
            .map(createList(chunkSize))
            .reduce(processChunks(chunkProcessor, chunkSize))
            .ifPresent(chunkProcessor);

static Integer i = 0;

static Integer getInt()
{
    System.out.println("next");
    return i++;
}

он будет печатать

следующий следующий следующий следующий 0 1 2 следующий следующий следующий 3 4 5 следующий следующий следующий 6 7 8 следующий следующий следующий 9 10 11 12

идея состоит в том, чтобы создавать списки в операции карты с «шаблоном»

[1,,],[2],[3],[4,,]...

и объединить (+ обработать) это с уменьшением.

[1,2,3],[4,5,6],...

и не забудьте обработать последний «обрезанный» фрагмент с помощью

.ifPresent(chunkProcessor);
person Yura    schedule 28.08.2019

Как правильно сказал Миша, Элегантность в глазах смотрящего. Я лично думаю, что элегантным решением было бы позволить классу, который вставляется в базу данных, выполнять эту задачу. Похоже на BufferedWriter. Таким образом, он не зависит от вашей исходной структуры данных и может использоваться даже с несколькими потоками после одного и другого. Я не уверен, что это именно то, что вы имеете в виду, имея в аккумуляторе код, который, по вашему мнению, неверен. Я не думаю, что это неправильно, так как существующие классы, такие как BufferedWriter, работают таким образом. Таким образом, у вас есть некоторый контроль над очисткой от вызывающего объекта, вызывая flush() на модуле записи в любой момент.

Что-то вроде следующего кода.

class BufferedDatabaseWriter implements Flushable {
    List<DomObject> buffer = new LinkedList<DomObject>();
    public void write(DomObject o) {
        buffer.add(o);
        if(buffer.length > 1000)
            flush();
    }
    public void flush() {
        //write buffer to database and clear it
    }
}

Теперь ваш поток обрабатывается следующим образом:

BufferedDatabaseWriter writer = new BufferedDatabaseWriter();
stream.forEach(o -> writer.write(o));
//if you have more streams stream2.forEach(o -> writer.write(o));
writer.flush();

Если вы хотите работать в многопоточном режиме, вы можете запустить асинхронный сброс. Взятие из потока не может идти параллельно, но я все равно не думаю, что есть способ параллельно подсчитать 1000 элементов из потока.

Вы также можете расширить средство записи, чтобы разрешить установку размера буфера в конструкторе, или вы можете реализовать AutoCloseable и запустить его в попытке с ресурсами и т. д. Хорошие вещи, которые у вас есть от BufferedWriter.

person findusl    schedule 17.05.2018
comment
Вы также можете сделать его AutoCloseable, а затем попробовать (BufferedDatabaseWriter bdw = new BufferedDatabaseWriter()) { stream.forEach(o -> Writer.write(o)); } - person Iouri Goussev; 19.03.2021

Вы можете использовать этот класс, https://github.com/1wpro2/jdk-patch/blob/main/FixedSizeSpliterator.java.

Передайте размер фрагмента как THRESHOLD

new FixedSizeSpliterator(T[] values, int threshold)

person engineer    schedule 17.03.2021