Условное развертывание в Spark Structured Streaming/Spark SQL

Я пытаюсь выполнить условный взрыв в Spark Structured Streaming.

Например, мой потоковый фрейм данных выглядит следующим образом (здесь полностью собраны данные). Я хочу разбить массив сотрудников на отдельные строки массивов, когда contingent = 1. Когда contingent = 0, мне нужно оставить массив как есть.

|----------------|---------------------|------------------|
|     Dept ID    |     Employees       |    Contingent    |
|----------------|---------------------|------------------|
|          1     | ["John", "Jane"]    |       1          |
|----------------|---------------------|------------------|
|          4     | ["Amy", "James"]    |       0          |
|----------------|---------------------|------------------|
|          2     | ["David"]           |       1          |
|----------------|---------------------|------------------|

Итак, мой вывод должен выглядеть так (мне не нужно отображать столбец contingent:

|----------------|---------------------|
|     Dept ID    |     Employees       |
|----------------|---------------------|
|          1     | ["John"]            |
|----------------|---------------------|
|          1     | ["Jane"]            |
|----------------|---------------------|
|          4     | ["Amy", "James"]    |
|----------------|---------------------|
|          2     | ["David"]           |
|----------------|---------------------|

Есть пара проблем, с которыми я сейчас сталкиваюсь:

  1. Взрывание массивов условно
  2. взорвать массивы в массивы (в данном случае не в строки)

В Hive была концепция UDTF (пользовательские табличные функции), которая позволяла мне это делать. Интересно, есть ли что-нибудь сравнимое с ним?


person DataGeek    schedule 13.06.2018    source источник


Ответы (1)


Используйте flatMap, чтобы взорвать и указать любое условие, которое вы хотите.

case class Department (Dept_ID: String, Employees: Array[String], Contingent: Int)
case class DepartmentExp (Dept_ID: String, Employees: Array[String])

val ds = df.as[Department]

ds.flatMap(dept => {
  if (dept.Contingent == 1) {
    dept.Employees.map(emp => DepartmentExp(dept.Dept_ID, Array(emp)))
  } else {
    Array(DepartmentExp(dept.Dept_ID, dept.Employees))
  }
}).as[DepartmentExp]
person Kaushal    schedule 13.06.2018