Заказ Flink Stream SQL по

У меня есть потоковый ввод, например, данные о ценах акций (включая несколько акций), и я хочу выполнять ранжирование по их цене каждые 1 минуту. Рейтинг основан на последней цене всех акций и должен отсортировать их все, независимо от того, обновлялась ли она за предыдущую 1 минуту или нет. Я пытался использовать ORDER BY в потоке flink SQL.

Мне не удалось реализовать свою логику, и меня смущают две части:

  1. Почему ORDER BY может использовать только атрибут времени в качестве основного и поддерживать только ASC? Как я могу реализовать заказ по другому типу, например по цене?

  2. Что означает приведенный ниже SQL (из документа Flink)? Нет ни окна, ни окна, поэтому я предполагаю, что SQL будет выполняться немедленно для каждого поступившего заказа, в этом случае сортировка одного элемента выглядит бессмысленной.

[Обновление]: когда я читаю код ProcimeSortProcessFunction.scala, мне кажется, что Flink сортирует элементы, полученные в течение следующей миллисекунды.

SELECT *
FROM Orders
ORDER BY orderTime

Наконец, есть ли способ реализовать мою логику на SQL?


person yinhua    schedule 09.03.2018    source источник


Ответы (1)


ORDER BY в потоковых запросах трудно вычислить, потому что мы не хотим обновлять весь результат, когда нам нужно выдать результат, который должен перейти в начало таблицы результатов. Поэтому мы поддерживаем ORDER BY time-attribute только в том случае, если можем гарантировать, что результаты имеют (примерно) увеличивающиеся временные метки.

В будущем (Flink 1.6 или новее) мы также будем поддерживать некоторые запросы, такие как ORDER BY x ASC LIMIT 10, что приведет к обновлению таблицы, содержащей записи с 10 наименьшими значениями x.

В любом случае, вы не можете (легко) вычислить рейтинг топ-k в минуту, используя GROUP BY вращающееся окно. GROUP BY запросы объединяют записи группы (также окна в случае GROUP BY TUMBLE(rtime, INTERVAL '1' MINUTE)) в одну запись. Таким образом, в минуту будет не несколько записей, а только одна.

Если вы хотите, чтобы запрос вычислял топ-10 по полю a в минуту, вам понадобится запрос, подобный этому:

SELECT a, b, c 
FROM (
  SELECT 
    a, b, c, 
    RANK() OVER (ORDER BY a PARTITION BY CEIL(t TO MINUTE) BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as rank 
  FROM yourTable)
WHERE rank <= 10

Однако такие запросы еще не поддерживаются Flink (версия 1.4), потому что атрибут времени используется в предложении PARTITION BY, а не в предложении ORDER BY окна OVER.

person Fabian Hueske    schedule 20.03.2018
comment
Спасибо, Фабиан, я ошибался, думая, что ORDER BY рассчитывается для элементов внутри окна, а не для результата окна, спасибо за пояснение. - person yinhua; 22.03.2018