Анализ данных с помощью Scala и Spark: часть 6
Создание многократно используемого кода для вычисления сводной статистики
Второй раздел из серии «Наука о данных и расширенная аналитика в Spark, Scala, AWS и машинном обучении».
Предыдущий раздел
Создание многократно используемого кода для вычисления сводной статистики
Хотя подход суммарной статистики работает, он довольно неэффективен; мы должны обработать все записи в проанализированном RDD девять раз, чтобы вычислить всю статистику. По мере того, как наши наборы данных становятся больше, стоимость повторной обработки всех данных снова возрастает, даже когда мы кэшируем промежуточные результаты в памяти, чтобы сэкономить время обработки. Когда мы разрабатываем распределенные алгоритмы с помощью Spark, действительно может быть полезно потратить некоторое время на выяснение того, как мы можем вычислить все ответы, которые нам могут понадобиться, за минимальное количество проходов по данным. В этом случае давайте найдем способ написать функцию, которая будет принимать любой RDD[Array[Double]], который мы ей передаем, и возвращать нам массив, который включает в себя как количество пропущенных значений для каждого индекса, так и объект StatCounter с сводная статистика непропущенных значений для каждого индекса.
Всякий раз, когда мы ожидаем, что какая-то аналитическая задача, которую нам нужно выполнить, снова будет полезна, стоит потратить некоторое время на разработку нашего кода таким образом, чтобы другим аналитикам было легко использовать решение, которое мы придумали, в своих собственных анализах. Для этого мы можем написать код Scala в отдельном файле, который затем можно загрузить в оболочку Spark для тестирования и проверки, а затем мы можем поделиться этим файлом с другими, как только узнаем, как он работает.
Наша первая задача — написать аналог класса Spark StatCounter, корректно обрабатывающий пропущенные значения. Я собираюсь создать файл с именем MissingStats.scala и скопировать в него следующие определения классов. Мы пройдемся по отдельным полям и методам, определенным здесь после кода:
В нашем классе NAStatCounter есть две переменные-члены: неизменяемый экземпляр StatCounter с именем stats и изменяемая переменная с именемmissing. Обратите внимание, что мы помечаем этот класс как Serializable, потому что мы будем использовать экземпляры этого класса внутри RDD Spark, и наша работа завершится ошибкой, если Spark не сможет сериализовать данные, содержащиеся внутри RDD.
Первый метод в классе, add, позволяет нам внести новое значение Double в статистику, отслеживаемую NAStatCounter, либо записывая его как отсутствующее, если это NaN, либо добавляя его к базовому StatCounter, если это не так. Метод слияния включает статистику, отслеживаемую другим экземпляром NAStatCounter, в текущий экземпляр. Оба эти метода возвращают это, так что их можно легко связать вместе.
Наконец, мы переопределяем метод toString в нашем классе NAStatCounter, чтобы мы могли легко распечатать его содержимое в оболочке Spark. Всякий раз, когда мы переопределяем метод из родительского класса в Scala, нам нужно добавлять перед определением метода ключевое слово override. Scala допускает гораздо более богатый набор шаблонов переопределения методов, чем Java, а ключевое слово override помогает Scala отслеживать, какое определение метода следует использовать для того или иного класса.
Наряду с определением класса мы определяем сопутствующий объект для NAStatCounter. Ключевое слово объекта Scala используется для объявления синглтона, который может предоставлять вспомогательные методы для класса, аналогичные определениям статических методов в классе Java. В этом случае метод apply, предоставляемый сопутствующим объектом, создает новый экземпляр класса NAStatCounter и добавляет заданное значение Double к экземпляру перед его возвратом. В Scala методы применения имеют особый синтаксический сахар, который позволяет нам вызывать их без необходимости вводить их явно; например, две строки делают одно и то же:
scala> :load IdeaProjects/sparkAA/missingStats.scala Loading IdeaProjects/sparkAA/missingStats.scala... import org.apache._ import org.apache.spark.util.StatCounter defined class NAStatCounter defined object NAStatCounter warning: previously defined class NAStatCounter is not a companion to object NAStatCounter. Companions must be defined together; you may wish to use :paste mode for this.
Примечание. Я запустил свою команду spark-shell в своем домашнем каталоге. Мой файл Scala находится в проекте на IntelliJ. Ваше местоположение может отличаться.
Мы получаем предупреждение о том, что наш объект-компаньон недействителен в режиме инкрементной компиляции, который использует оболочка, но мы можем убедиться, что несколько примеров работают так, как мы ожидаем:
scala> val nas1 = NAStatCounter(10.0) nas1: NAStatCounter = stats: (count: 1, mean: 10.000000, stdev: 0.000000, max: 10.000000, min: 10.000000) NaN: 0 scala> nas1.add(2.1) res18: NAStatCounter = stats: (count: 2, mean: 6.050000, stdev: 3.950000, max: 10.000000, min: 2.100000) NaN: 0 scala> val nas2 = NAStatCounter(Double.NaN) nas2: NAStatCounter = stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1 scala> nas1.merge(nas2) res19: NAStatCounter = stats: (count: 2, mean: 6.050000, stdev: 3.950000, max: 10.000000, min: 2.100000) NaN: 1
Давайте воспользуемся нашим новым классом NAStatsCounter для обработки оценок в записях MatchData в проанализированном RDD. Каждый экземпляр MatchData содержит массив оценок типа Array[Double]. Для каждой записи в массиве мы хотели бы иметь экземпляр NAStatCounter, который отслеживает, сколько значений в этом индексе являются NaN, а также регулярную статистику распределения для неотсутствующих значений. Учитывая массив значений, мы можем использовать функцию map для создания массива объектов NAStatCounter:
scala> val nasRDD = parsed.map(md => { | md.scores.map(d => NAStatCounter(d))}) nasRDD: org.apache.spark.rdd.RDD[Array[NAStatCounter]] = MapPartitionsRDD[40] at map at <console>:34
Теперь нам нужен простой способ объединить несколько экземпляров Array[NAStatCounter] в один Array[NAStatCounter]. Мы можем объединить два массива одинаковой длины, используя zip. Это создает новый массив соответствующих пар элементов в двух массивах. За этим может следовать метод карты, который использует функцию слияния в классе NAStatCounter для объединения статистики из обоих объектов в один экземпляр:
scala> val nas1 = Array(1.0, Double.NaN).map(NAStatCounter(_)) nas1: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 0, stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1) scala> val nas2 = Array(Double.NaN, 2.0).map(NAStatCounter(_)) nas2: Array[NAStatCounter] = Array(stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 0) scala> val merged = nas1.zip(nas2).map(p => p._1.merge(p._2)) merged: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1)
Мы даже можем использовать синтаксис case Scala, чтобы разбить пару элементов в заархивированном массиве на переменные с красивыми именами, вместо того, чтобы использовать методы _1 и _2 в классе Tuple2:
scala> val merged = nas1.zip(nas2).map { case (a, b) => a.merge(b) } merged: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 2, stats: (count: 2, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1)
Чтобы выполнить эту же операцию слияния для всех записей в коллекции Scala, мы можем использовать функцию сокращения, которая принимает ассоциативную функцию, которая отображает два аргумента типа T в одно возвращаемое значение типа T и применяет его снова и снова. ко всем элементам в коллекции, чтобы объединить все значения вместе. Поскольку логика слияния, которую мы написали ранее, является ассоциативной, мы можем применить ее с помощью метода reduce к набору значений Array[NAStatCounter]:
scala> val nas = List(nas1, nas2) nas: List[Array[NAStatCounter]] = List(Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 2, stats: (count: 2, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1), Array(stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 0)) scala> val merged = nas.reduce((n1, n2) => { | n1.zip(n2).map { case (a, b) => a.merge(b) }}) merged: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 3, stats: (count: 3, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1)
Класс RDD также имеет действие сокращения, которое работает так же, как метод сокращения, который мы использовали для коллекций Scala, только применяется ко всем данным, распределенным по кластеру, а код, который мы используем в Spark, идентичен коду мы только что написали для List[Array[NAStatCounter]]:
scala> val reduced = nasRDD.reduce((n1, n2) => { | n1.zip(n2).map { case (a, b) => a.merge(b) }}) reduced: Array[NAStatCounter] = Array( stats: (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000) NaN: 1007, stats: (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000) NaN: 5645434, stats: (count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000) NaN: 5746668, stats: (count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000) NaN: 795, stats: (count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000) NaN: 795, stats: (count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0....
Давайте инкапсулируем наш код анализа отсутствующих значений в функцию в файле missingStats.scala , которая позволяет нам вычислять эту статистику для любого RDD[Array[Double]] путем редактирования файла, чтобы включить этот блок кода:
import org.apache.spark.rdd.RDD def missingStats(rdd: RDD[Array[Double]]): Array[NAStatCounter] = { val nastats = rdd.mapPartitions((iter: Iterator[Array[Double]]) => { val nas: Array[NAStatCounter] = iter.next().map(NAStatCounter(_)) iter.foreach(arr => { nas.zip(arr).foreach { case (n, d) => n.add(d) } }) Iterator(nas) }) nastats.reduce((n1, n2) => { n1.zip(n2).map { case (a, b) => a.merge(b) } }) }
Перезагрузите файл:
scala> :load IdeaProjects/sparkAA/missingStats.scala Loading IdeaProjects/sparkAA/missingStats.scala... import org.apache._ import org.apache.spark.util.StatCounter import org.apache.spark.rdd.RDD defined class NAStatCounter warning: previously defined object NAStatCounter is not a companion to class NAStatCounter. Companions must be defined together; you may wish to use :paste mode for this. defined object NAStatCounter warning: previously defined class NAStatCounter is not a companion to object NAStatCounter. Companions must be defined together; you may wish to use :paste mode for this. missingStats: (rdd: org.apache.spark.rdd.RDD[Array[Double]])Array[NAStatCounter]
Обратите внимание, что вместо вызова функции карты для создания массива [NAStatCounter] для каждой записи во входном СДР мы вызываем немного более продвинутую функцию mapPartition, которая позволяет нам обрабатывать все записи в разделе входного СДР[ Array[Double]] через Iterator[Array[Double]]. Это позволяет нам создать один экземпляр Array[NAStatCounter] для охвата раздела данных, а затем обновить его состояние, используя значения Array[Double], возвращаемые данным итератором, что является более эффективной реализацией. Действительно, наш методmissingStats теперь очень похож на то, как разработчики Spark реализовали метод stats для экземпляров типа RDD[Double].