Избегайте затрат на перераспределение при фильтрации и последующем объединении

Я реализую запрос диапазона в RDD из (x, y) точек в pyspark. Я разделил пространство xy на сетку 16 * 16 (256 ячеек) и назначил каждую точку в моем RDD одной из этих ячеек. gridMappedRDD — это PairRDD: (cell_id, Point object)

Я разделил этот RDD на 256 разделов, используя:

gridMappedRDD.partitionBy(256)

Запрос диапазона представляет собой прямоугольное поле. У меня есть метод для моего объекта Grid, который может возвращать список идентификаторов ячеек, которые перекрываются с диапазоном запроса. Итак, я использовал это как фильтр для обрезки несвязанных ячеек:

filteredRDD = gridMappedRDD.filter(lambda x: x[0] in candidateCells)

Но проблема в том, что при выполнении запроса и последующем сборе результатов оцениваются все 256 разделов; Задача создается для каждого раздела.

Чтобы избежать этой проблемы, я попытался объединить filteredRDD с длиной списка кандидатов в ячейки и надеялся, что это решит проблему.

filteredRDD.coalesce(len(candidateCells))

На самом деле результирующий RDD имеет len(candidateCells) разделов, но эти разделы не совпадают с gridMappedRDD.

Как указано в документации по объединению, параметр shuffle имеет значение False, и между разделами не должно выполняться перемешивание, но я вижу (с помощью glom()), что это не так.

Например, после coalesce(4) с candidateCells=[62, 63, 78, 79] разделы выглядят так:

[[(62, P), (62, P) .... , (63, P)],
 [(78, P), (78, P) .... , (79, P)],
 [], []
]

На самом деле, благодаря объединению у меня есть случайное чтение, равное размеру всего моего набора данных для каждой задачи, что занимает значительное время. Что мне нужно, так это RDD только с разделами, связанными с ячейками в ячейках-кандидатах, без каких-либо перетасовок. Итак, у меня вопрос, можно ли фильтровать только некоторые разделы без перетасовки? В приведенном выше примере мой filteredRDD будет иметь 4 раздела с точно такими же данными, как и 62, 63, 78, 79-й разделы originalRDD. При этом запрос может быть направлен только на затрагивающие разделы.


person user302787    schedule 03.05.2018    source источник


Ответы (1)


Здесь вы сделали несколько неверных предположений:

  • Перетасовка не связана с coalescecoalesce здесь не используется). Это вызвано partitionBy. Разделение по определению требует перемешивания.
  • Разделение нельзя использовать для оптимизации filter. Spark ничего не знает об используемой вами функции (это черный ящик).
  • Разделение не однозначно сопоставляет ключи с разделами. В один и тот же раздел можно поместить несколько ключей. Как работает HashPartitioner?

Что ты можешь сделать:

  • Если результирующее подмножество является небольшим перераспределением и применяется lookup для каждого ключа:

    from itertools import chain
    
    partitionedRDD = gridMappedRDD.partitionBy(256)
    
    chain.from_iterable(
        ((c, x) for x in partitionedRDD.lookup(c)) 
        for c in candidateCells
    )
    
  • Если данных много, можно попробовать не сканировать разделы (количество задач не изменится, но некоторые задачи могут быть закорочены):

    candidatePartitions = [
        partitionedRDD.partitioner.partitionFunc(c) for c in candidateCells
    ]
    
    partitionedRDD.mapPartitionsWithIndex(
        lambda i, xs: (x for x in xs if x[0] in candidateCells) if i in candidatePartitions else []
    )
    

Эти два метода имеют смысл только в том случае, если вы выполняете несколько «поисков». Если это разовая операция, лучше выполнить линейный фильтр:

  • Это дешевле, чем перемешивание и перераспределение.
  • Если исходные данные равномерно распределены, то последующая обработка сможет лучше использовать доступные ресурсы.
person Alper t. Turker    schedule 03.05.2018
comment
Большое спасибо за ответ. Проблема почти решена. Теперь каждый раз, когда я запускаю код, у меня 256 задач, но проверяются только необходимые разделы. - person user302787; 04.05.2018