Анализ данных с помощью Scala и Spark: часть 6

Создание многократно используемого кода для вычисления сводной статистики

Второй раздел из серии «Наука о данных и расширенная аналитика в Spark, Scala, AWS и машинном обучении».

Предыдущий раздел



Apache Spark и Hadoop в кластере AWS с Flintrock
medium.com



Создание многократно используемого кода для вычисления сводной статистики

Хотя подход суммарной статистики работает, он довольно неэффективен; мы должны обработать все записи в проанализированном RDD девять раз, чтобы вычислить всю статистику. По мере того, как наши наборы данных становятся больше, стоимость повторной обработки всех данных снова возрастает, даже когда мы кэшируем промежуточные результаты в памяти, чтобы сэкономить время обработки. Когда мы разрабатываем распределенные алгоритмы с помощью Spark, действительно может быть полезно потратить некоторое время на выяснение того, как мы можем вычислить все ответы, которые нам могут понадобиться, за минимальное количество проходов по данным. В этом случае давайте найдем способ написать функцию, которая будет принимать любой RDD[Array[Double]], который мы ей передаем, и возвращать нам массив, который включает в себя как количество пропущенных значений для каждого индекса, так и объект StatCounter с сводная статистика непропущенных значений для каждого индекса.

Всякий раз, когда мы ожидаем, что какая-то аналитическая задача, которую нам нужно выполнить, снова будет полезна, стоит потратить некоторое время на разработку нашего кода таким образом, чтобы другим аналитикам было легко использовать решение, которое мы придумали, в своих собственных анализах. Для этого мы можем написать код Scala в отдельном файле, который затем можно загрузить в оболочку Spark для тестирования и проверки, а затем мы можем поделиться этим файлом с другими, как только узнаем, как он работает.

Наша первая задача — написать аналог класса Spark StatCounter, корректно обрабатывающий пропущенные значения. Я собираюсь создать файл с именем MissingStats.scala и скопировать в него следующие определения классов. Мы пройдемся по отдельным полям и методам, определенным здесь после кода:

В нашем классе NAStatCounter есть две переменные-члены: неизменяемый экземпляр StatCounter с именем stats и изменяемая переменная с именемmissing. Обратите внимание, что мы помечаем этот класс как Serializable, потому что мы будем использовать экземпляры этого класса внутри RDD Spark, и наша работа завершится ошибкой, если Spark не сможет сериализовать данные, содержащиеся внутри RDD.

Первый метод в классе, add, позволяет нам внести новое значение Double в статистику, отслеживаемую NAStatCounter, либо записывая его как отсутствующее, если это NaN, либо добавляя его к базовому StatCounter, если это не так. Метод слияния включает статистику, отслеживаемую другим экземпляром NAStatCounter, в текущий экземпляр. Оба эти метода возвращают это, так что их можно легко связать вместе.

Наконец, мы переопределяем метод toString в нашем классе NAStatCounter, чтобы мы могли легко распечатать его содержимое в оболочке Spark. Всякий раз, когда мы переопределяем метод из родительского класса в Scala, нам нужно добавлять перед определением метода ключевое слово override. Scala допускает гораздо более богатый набор шаблонов переопределения методов, чем Java, а ключевое слово override помогает Scala отслеживать, какое определение метода следует использовать для того или иного класса.

Наряду с определением класса мы определяем сопутствующий объект для NAStatCounter. Ключевое слово объекта Scala используется для объявления синглтона, который может предоставлять вспомогательные методы для класса, аналогичные определениям статических методов в классе Java. В этом случае метод apply, предоставляемый сопутствующим объектом, создает новый экземпляр класса NAStatCounter и добавляет заданное значение Double к экземпляру перед его возвратом. В Scala методы применения имеют особый синтаксический сахар, который позволяет нам вызывать их без необходимости вводить их явно; например, две строки делают одно и то же:

scala> :load IdeaProjects/sparkAA/missingStats.scala
Loading IdeaProjects/sparkAA/missingStats.scala...
import org.apache._
import org.apache.spark.util.StatCounter
defined class NAStatCounter
defined object NAStatCounter
warning: previously defined class NAStatCounter is not a companion to object NAStatCounter.
Companions must be defined together; you may wish to use :paste mode for this.

Примечание. Я запустил свою команду spark-shell в своем домашнем каталоге. Мой файл Scala находится в проекте на IntelliJ. Ваше местоположение может отличаться.

Мы получаем предупреждение о том, что наш объект-компаньон недействителен в режиме инкрементной компиляции, который использует оболочка, но мы можем убедиться, что несколько примеров работают так, как мы ожидаем:

scala> val nas1 = NAStatCounter(10.0)
nas1: NAStatCounter = stats: (count: 1, mean: 10.000000, stdev: 0.000000, max: 10.000000, min: 10.000000) NaN: 0
scala> nas1.add(2.1)
res18: NAStatCounter = stats: (count: 2, mean: 6.050000, stdev: 3.950000, max: 10.000000, min: 2.100000) NaN: 0
scala> val nas2 = NAStatCounter(Double.NaN)
nas2: NAStatCounter = stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1
scala> nas1.merge(nas2)
res19: NAStatCounter = stats: (count: 2, mean: 6.050000, stdev: 3.950000, max: 10.000000, min: 2.100000) NaN: 1

Давайте воспользуемся нашим новым классом NAStatsCounter для обработки оценок в записях MatchData в проанализированном RDD. Каждый экземпляр MatchData содержит массив оценок типа Array[Double]. Для каждой записи в массиве мы хотели бы иметь экземпляр NAStatCounter, который отслеживает, сколько значений в этом индексе являются NaN, а также регулярную статистику распределения для неотсутствующих значений. Учитывая массив значений, мы можем использовать функцию map для создания массива объектов NAStatCounter:

scala> val nasRDD = parsed.map(md => {
     | md.scores.map(d => NAStatCounter(d))})
nasRDD: org.apache.spark.rdd.RDD[Array[NAStatCounter]] = MapPartitionsRDD[40] at map at <console>:34

Теперь нам нужен простой способ объединить несколько экземпляров Array[NAStatCounter] в один Array[NAStatCounter]. Мы можем объединить два массива одинаковой длины, используя zip. Это создает новый массив соответствующих пар элементов в двух массивах. За этим может следовать метод карты, который использует функцию слияния в классе NAStatCounter для объединения статистики из обоих объектов в один экземпляр:

scala> val nas1 = Array(1.0, Double.NaN).map(NAStatCounter(_))
nas1: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 0, stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1)
scala> val nas2 = Array(Double.NaN, 2.0).map(NAStatCounter(_))
nas2: Array[NAStatCounter] = Array(stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 0)
scala> val merged = nas1.zip(nas2).map(p => p._1.merge(p._2))
merged: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1)

Мы даже можем использовать синтаксис case Scala, чтобы разбить пару элементов в заархивированном массиве на переменные с красивыми именами, вместо того, чтобы использовать методы _1 и _2 в классе Tuple2:

scala> val merged = nas1.zip(nas2).map { case (a, b) => a.merge(b) }
merged: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 2, stats: (count: 2, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1)

Чтобы выполнить эту же операцию слияния для всех записей в коллекции Scala, мы можем использовать функцию сокращения, которая принимает ассоциативную функцию, которая отображает два аргумента типа T в одно возвращаемое значение типа T и применяет его снова и снова. ко всем элементам в коллекции, чтобы объединить все значения вместе. Поскольку логика слияния, которую мы написали ранее, является ассоциативной, мы можем применить ее с помощью метода reduce к набору значений Array[NAStatCounter]:

scala> val nas = List(nas1, nas2)
nas: List[Array[NAStatCounter]] = List(Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 2, stats: (count: 2, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1), Array(stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 0))
scala> val merged = nas.reduce((n1, n2) => {
     | n1.zip(n2).map { case (a, b) => a.merge(b) }})
merged: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 3, stats: (count: 3, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1)

Класс RDD также имеет действие сокращения, которое работает так же, как метод сокращения, который мы использовали для коллекций Scala, только применяется ко всем данным, распределенным по кластеру, а код, который мы используем в Spark, идентичен коду мы только что написали для List[Array[NAStatCounter]]:

scala> val reduced = nasRDD.reduce((n1, n2) => {
     | n1.zip(n2).map { case (a, b) => a.merge(b) }})
reduced: Array[NAStatCounter] = Array(
stats: (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000) NaN: 1007, 
stats: (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000) NaN: 5645434, 
stats: (count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000) NaN: 0, 
stats: (count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000) NaN: 5746668, 
stats: (count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000) NaN: 0, 
stats: (count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000) NaN: 795, 
stats: (count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000) NaN: 795, 
stats: (count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0....

Давайте инкапсулируем наш код анализа отсутствующих значений в функцию в файле missingStats.scala , которая позволяет нам вычислять эту статистику для любого RDD[Array[Double]] путем редактирования файла, чтобы включить этот блок кода:

import org.apache.spark.rdd.RDD
def missingStats(rdd: RDD[Array[Double]]): Array[NAStatCounter] = {
  val nastats = rdd.mapPartitions((iter: Iterator[Array[Double]]) => {
    val nas: Array[NAStatCounter] = iter.next().map(NAStatCounter(_))
    iter.foreach(arr => {
      nas.zip(arr).foreach { case (n, d) => n.add(d) }
    })
    Iterator(nas)
  })
  nastats.reduce((n1, n2) => {
    n1.zip(n2).map { case (a, b) => a.merge(b) }
  })
}

Перезагрузите файл:

scala> :load IdeaProjects/sparkAA/missingStats.scala
Loading IdeaProjects/sparkAA/missingStats.scala...
import org.apache._
import org.apache.spark.util.StatCounter
import org.apache.spark.rdd.RDD
defined class NAStatCounter
warning: previously defined object NAStatCounter is not a companion to class NAStatCounter.
Companions must be defined together; you may wish to use :paste mode for this.
defined object NAStatCounter
warning: previously defined class NAStatCounter is not a companion to object NAStatCounter.
Companions must be defined together; you may wish to use :paste mode for this.
missingStats: (rdd: org.apache.spark.rdd.RDD[Array[Double]])Array[NAStatCounter]

Обратите внимание, что вместо вызова функции карты для создания массива [NAStatCounter] для каждой записи во входном СДР мы вызываем немного более продвинутую функцию mapPartition, которая позволяет нам обрабатывать все записи в разделе входного СДР[ Array[Double]] через Iterator[Array[Double]]. Это позволяет нам создать один экземпляр Array[NAStatCounter] для охвата раздела данных, а затем обновить его состояние, используя значения Array[Double], возвращаемые данным итератором, что является более эффективной реализацией. Действительно, наш методmissingStats теперь очень похож на то, как разработчики Spark реализовали метод stats для экземпляров типа RDD[Double].

Следующий



Предыдущий