Spark: разные выходные данные с разным количеством ядер

Я имею дело со странным поведением, когда я изменяю количество ядер в своем приложении Spark, и вот код:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
object Test extends App {
Logger.getLogger("org").setLevel(Level.WARN)
var listLink: List[String] = List()
def addListLink(s: String) = {
val list = s.split(",")
for (i <- 0 to list.length - 2) {
  listLink = list(i)+ "-" + list(i + 1) :: listLink
 }
}
val conf = new SparkConf().setMaster("local[1]").setAppName("Simple Application")
val sc = new SparkContext(conf)
val paths = sc.textFile("file:///tmp/percorsi.txt")
paths.foreach(x => addListLink(x))
println("Number of items:"+listLink.size)
println(listLink)
}

Мой входной файл выглядит примерно так:

A,B,C,D
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C
A,B,C
A,B,C
A,B,C
A,B,C
B,C,D
B,C,D
B,C,D
B,C,D
C,D
C,D

В основном для каждого пути я вызываю свой метод, который добавляет элемент в список, представляющий каждую последовательную пару элементов:

пример: "A,B,C,D" => ("A-B", "B-C", "C-D")

Как видите, в коде всего одно ядро

.setMaster("local[1]")

И если я запускаю свое приложение (локально или в кластере), я получаю то, что ожидаю.

println("Number of items:"+listLink.size)
//Result --> Number of Items : 38

Если я изменю количество ядер на 3 (например), я получу другие значения. Например 33 предмета вместо 38.

Я что-то упустил из-за количества ядер или чего-то еще (разделов и т. д.)?

Я думаю, что это довольно простое приложение, но я все равно получаю это странное поведение.

Кто-нибудь может мне помочь?

заранее спасибо

FF


person Fabio Fantoni    schedule 21.05.2015    source источник
comment
почему вы используете foreach на своем RDD ??? Что ты пытаешься сделать?   -  person eliasah    schedule 21.05.2015
comment
для каждого элемента в моем RDD я хочу вызвать этот метод, чтобы увеличить свой список   -  person Fabio Fantoni    schedule 21.05.2015
comment
может быть, вы слышали о сокращении карты? нет? foreachВыполняет функцию без параметров для каждого элемента данных.   -  person eliasah    schedule 21.05.2015
comment
да, действительно... Как говорится, может быть, я что-то упускаю, но могу ли я попросить вас более конкретную подсказку? Я имею в виду, это проблема только для стиля foreach?   -  person Fabio Fantoni    schedule 21.05.2015
comment
хорошо, это во-первых, но все же не о стиле, во-вторых, не могли бы вы объяснить, пожалуйста, что вы пытаетесь сделать с функцией списка ссылок?   -  person eliasah    schedule 21.05.2015
comment
как сказано в вопросе, для каждого элемента путей RDD (например, A, B, C, D) мне нужно увеличить свой список ссылок с парами последовательных букв в нем (например, AB, B-C, C-D)... в в конце приложения listLink.size должно быть 38, но при изменении количества ядер с 1 на N (с N > 1) я получаю другое значение   -  person Fabio Fantoni    schedule 21.05.2015
comment
Давайте продолжим обсуждение в чате.   -  person eliasah    schedule 21.05.2015


Ответы (1)


Для каждого раздела есть отдельный listLink. Итак, вы добавляете элементы в несколько списков, и в конце печатается только один.

Обычно, когда функция, переданная в операцию Spark (например, map или reduce), выполняется на удаленном узле кластера, она работает с отдельными копиями всех переменных, используемых в функции. Эти переменные копируются на каждую машину, и никакие обновления переменных на удаленной машине не распространяются обратно в программу-драйвер.

(отсюда https://spark.apache.org/docs/latest/programming-guide.html#shared-variables)

Это твой счастливый день:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer


val data = List(
"A,B,C,D",
"A,B,C,D",
"A,B,C,D",
"A,B,C,D",
"A,B,C,D",
"A,B,C,D",
"A,B,C",
"A,B,C",
"A,B,C",
"A,B,C",
"A,B,C",
"B,C,D",
"B,C,D",
"B,C,D",
"B,C,D",
"C,D",
"C,D")

val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
val sc= new SparkContext(conf)


val dataRDD = sc.makeRDD(data, 1)
val linkRDD = dataRDD.flatMap(_.split(",").sliding(2).map{_.mkString("", "-", "")})

linkRDD.foreach(println)

Выход:

A-B
B-C
C-D
A-B
B-C
C-D
A-B
B-C
C-D
A-B
B-C
C-D
A-B
B-C
C-D
A-B
B-C
C-D
A-B
B-C
A-B
B-C
A-B
B-C
A-B
B-C
A-B
B-C
B-C
C-D
B-C
C-D
B-C
C-D
B-C
C-D
C-D
C-D
person The Archetypal Paul    schedule 21.05.2015
comment
Большое спасибо, Пол, это больше, чем я предполагал... Я совершенно забыл общие переменные... Я посмотрю и посмотрю, как изменить мой код. - person Fabio Fantoni; 21.05.2015
comment
Я хотел бы, чтобы моя проблема была решена (включая код), прежде чем принять ваш ответ, чтобы помочь кому-то еще с такими же сомнениями - person Fabio Fantoni; 21.05.2015
comment
ХОРОШО. В общем, SO не является сайтом, пожалуйста, напишите мой код. Кроме того, я полагаю, создание полного списка ссылок не является конечной целью. Пожалуйста, опишите, что вы хотите рассчитать, потому что, вероятно, есть более похожий на Spark способ сделать это. Как и просил @eliasah. - person The Archetypal Paul; 21.05.2015
comment
Еще раз спасибо, Пол ... Как сказано в моем первом комментарии к вашему ответу, я бы попытался улучшить свой код как можно скорее (поскольку я не сижу перед своим ПК). Я знаю, что SO не пишет мой код. В любом случае, поскольку вы написали это, я еще раз благодарю вас. - person Fabio Fantoni; 21.05.2015
comment
И, кстати, ваше искровое решение определенно чище :-) - person Fabio Fantoni; 21.05.2015
comment
Отличный ответ Павел! Вот почему я пытался понять, чего он пытается достичь. Я знал, что это проблема с общей переменной - person eliasah; 22.05.2015
comment
@Paul: поскольку я изучаю широковещательную переменную, чтобы лучше понять их, могу ли я попросить вас несколько советов по моему вопросу ... Несмотря на то, что я использую широковещательную переменную, я застрял с теми же основными проблемами - person Fabio Fantoni; 22.05.2015
comment
Я никогда не использовал широковещательные переменные (никогда не нуждался в этом). Но широковещательные переменные доступны только для чтения, поэтому, если вы используете их для хранения списка элементов, вы делаете это неправильно. Вы можете использовать аккумулятор, но вы не должны использовать его для коллекции, особенно если она того же размера или больше, чем ваш RDD. RDD предназначены для распределенных коллекций, поэтому используйте их для этого! - person The Archetypal Paul; 22.05.2015
comment
Спасибо за совет... Дело в том, что, пусть он и не такой искровой, я пытаюсь решить свою первоначальную задачу (своим кодом, а не вашим) добавлением аккумуляторов, но на самом деле не могу получить то, что мне нужно: Я всегда получаю частичные результаты из-за другого раздела, хотя я следую способу накопления, как предлагается здесь (stackoverflow.com/questions/30377182/) - person Fabio Fantoni; 22.05.2015
comment
Доктор, мне больно, когда я делаю это/Тогда не делай этого. - person The Archetypal Paul; 22.05.2015