doParallel и foreach не могут распараллелить операцию слияния

Я пытаюсь объединить большой data.frame с маленьким и распараллелить вычисления. Код ниже работает идеально, максимально используя все ядра моей машины:

len <- 2000000
set.seed(666)
dat = paste(sample(letters, len, rep = T), sample(0:9, len, rep = T), sample(letters, len, rep = T), sep = '') # create a vector of strings that are 3-long
head(dat)
set.seed(777)
num <- sample(0:9, len, replace = T)
bigDF <-  data.frame(dat = dat, num = num)
smallDF <- data.frame(num = 0:9, caps = toupper(letters[1:10]))
startP <- 1
chunk <- 10000
nodes <- detectCores()
cl <- makeCluster(nodes)
registerDoParallel(cl)
mergedList <- foreach(i = 0:(len/chunk - 1)) %dopar% {
    tmpDF = bigDF[(startP + i * chunk):(startP - 1 + (i + 1) * chunk), ]
    merge(tmpDF, smallDF, by = 'num', all.x = T)
}
stopCluster(cl)

Как только я изменяю вектор dat, чтобы он содержал строки длиной 5, параллелизм нарушается, и, хотя ошибок или предупреждений нет, только одно ядро ​​участвует в вычислениях:

len <- 2000000
set.seed(666)
dat = paste(sample(letters, len, rep = T), sample(0:9, len, rep = T), sample(letters, len, rep = T), sample(letters, len, rep = T), sample(letters, len, rep = T), sample(letters, len, rep = T), sep = '') # create a vector of strings that are 6-long
head(dat)
set.seed(777)
num <- sample(0:9, len, replace = T)
bigDF <-  data.frame(dat = dat, num = num)
smallDF <- data.frame(num = 0:9, caps = toupper(letters[1:10]))
startP <- 1
chunk <- 10000
nodes <- detectCores()
cl <- makeCluster(nodes)
registerDoParallel(cl)
mergedList <- foreach(i = 0:(len/chunk - 1)) %dopar% {
    tmpDF = bigDF[(startP + i * chunk):(startP - 1 + (i + 1) * chunk), ]
    merge(tmpDF, smallDF, by = 'num', all.x = T)
}
stopCluster(cl)

Почему это несоответствие и как его обойти? В конкретном примере, если индексировать dat в целые числа, код работает. Но индексация не является решением во всех случаях. Почему длина строк влияет на количество используемых ядер?


person Audrey    schedule 26.09.2014    source источник
comment
Создаются ли дочерние процессы R для нерабочих случаев? Как твоя свободная память?   -  person blindjesse    schedule 26.09.2014
comment
Похоже, я могу воспроизвести эту проблему на Win7-64bit, R3.1.1. Тонны свободной оперативной памяти; дети Rscript никогда не запускаются. Другие новости позже :-)   -  person Carl Witthoft    schedule 26.09.2014
comment
@blindJesse У меня есть гигабайты свободной оперативной памяти, так что проблема не в этом. @Carl: Мои системные характеристики идентичны вашим. Обратите внимание, что, как ни странно, если dat состоит из 4-длинных строк, то второе ядро ​​частично участвует в вычислениях. Для 5-длинных строк и выше работает только одно ядро.   -  person Audrey    schedule 29.09.2014


Ответы (2)


Я полагаю, что разница в том, что в первом случае первый столбец «bigDF» — это фактор с 6760 уровнями, а во втором — 1 983 234 уровня. Наличие огромного количества уровней может вызвать ряд проблем с производительностью. Когда я создал "bigDF" с помощью stringsAsFactors=FALSE, производительность была намного лучше.

bigDF <- data.frame(dat=dat, num=num, stringsAsFactors=FALSE)

Я также использовал функцию «isplitRows» из пакета itertools, чтобы избежать отправки всех «bigDF» каждому из воркеров:

library(itertools)
mergedList <- foreach(splitDF=isplitRows(bigDF, chunkSize=chunk)) %dopar% {
    merge(splitDF, smallDF, by = 'num', all.x = T)
}

На моем 6-ядерном компьютере с Linux под управлением R 3.1.1 ваш второй пример выполнялся примерно за 332 секунды. Когда я использовал stringsAsFactors=FALSE, он работал примерно за 50 секунд. Когда я также использовал isplitRows, время сократилось до 5,5 секунд, или примерно в 60 раз быстрее, чем во втором примере.

person Steve Weston    schedule 26.09.2014
comment
Это может быть частью этого, поскольку выполнение любого случая НЕ параллельно (с использованием %do% вместо %dopar%) почти не заняло времени на моей машине i7. Возможно, то, что занимает все время, — это выделение этих уровней фактора ведомым ядрам. Думаю, мы должны сделать профиль и попробовать еще раз, но преобразовать столбец в символ. - person Carl Witthoft; 27.09.2014
comment
Спасибо, Стив. iSplitRows определенно стоит внимания. Однако меня особенно интересует максимизация вычислительной мощности всех ядер, а не только сокращение системного времени. @CarlWitthoft: то же самое относится к% do%, что я сделал, и это быстрее. Персонажи действительно вычисляются быстрее, чем факторы, но все же развернуто только 1 ядро. - person Audrey; 29.09.2014
comment
Подтверждено — iSplitRows() — удобная функция, но она не влияет на количество ядер, задействованных в вычислениях. - person Audrey; 29.09.2014
comment
Странно: когда я пробую первоначальную настройку, но конвертирую классы bigDF$dat и smallDF$caps в класс character, самое большее я активирую два ядра. Когда у меня будет возможность, я попробую mclapply на них. - person Carl Witthoft; 29.09.2014
comment
@SteveWenston stringsAsCharacters = F, кажется, работает для меня, когда задействованы все ядра!? Полагаю, ограничено только стоимостью строк символов (в отличие от факторов) на ресурсах. - person Audrey; 30.09.2014

Еще не ответ, но: если я запускаю ваш код, но использую %do%, чтобы не распараллеливать, я получаю идентичные (успешные) результаты для двух случаев, за исключением, конечно, имен dat. То же самое, если я использую короткие имена с %dopar% и длинные имена с %do% .

Это начинает выглядеть как незаметная ошибка в одном из вспомогательных пакетов, так что вы можете проверить связь с разработчиками по этому поводу.

Обновление от 29 сентября: я запустил, как мне кажется, ту же настройку, но с использованием ClusterMap:

dffunc <-function(i=i,bigDF=bigDF,smallDF=smallDF,startP=startP,chunk=chunk) {
tmpDF <- bigDF[(startP + i * chunk):(startP - 1 + (i + 1) * chunk), ]
    merge(tmpDF, smallDF, by = 'num', all.x = T)
    }


clusmerge<- clusterMap(cl,  function(i) {dffunc(i=i)}, 0:(len/chunk-1),MoreArgs=list(bigDF=bigDF,smallDF=smallDF,startP=startP,chunk=chunk) )

И в этом случае я запускаю все узлы независимо от длины dat строк имени. Я снова подозреваю, что в %dopar% или где-то еще в пакете foreach есть какая-то ошибка.

В качестве примечания, могу ли я рекомендовать не делать

nodes <- detectCores()
cl <- makeCluster(nodes)

Так как это может повесить всю вашу машину. Лучше cl <- makeCluster(nodes-1) :-)

person Carl Witthoft    schedule 26.09.2014
comment
+1 за разумный makeCluster(nodes-1) :-). clusterMap() дает мне Error in checkForRemoteErrors(val). Просто запустите код с bigDF <- data.frame(dat = dat, num = num, stringsAsFactors = F), и все ядра, казалось, задействовались, как предлагает @SteveWeston, см. комментарий ниже. - person Audrey; 30.09.2014
comment
Я никогда не слышал о том, чтобы makeCluster(detectNodes()) повесил Linux или Mac. Поскольку мастер не выполняет никаких вычислений, может иметь смысл запускать по одному рабочему процессу на ядро, что mclapply сделал по умолчанию в многоядерном пакете. Вы хотите сказать, что он может повесить вызов makeCluster или последующую параллельную операцию? А вы видели зависания на чем-нибудь кроме винды? - person Steve Weston; 30.09.2014
comment
@SteveWeston Я слегка преувеличил: поскольку кластер Rscript использует 99,99% доступного ЦП, почти все остальное приостановлено, ожидая возможности получить несколько циклов. Да, машина возвращается в нормальное состояние, когда кластер готов, но тем временем большинству процессов приходится ждать, и ждать, и ждать... (напоминает «Кафе Рика») - person Carl Witthoft; 30.09.2014
comment
Я понимаю что ты имеешь ввиду. Использование 99,99% ядер в кластере или выделенной рабочей станции считается хорошей вещью, но на личном ноутбуке это раздражает. - person Steve Weston; 30.09.2014