Мы пытаемся использовать динамический фильтр для приложения со структурированной потоковой передачей.
Допустим, у нас есть следующая псевдо-реализация приложения структурированной потоковой передачи Spark:
spark.readStream()
.format("kafka")
.option(...)
...
.load()
.filter(getFilter()) <-- dynamic staff - def filter(conditionExpr: String):
.writeStream()
.format("kafka")
.option(.....)
.start();
а getFilter возвращает строку
String getFilter() {
// dynamic staff to create expression
return expression; // eg. "column = true";
}
Возможно ли в текущей версии Spark иметь условие динамического фильтра? Я имею в виду, что метод getFilter()
должен динамически возвращать условие фильтра (скажем, он обновляется каждые 10 минут). Мы попытались изучить широковещательную переменную, но не уверены, поддерживает ли структурированная потоковая передача такую вещь.
Похоже, что невозможно обновить конфигурацию задания после его отправки. В качестве развертывания мы используем yarn
.
Мы высоко ценим каждое предложение / вариант.
РЕДАКТИРОВАТЬ: предположим, что getFilter()
возвращает:
(columnA = 1 AND columnB = true) OR customHiveUDF(columnC, 'input') != 'required' OR columnD > 8
через 10 минут у нас может быть небольшое изменение (без первого выражения перед первым ИЛИ), и потенциально мы можем получить новое выражение (columnA = 2
), например:
customHiveUDF(columnC, 'input') != 'required' OR columnD > 10 OR columnA = 2
Цель состоит в том, чтобы иметь несколько фильтров для одного искрового приложения и не отправлять несколько заданий.