Динамическое определение размера окон балок apache

Я читаю события из PubSub, и цель состоит в том, чтобы сгруппировать их в окна. Я хотел бы, чтобы конец каждого окна совпадал с минутами 0, 15, 30 и 45 каждого часа.
Поскольку это потоковая работа, ее можно запустить в любое время, и я хотел бы найти способ выровнять размер первого окна со следующими.
Это будет поток:

  1. Запустить работу
  2. Определите как window_size время, оставшееся между этим моментом и следующей четвертью часа.
  3. Начиная с конца этого первого окна, установите window_size = int(15*60) (секунды).

Например:

  1. Запустить работу
  2. Сейчас 11:18, так что исправь window_size = (11:30-11:18).seconds
  3. Когда это первое окно закончится, установите window_size = int(15*60) (секунды)

В одном из примеров, предоставленных Google, конвейер, работающий с оконным управлением, определяется следующим образом, где window_size - это параметр, переданный в качестве ввода пользователем:

def expand(self, pcoll):
  return (
          pcoll
          | "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(self.window_size))
          | "Add Key" >> beam.Map(lambda elem: (None, elem))
          | "Groupby" >> beam.GroupByKey()
          | "Abandon Key" >> beam.MapTuple(lambda _, val: val)
  )

person Federico Barusco    schedule 11.08.2020    source источник
comment
Согласно документации, оконное управление делит коллекцию PCollection в соответствии с отметками времени ее отдельные элементы. Сказав это, я хотел бы сначала спросить, как выполняется ваша работа?   -  person Alexandre Moraes    schedule 11.08.2020
comment
Как я уже упоминал в вопросе, это задание потоковой передачи, которое получает события от PubSub, группирует их в окна, а затем выполняет другие операции на следующих этапах конвейера. Я хотел бы понять, есть ли возможность изменить размер окна во время выполнения, назначив один window_size первому окну, а другой - второму окну.   -  person Federico Barusco    schedule 11.08.2020


Ответы (1)


Ваш вариант использования идеально подходит для Beam!

Во-первых, необходимо прояснить базовую концептуальную проблему:

  • Отметки времени на элементах, используемые для работы с окнами, называются временем события. Они являются частью данных и описывают, когда произошло какое-то событие в вашем потоке.
  • Время запуска и выполнения задания называется временем обработки. Это не часть ваших данных.

Вы добьетесь большего успеха, если не объедините и не перепутаете эти два. Windows не запускается и не завершается во время обработки вашего задания. Окна существуют на все времена.

Использование FixedWindows в течение 15 минут сделает именно то, что вы хотите. Каждое событие будет связано с 15-минутным интервалом, в который оно попадает. Когда вы запускаете свою работу или когда событие поступает на обработку, это не влияет на это.

ОБНОВЛЕНИЕ: добавление примера для иллюстрации:

Предположим, вы запускаете свою работу в 11:18, как указано в вашем вопросе, и предполагаете, что входящие события генерируются примерно в одно и то же время. Предположим, что наступают следующие события с указанными отметками времени:

  • A @ 11:01
  • B @ 11:18
  • C @ 11:15
  • D @ 11:31
  • E @ 11:29

Элементы будут присвоены окнам следующим образом:

  • A in [11:00, 11:15)
  • B in [11:15, 11:30)
  • C in [11:15, 11:30)
  • D in [11:30, 11:45)
  • E in [11:15, 11:30)

Обратите внимание, что назначение окна не связано с тем, когда вы начали работу, или когда наступает событие, или с порядком прибытия. Вы можете запустить его завтра или повторно запустить для архивированных данных или данных, которые даже не близки к порядку, и результат будет таким же. Окно времени событий основано на данных.

person Kenn Knowles    schedule 11.08.2020
comment
Если вы используете FixedWindows, все окна будут иметь одинаковую продолжительность, независимо от того, когда они запускаются. Например, с 15-минутным FixedWindow, если я запустил задание в 10:53, первое окно будет 10: 53-11: 08, второе 11: 08-11: 23, третье 11: 23-11: 38 и так далее. Вопрос в том, как мне это сделать, если я хочу, чтобы каждое окно точно совпадало с четвертью часа (например, первое окно 10: 53-11: 00, второе окно 11: 00-11: 15, третье окно 11: 15- 11:30 и т. Д.)? - person Federico Barusco; 11.08.2020
comment
Это не так. Границы FixedWindows полностью не зависят от того, когда выполняется ваша работа. - person Kenn Knowles; 11.08.2020
comment
Я обновил ответ иллюстрацией. Это помогает? - person Kenn Knowles; 11.08.2020