Луч Apache WithTimestamps: временные метки вывода должны быть не раньше, чем временные метки текущего ввода.

Я пытаюсь выполнить окно данных из потока Google Cloud pubsub с частотой 10 с, однако получаю эту ошибку:

java.lang.IllegalArgumentException: невозможно выводить с отметкой времени 2019-07-20T12: 13: 04.875Z. Отметки времени вывода должны быть не раньше отметки времени текущего ввода (2019-07-20T12: 13: 05.591Z) за вычетом допустимого перекоса (0 миллисекунд). Подробную информацию об изменении допустимого перекоса см. В документации Javadoc DoFn # getAllowedTimestampSkew (). org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner $ DoFnProcessContext.checkTimestamp (SimpleDoFnRunner.java:587) org.apache.beam.runners.dataflow.worker.repackaged. org.apache.beam.runners.core.SimpleDoFnRunner $ DoFnProcessContext.outputWithTimestamp (SimpleDoFnRunner.java:566) или sdk.transforms.WithTimestamps $ AddTimestampsDoFn.processElement (WithTimestamps.java:136)

Вот код, вызывающий ошибку:

eventStream
  .apply("Add Event Timestamps",
    WithTimestamps.of((Event event) -> new Instant(event.getTime())))
  .apply("Window Events",
    Window.<Event>into(FixedWindows.of(Duration.parseDuration("10s"))));

В чем причина этого и каково подходящее решение?


person Kyle Durnam    schedule 20.07.2019    source источник


Ответы (1)


Из документации:

Если входные элементы {@link PCollection} имеют метки времени, выходная метка времени для каждого элемента не должна быть раньше метки времени входного элемента за вычетом значения {@link getAllowedTimestampSkew ()}. Если выходная метка времени раньше этого времени, преобразование при выполнении вызовет исключение {@link IllegalArgumentException}. Используйте {@link withAllowedTimestampSkew (Duration)}, чтобы обновить допустимый перекос.

ВНИМАНИЕ! Использование {@link #withAllowedTimestampSkew (Duration)} позволяет размещать элементы за водяным знаком. Эти элементы считаются запоздавшими, и, если они находятся за {@link Window # withAllowedLateness (Duration) allowed lateness} нисходящего потока, {@link PCollection} могут быть отброшены без уведомления.

Итак, чтобы исправить проблему, вы можете поиграть с withAllowedTimestampSkew.

Я использовал другой API: withTimestampAttribute. Вы можете установить атрибут в вашем JSON / AVRO, который будет содержать поле отметки времени.

Этот API доступен при публикации:

  .apply(PubsubIO.writeAvros(Someclass.class)
         .withIdAttribute("id")
         .withTimestampAttribute("myTime").to(topic));

А при подписке:

.apply(PubsubIO.readAvros(Someclass.class) .fromSubscription(...)
       .withIdAttribute("id").withTimestampAttribute("myTime"))
person Brachi    schedule 21.07.2019