Я реализую запрос диапазона в RDD из (x, y) точек в pyspark. Я разделил пространство xy на сетку 16 * 16 (256 ячеек) и назначил каждую точку в моем RDD одной из этих ячеек. gridMappedRDD — это PairRDD: (cell_id, Point object)
Я разделил этот RDD на 256 разделов, используя:
gridMappedRDD.partitionBy(256)
Запрос диапазона представляет собой прямоугольное поле. У меня есть метод для моего объекта Grid, который может возвращать список идентификаторов ячеек, которые перекрываются с диапазоном запроса. Итак, я использовал это как фильтр для обрезки несвязанных ячеек:
filteredRDD = gridMappedRDD.filter(lambda x: x[0] in candidateCells)
Но проблема в том, что при выполнении запроса и последующем сборе результатов оцениваются все 256 разделов; Задача создается для каждого раздела.
Чтобы избежать этой проблемы, я попытался объединить filteredRDD с длиной списка кандидатов в ячейки и надеялся, что это решит проблему.
filteredRDD.coalesce(len(candidateCells))
На самом деле результирующий RDD имеет len(candidateCells)
разделов, но эти разделы не совпадают с gridMappedRDD
.
Как указано в документации по объединению, параметр shuffle
имеет значение False, и между разделами не должно выполняться перемешивание, но я вижу (с помощью glom()), что это не так.
Например, после coalesce(4)
с candidateCells=[62, 63, 78, 79]
разделы выглядят так:
[[(62, P), (62, P) .... , (63, P)],
[(78, P), (78, P) .... , (79, P)],
[], []
]
На самом деле, благодаря объединению у меня есть случайное чтение, равное размеру всего моего набора данных для каждой задачи, что занимает значительное время. Что мне нужно, так это RDD только с разделами, связанными с ячейками в ячейках-кандидатах, без каких-либо перетасовок. Итак, у меня вопрос, можно ли фильтровать только некоторые разделы без перетасовки? В приведенном выше примере мой filteredRDD будет иметь 4 раздела с точно такими же данными, как и 62, 63, 78, 79-й разделы originalRDD. При этом запрос может быть направлен только на затрагивающие разделы.