агрегатная функция в Apache Spark

Мне нужно агрегировать набор данных на основе временного интервала в 1 минуту. Когда я пытаюсь это сделать, возникает ошибка:

Мой набор данных выглядит так

scala> newVX.show
+--------------------+-----+
|            datetime|value|
+--------------------+-----+
|2017-07-31 10:53:...| 0.26|
|2017-07-31 10:53:...| 0.81|
|2017-07-31 09:45:...| 0.42|
|2017-07-31 09:44:...|0.008|
|2017-07-31 09:37:...| 0.14|
|2017-07-31 09:35:...|0.365|
|2017-07-31 09:34:...|0.485|
|2017-07-31 09:33:...| 0.49|
|2017-07-31 09:28:...| 1.15|
|2017-07-31 09:27:...|0.325|
|2017-07-31 09:24:...|0.845|
|2017-07-31 09:24:...|0.045|
|2017-07-31 09:23:...|0.015|
|2017-07-31 09:20:...| 0.45|
|2017-07-31 09:20:...| 0.05|
|2017-07-31 09:19:...| 0.14|
|2017-07-31 09:18:...| 0.24|
|2017-07-31 09:12:...|0.125|
|2017-07-31 09:11:...|  0.3|
|2017-07-31 09:11:...| 0.13|
+--------------------+-----+


scala> newVX.groupBy("datetime","60 seconds").agg(avg("value")).show

org.apache.spark.sql.AnalysisException: невозможно разрешить имя столбца «60 секунд» среди (datetime, value); в org.apache.spark.sql.Dataset $$ anonfun $ resolve $ 1.apply (Dataset.scala: 216) в org.apache.spark.sql.Dataset $$ anonfun $ resolve $ 1.apply (Dataset.scala: 216) в scala.Option.getOrElse (Option.scala: 121) в org.apache.spark.sql.Dataset.resolve (Dataset.scala: 215) в org.apache.spark.sql.Dataset $$ anonfun $ groupBy $ 2.apply (Dataset.scala: 1442) в org.apache.spark.sql.Dataset $$ anonfun $ groupBy $ 2.apply (Dataset.scala: 1442) в scala.collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike.scala : 234) в scala.collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike.scala: 234) в scala.collection.mutable.ResizableArray $ class.foreach (ResizableArray.scala: 59) в scala.collection.mutable. ArrayBuffer.foreach (ArrayBuffer.scala: 48) в scala.collection.TraversableLike $ class.map (TraversableLike.scala: 234) в scala.collection.AbstractTraversable.map (Traversable.scala: 104) в org.apache. spark.sql.Dataset.groupBy (Dataset.scala: 1442) ... 58 исключено

Я тоже пробовал другое решение. но вместо агрегирования он дает значения для каждых 60 строк.

scala> newVX.groupBy(window($"datetime","1 minute")).agg(avg("value") as "avg-va
lue").show()


17/07/31 12:41:02 WARN Executor: Managed memory leak detected; size = 4456448 by
tes, TID = 5
+--------------------+-------------------+
|              window|          avg-value|
+--------------------+-------------------+
|[2017-07-31 07:49...| 0.7699999809265137|
|[2017-07-31 05:34...|0.33500000834465027|
|[2017-07-31 04:26...|0.23999999463558197|
|[2017-07-30 20:04...| 0.9399999976158142|
|[2017-07-29 08:33...|0.20250000059604645|
|[2017-07-28 09:30...| 0.3400000035762787|
|[2017-07-27 16:36...| 1.2799999713897705|
|[2017-07-27 08:16...| 0.3400000035762787|
|[2017-07-27 08:11...| 0.3400000035762787|
|[2017-07-27 01:06...| 0.4650000035762787|
|[2017-07-26 23:53...|0.23999999463558197|
|[2017-07-26 19:49...| 0.3199999928474426|
|[2017-07-25 14:39...| 0.3400000035762787|
|[2017-07-25 07:54...| 0.7099999785423279|
|[2017-07-25 06:21...|0.29499998688697815|
|[2017-07-25 03:57...| 0.1899999976158142|
|[2017-07-24 20:31...| 1.2799999713897705|
|[2017-07-24 19:50...| 1.2799999713897705|
|[2017-07-24 16:26...|0.03999999910593033|
|[2017-07-24 16:10...|              0.125|
+--------------------+-------------------+
only showing top 20 rows

Изменить: я внес здесь несколько исправлений, и он все еще показывает неправильный результат. Я сохранил дату до минутного значения, указав

val VX = newvx.withColumn("datetime", ((unix_timestamp($"datetime") / 60)
.cast("long") * 60).cast("timestamp"))

после агрегирования он по-прежнему показывает неправильное значение.

scala> VX.groupBy("datetime").agg(Map("value" -> "mean")).show
17/07/31 15:58:15 WARN Executor: Managed memory leak detected; size = 4456448 by
tes, TID = 21
+-------------------+-------------------+
|           datetime|         avg(value)|
+-------------------+-------------------+
|2017-07-31 06:38:00| 0.6100000143051147|
|2017-07-30 19:46:00| 0.3400000035762787|
|2017-07-30 09:24:00|0.42500001192092896|
|2017-07-29 08:53:00| 0.8899999856948853|
|2017-07-29 15:07:00| 0.3400000035762787|
|2017-07-29 05:26:00| 0.3100000023841858|
|2017-07-28 23:29:00|0.27250000834465027|
|2017-07-28 22:07:00| 0.3199999928474426|
|2017-07-28 20:48:00| 0.2849999964237213|
|2017-07-28 20:13:00|0.44999998807907104|
|2017-07-28 18:07:00|0.20999999344348907|
|2017-07-28 06:38:00|0.08500000089406967|
|2017-07-27 11:27:00|0.26499998569488525|
|2017-07-27 02:37:00| 1.0549999475479126|
|2017-07-27 02:12:00| 0.3449999988079071|
|2017-07-26 22:22:00| 0.4699999988079071|
|2017-07-25 15:22:00| 0.8199999928474426|
|2017-07-25 07:08:00| 0.2800000011920929|
|2017-07-25 06:42:00|0.32499998807907104|
|2017-07-25 04:42:00|0.30000001192092896|
+-------------------+-------------------+
only showing top 20 rows

Есть идеи, почему? и как мне это исправить? Спасибо.


person dhinar    schedule 31.07.2017    source источник
comment
Как значения не верны?   -  person Rick Moritz    schedule 31.07.2017
comment
Я дал время как 1 мин. Но это не дает значение 1 мин. Как видно из приведенного выше, он дает случайные значения из таблицы. Он должен начинаться с 31.07.2017 10:53: .. но вывод начинается с 31.07.2017 06:38:00   -  person dhinar    schedule 31.07.2017
comment
Вы пробовали сортировать перед использованием шоу?   -  person Rick Moritz    schedule 31.07.2017
comment
Это уже отсортировано. Я вижу, что агрегация происходит часами, а не каждую минуту   -  person dhinar    schedule 31.07.2017
comment
Попробуйте сократить свои данные до минимально воспроизводимого примера - может быть, всего три даты и значения, чтобы было более очевидно, что происходит.   -  person Rick Moritz    schedule 31.07.2017


Ответы (1)


Вы можете использовать следующий подход:

создайте пользовательскую функцию в искре, которая сохранит только дату до минутного уровня зерна. Например, 31.07.2017 10:53

def atMinute = udf((dateTime: String) => // implement here retain date till minute)

преобразовать исходный фрейм данных с помощью udf

val df_at_minute = df.withColumn("datetime_at_min", atMinute("datetime"))

Примените агрегатную функцию к новому фрейму данных

 df_at_minute.groupBy("datetime_at_min").agg(avg("value"))
person dumitru    schedule 31.07.2017
comment
Привет, я сохранил дату до минуты, сделав вот так: val VX = newvx.withColumn("datetime", ((unix_timestamp($"datetime") / 60) .cast("long") * 60).cast("timestamp")), и все работало отлично. Но когда я groupyBy и использую avg, он все равно дает случайный результат. - person dhinar; 31.07.2017
comment
Я думаю, что это проблема, больше связанная с тем, как вы вычисляете поле datetime, потому что это агрегация; все в порядке - person dumitru; 31.07.2017