Apache Beam: триггер для фиксированного окна

Согласно после в документации указано, что если вы явно не укажете триггер, вы получите поведение, описанное ниже:

Если не указано иное, по умолчанию запускается сначала, когда водяной знак проходит через конец окна, а затем запускается снова каждый раз, когда появляются опоздавшие данные.

Верно ли это поведение и для FixedWindow? Например, вы можете предположить, что фиксированное окно должно иметь триггер по умолчанию для многократного срабатывания после того, как водяной знак проходит конец окна, и отбрасывать все поздние данные, если поздние данные не обрабатываются явным образом. Также где в исходном коде я могу увидеть определение триггера, например, для объекта FixedWindow?


person user_1357    schedule 11.01.2019    source источник


Ответы (1)


Лучше всего начать с руководства по триггерам и windows (и переход по ссылкам оттуда). В частности, в нем говорится, что, хотя триггер по умолчанию срабатывает каждый раз, когда поступают поздние данные, в конфигурации по умолчанию он по-прежнему эффективно срабатывает только один раз, отбрасывая поздние данные:

если вы используете как конфигурацию окон по умолчанию, так и триггер по умолчанию, триггер по умолчанию срабатывает ровно один раз, а поздние данные отбрасываются. Это связано с тем, что конфигурация окон по умолчанию имеет допустимое значение задержки 0. См. Раздел Обработка поздних данных для получения информации об изменении этого поведения.

Подробности

Концепция окон в Beam в целом включает в себя несколько вещей, в том числе назначение окон, обработку триггеров, обработку запаздывающих данных и многое другое. Однако эти вещи назначаются и обрабатываются отдельно. Отсюда это быстро сбивает с толку.

Как элементы назначаются окну, обрабатывается WindowFn, см. здесь. Например, FixedWindows: ссылка. Это практически единственное, что там происходит (почти). Назначение окна - это особый случай группировки элементов на основе временных меток событий (вроде). Вы можете думать о логике, подобной назначению вручную настраиваемых ключей элементам на основе временных меток с последующим применением GroupByKey.

Запуск - это связанное, но отдельное понятие. Триггеры (грубо говоря) просто предикаты, указывающие, когда бегуну разрешено выдавать данные, накопленные на данный момент в окне (source). Я думаю, что это больше всего похоже на исходную проектную документацию для триггеров: https://s.apache.org/beam-triggers

Задержка - это еще одна связанная часть конфигурации, которая также является несколько отдельной (ссылка). Несмотря на то, что триггер может позволить бегуну выдавать все запаздывающие данные навсегда, конвейер может быть настроен так, чтобы не разрешать любые запаздывающие данные (что является поведением по умолчанию) или разрешать только запоздалые данные в течение некоторого ограниченного времени. Это приводит к описанному выше поведению триггера по умолчанию. Да, это сбивает с толку. По возможности избегайте использования сложных срабатываний и опозданий, скорее всего, это не сработает так, как вы ожидаете.

Таким образом, классы окон обрабатывают только логику группировки, то есть какие элементы имеют одинаковый ключ группировки. Эти классы не заботятся о том, когда вы захотите выдать накопленные результаты. Это зависит от вашей бизнес-логики, например. вы можете захотеть обработать вновь поступившие элементы или отказаться от них, это не часть окна. Это означает, что нет специальных триггеров для FixedWindows или других окон, вы можете использовать любой триггер с любым окном (даже если логически какой-то конкретный триггер не имеет смысла в контексте какого-либо окна).

Триггер по умолчанию - это просто то, что установлено по умолчанию. Вы должны назначить свой собственный триггер, если он вам не подходит. И, скорее всего, не будет, за исключением некоторых основных случаев использования.

Обновить

Пример использования FixedWindows с триггерами.

person Anton    schedule 11.01.2019
comment
Поэтому, если вы вообще не укажете триггер, а только WindowFn, будет использоваться триггер по умолчанию. И триггер по умолчанию заявляет, что он будет срабатывать для всех поздних данных. Я смущен. Каково будет поведение, если вы вообще не установите триггер? И где это задокументировано? - person user_1357; 11.01.2019
comment
Это сбивает с толку, да. Если вы не укажете триггер, будет использован триггер по умолчанию, да. И это действительно срабатывает для всех поздних данных, да. Но по умолчанию допустимая задержка равна нулю, поэтому в действительности не будет данных, поступающих с опозданием, поэтому он сработает только один раз. Если вы не укажете допустимую задержку больше нуля. - person Anton; 12.01.2019
comment
Тот факт, что задержка по умолчанию, равная нулю, приводит к срабатыванию по умолчанию только один раз, упоминается здесь: beam. apache.org/documentation/programming-guide/#triggers - person Anton; 12.01.2019
comment
Подробнее о задержках здесь: beam.apache.org/documentation / руководство по программированию / - person Anton; 12.01.2019
comment
И пример того, как это установить, находится здесь: github.com/apache/beam/blob/ - person Anton; 12.01.2019
comment
Обычно, когда вы говорите Window.into(), в PCollection обновляются только те вещи, которые вы явно указываете. Если вы не укажете триггер, будет использован ранее указанный триггер (или триггер по умолчанию, если до этого ничего не было явно указано). Если вы не укажете задержку, используется ранее указанное значение (или ноль, если раньше ничего не было задано явно). - person Anton; 12.01.2019
comment
Руководство по программированию Apache Beam очень хорошо написано, пока вы не дойдете до окон / триггеров, и тогда становится трудно следовать. Что произойдет, если у вас есть триггер для срабатывания после 18 секунд обработки и фиксированные окна продолжительностью 60 секунд? Панель отправляется в следующую коллекцию PCollection 3 раза за окно? 4 раза (например, 3 раза для триггера и один раз для конца окна)? Что-то другое? Как узнать, повторяется ли 18-секундный триггер или срабатывает только один раз? - person Stephen; 19.11.2019
comment
Еще одна вещь, которая немного сбивает с толку: разделяя на окна, вы уже достигаете цели обработки меньших объемов данных, а не ждете, пока все данные будут доступны (что, конечно, может быть невозможно). Поэтому мне непонятно, зачем вам нужны триггеры, чтобы получить его еще быстрее. Если вы хотите получать данные еще быстрее, вы можете просто уменьшить размер окон. - person Stephen; 19.11.2019
comment
Окна основаны на времени событий в Beam. Когда вы определяете окно, это означает что-то вроде того, что я хочу сгруппировать все события, которые произошли между 13:00 и 14:00 по всемирному координированному времени 2019. Таким образом, все события будут принадлежать этому окну независимо от того, когда они фактически достигли конвейера (например, сервера потока данных). Представьте, что у вас есть миллион мобильных телефонов с медленным подключением, а иногда и без него. На этих мобильных телефонах вы входите в систему, когда пользователи нажимают кнопку. И когда вы регистрируете это на телефоне, вы используете местную временную метку телефона. - person Anton; 21.11.2019
comment
Теперь, если телефон немедленно отправляет событие в конвейер, это примерно такая же метка времени. Но что, если телефон отключится на несколько часов? Затем, когда вы попытаетесь отправить событие в следующий раз, когда телефон подключится к сети, событие будет давным через несколько часов. Иногда можно сделать это таким образом и игнорировать временную метку, когда событие на самом деле произошло. Но если вы хотите учитывать, когда пользователь действительно нажал кнопку, а не когда событие достигло сервера, вам нужна концепция временного пространства события. - person Anton; 21.11.2019
comment
С точки зрения конвейера это означает, что он должен дождаться, пока не появятся все события для определенного окна, прежде чем фактически выполнять логику, например вычисление агг-функций. Если вы работаете во время события, это означает, что он также должен дождаться всех поздних событий. То есть, если событие с одного мобильного телефона задерживается на несколько часов, тогда конвейер должен дождаться его прибытия и поместить его в правильное окно, когда событие действительно произошло, а не когда оно было получено конвейером через несколько часов. . - person Anton; 21.11.2019
comment
Но на практике есть способ узнать, когда будут получены все запоздалые данные. Пользователь может выключить мобильный телефон на год, а затем через год отправить, когда он будет подключен к сети, он отправит события годичной давности. В модели Beam эти события будут учтены в старом окне, которому тоже год. Луч увидит событие и скажет - эй, это принадлежит тому окну из года, позвольте мне соединить его со всеми элементами, которые мы уже получили за год. - person Anton; 21.11.2019
comment
Таким образом, технически Beam не может обрабатывать какие-либо данные, пока не узнает, что он получил все последние элементы для этого окна. Это означает, что в большинстве случаев он должен ждать вечно, поскольку вы не знаете, когда может прибыть последний поздний элемент. Что непрактично. Так что у Beam есть триггеры, чтобы не ждать вечно. С триггерами вы говорите - давайте действительно обработаем это окно, когда условие триггера будет выполнено, а не ждать вечно. - person Anton; 21.11.2019
comment
И при применении триггера вы также говорите, что делать с поздними элементами, например вы можете сказать - давайте продолжим ждать опоздавших элементов в течение часа и повторно запустим всю логику для этого окна, если появится новый элемент, и если элемент, принадлежащий этому окну, появится более чем на час позже, тогда давайте отбросим его, потому что мы не хотим хранить старые данные навсегда и постоянно повторно обрабатывать старые окна - person Anton; 21.11.2019