искровый структурированный потоковый динамический строковый фильтр

Мы пытаемся использовать динамический фильтр для приложения со структурированной потоковой передачей.

Допустим, у нас есть следующая псевдо-реализация приложения структурированной потоковой передачи Spark:

spark.readStream()
     .format("kafka")
     .option(...)
     ...
     .load()
     .filter(getFilter()) <-- dynamic staff - def filter(conditionExpr: String):
     .writeStream()
     .format("kafka")
     .option(.....)
     .start();

а getFilter возвращает строку

String getFilter() {
   // dynamic staff to create expression
   return expression; // eg. "column = true";
}

Возможно ли в текущей версии Spark иметь условие динамического фильтра? Я имею в виду, что метод getFilter() должен динамически возвращать условие фильтра (скажем, он обновляется каждые 10 минут). Мы попытались изучить широковещательную переменную, но не уверены, поддерживает ли структурированная потоковая передача такую ​​вещь.

Похоже, что невозможно обновить конфигурацию задания после его отправки. В качестве развертывания мы используем yarn.

Мы высоко ценим каждое предложение / вариант.


РЕДАКТИРОВАТЬ: предположим, что getFilter() возвращает:

(columnA = 1 AND columnB = true) OR customHiveUDF(columnC, 'input') != 'required' OR columnD > 8

через 10 минут у нас может быть небольшое изменение (без первого выражения перед первым ИЛИ), и потенциально мы можем получить новое выражение (columnA = 2), например:

customHiveUDF(columnC, 'input') != 'required' OR columnD > 10 OR columnA = 2

Цель состоит в том, чтобы иметь несколько фильтров для одного искрового приложения и не отправлять несколько заданий.


person VladoDemcak    schedule 18.06.2018    source источник


Ответы (2)


Переменная трансляции здесь должна быть в порядке. Вы можете написать типизированный фильтр, например:

query.filter(x => x > bv.value).writeStream(...)

где bv - переменная Broadcast. Вы можете обновить его, как описано здесь: Как я могу обновить широковещательная переменная в потоковой передаче искр?

Другое решение - предоставить, например, конечную точку RCP или RESTful и запрашивать эту конечную точку каждые 10 минут. Например (Java, потому что здесь проще):

class EndpointProxy {

     Configuration lastValue;
     long lastUpdated
     public static Configuration getConfiguration (){

          if (lastUpdated + refreshRate > System.currentTimeMillis()){
               lastUpdated = System.currentTimeMillis();
               lastValue = askMyAPI();
          }
          return lastValue;
     }
}


query.filter (x => x > EndpointProxy.getConfiguration().getX()).writeStream()

Изменить: хакерский обходной путь для проблемы пользователя:

Вы можете создать однострочное представление: // confsDF должен быть в каком-то одноэлементном элементе на стороне драйвера var confsDF = Seq (некоторый контент) .toDF ("someColumn")

and then use:
query.crossJoin(confsDF.as("conf")) // cross join as we have only 1 value 
      .filter("hiveUDF(conf.someColumn)")
      .writeStream()...

 new Thread() {
     confsDF = Seq(some new data).toDF("someColumn)
 }.start();

Этот хакер использует модель выполнения Spark по умолчанию - микропакеты. В каждом триггере запрос перестраивается, поэтому следует учитывать новые данные.

Вы также можете сделать в ветке:

Seq(some new data).toDF("someColumn).createOrReplaceTempView("conf")

а затем в запросе:

.crossJoin(spark.table("conf"))

Оба должны работать. Имейте в виду, что он не будет работать в режиме непрерывной обработки.

person T. Gawęda    schedule 18.06.2018
comment
Спасибо за ваш ответ! Также мы используем фильтр def filter(conditionExpr: String) - github.com/apache/spark/blob/master/sql/core/src/main/scala/org/ со строкой и условиями. И мы хотели бы использовать эту функцию и избегать функционального способа, как в ваших примерах. Я думал о трансляции bv.value, которая возвращает String и периодически обновляется. Если бы было невозможно использовать широковещательную рассылку, мы бы воспользовались вашим вторым советом, но опять же, строка не работает. Вы не возражаете, что струнно можно делать? Я думал, что работа неизменна после отправки. - person VladoDemcak; 18.06.2018
comment
@VladoDemcak Spark создает литерал из вашего ввода, поэтому здесь нельзя будет использовать String. Литералы неизменны - person T. Gawęda; 18.06.2018
comment
ох .. спасибо в любом случае информация о литералах действительно важна. но это печально, потому что мы используем UDF улья (как строки). Знаете ли вы другой обходной путь или что-то, что можно было бы использовать и достичь тех же результатов, что и у нас сейчас (использовать filter(String condition)), а также иметь динамический фильтр? Я проверил where(), но, наверное, тоже самое. - person VladoDemcak; 18.06.2018
comment
@VladoDemcak Where создает такое же выражение внутри. Как вы используете этот udf? Может быть, это будет возможно, что я внес в правку - person T. Gawęda; 18.06.2018
comment
Спасибо за правку, проверю и попробую в ближайшем будущем. Пожалуйста, проверьте мои изменения. Я добавил пример того, как мы используем и подготавливаем пользовательское выражение фильтра (строку). Я также добавил подробное объяснение нашего варианта использования, которого мы хотим достичь. - person VladoDemcak; 18.06.2018
comment
Итак, на основе документации ›// Выберите устройства, у которых сигнал превышает 10› df.select("device").where("signal > 10") // используя нетипизированный API ds.filter(_.signal > 10).map(_.device) // используя типизированные API, мы можем заменить 10, например, широковещательной переменной, но мы не можем для нетипизированного API? - person VladoDemcak; 18.06.2018
comment
@VladoDemcak Это правильно - Spark преобразует 10 в буквальное значение, что является неизменным. Лямбда - это черный ящик, который нельзя передать в исходный код, но он также дает возможность иметь более динамичный стиль фильтра. - person T. Gawęda; 18.06.2018

Вот простой пример, в котором я динамически фильтрую записи, поступающие из сокета. Вместо Date вы можете использовать любой API для отдыха, который может динамически обновлять ваш фильтр, или облегченный экземпляр zookeeper.

Примечание: - Если вы планируете использовать какой-либо rest API, zookeeper или любой другой вариант, используйте mapPartition вместо filter, потому что в этом случае у вас есть вызов API / Connection один раз для раздела.

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].filter(_ == new java.util.Date().getMinutes.toString)

// Generate running word count
val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
person Kaushal    schedule 18.06.2018
comment
Ничего страшного, это аналогичное предложение, которое я написал. Но OP говорит, что он хочет, чтобы фитлер был строкой, а не лямбдой. - person T. Gawęda; 18.06.2018
comment
точно так же, как @T. - сказал Гавенда. поэтому я не могу проголосовать за ответ, даже если он полезен. мы хотим использовать не лямбда, а строку. По сути, нам нужно избегать отправки нескольких заданий только за счет небольших изменений в фильтре. - person VladoDemcak; 18.06.2018
comment
это правильно, но вы можете сгенерировать любую строку, которую хотите. это дает вам все возможности. вы можете написать lamda, которая вызывает api и дает вам подходящую строку. - person Kaushal; 18.06.2018
comment
конечно, но это не простое сравнение строк в OP. поскольку выражение where у нас не является лямбда-выражением. когда мы вызываем api и получаем строку, эта строка будет в основном там, где выражение, которое sqlparser искры преобразует в странный персонал, который бесполезен b / c, как сказал @T.Gawęda в своем ответе - это неизменяемо. Я думал о функции преобразования или что-то в этом роде, но мне нужно немного поиграть - person VladoDemcak; 18.06.2018