Amazon Kinesis — определение времени ожидания шага

Попытка создать запрос Kinesis Analytics, чтобы предупредить, когда шаг в процессе занял слишком много времени (или умер и не продвинулся дальше).

У меня есть поток данных, который содержит обновления статуса, поскольку многоэтапный процесс движется от шага к шагу. Я пытаюсь написать запрос, который может определить, когда следующий шаг не произошел в течение определенного промежутка времени (также известного как тайм-аут). В частности, я хотел бы знать, когда один идентификатор процесса не переходит из «Начало» в «Работает» в течение 5 минут.

Я знаю, как это сделать в базе данных, но это сбивает с толку, когда шкала времени постоянно движется. Любая помощь, которую вы можете оказать, очень ценится!

Мои события имеют три атрибута:
ProcessID — целое число
Status — строка («Начато», «Выполняется» или «Завершено»)
HappenedOn — Дата и время (например, 2017-10 -02 15:17:00)

Как бы я сделал это в базе данных (не в Kinesis)

В SQL я бы использовал соединение таблицы событий с самой собой, используя LEFT OUTER JOIN, но не могу понять, как это сделать в ситуации запроса в реальном времени.

#This will show me the start events that don't have a corresponding 'running' event

SELECT * FROM events as F 
LEFT OUTER JOIN events as S on F.PROCESSID = S.PROCESSID AND S.STATUS = 'running'
WHERE  F.STATUS = 'start' AND S.STATUS IS NULL;

Решение в Kinesis на данный момент
Этот запрос сохраняется и выполняется, но не дает мне того, что я ищу.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (E1PROCESSID integer, 
E1STATUS varchar(7), E1HAPPENED varchar(32), E2PROCESSID integer, 
E2STATUS varchar(7), E2HAPPENED varchar(32) );

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"

SELECT F.PROCESSID, F.STATUS, F.HAPPENED, S.PROCESSID, S.STATUS, S.HAPPENED
FROM "SOURCE_SQL_STREAM_001" OVER (RANGE INTERVAL '5' MINUTE PRECEDING) AS F 
LEFT OUTER JOIN "SOURCE_SQL_STREAM_001"  AS S
ON F.PROCESSID = S.PROCESSID AND S.STATUS = 'running'
WHERE F.STATUS = 'start' AND S.STATUS IS NULL;

Даже если бы я мог заставить приведенный выше запрос работать, мне нужно, чтобы Kinesis искал соответствующие события (или их отсутствие) только через 5 минут после значения HAPPENED (например, нужно выполнить DATEDIFF между текущей датой и временем и HAPPENED). Любые советы о том, как добавить это, будут оценены.

Кроме того, я чувствую, что мне нужно использовать FOLLOWING, а не PRECEDING, но синтаксический анализатор SQL не позволит мне (и я понимаю, почему). Я также запутался в том, к какому потоку присоединиться, чтобы добавить окно OVER к... LEFT? ПРАВИЛЬНО? ОБА?

Спасибо заранее.


person Tim Merkel    schedule 02.10.2017    source источник
comment
Для справки, в документации Amazon говорится об использовании ВНЕШНЕГО СОЕДИНЕНИЯ в этом article, но каждый раз, когда я пытаюсь использовать FOLLOWING вместо PRECEDING, SQL-валидатор злится на меня.   -  person Tim Merkel    schedule 03.10.2017


Ответы (1)


Вы можете сделать это с помощью Drools, создав следующие правила:

declare EventA
  @role( event )
end

declare EventB
  @role( event ) 
end

rule "Timeout EventA"
when
  $a : EventA()
  not(exists(EventB(this after[0,5m] $a)))
then
  insertLogical(new TimeoutA($a.id));
end

Вы можете создать Drools Kinesis Analytics с помощью этой службы.

person Konstantin Triger    schedule 05.10.2017
comment
Ух ты! Это круто. Я знал, что Drools может это сделать, но не знал, что на рынке есть вариант AWS Kinesis. Не могу не отблагодарить за помощь! - person Tim Merkel; 07.10.2017
comment
@TimMerkel: если вы найдете это полезным, отметьте его как ответ - person Konstantin Triger; 09.10.2017