doParallel (пакет) foreach не работает для больших итераций в R

Я запускаю следующий код (извлеченный из виньетки doParallel) на ПК (ОС Linux) с 4 и 8 физическими и логическими ядрами соответственно.

Запустив код с iter=1e+6 или меньше, все в порядке, и я вижу по загрузке ЦП, что все ядра используются для этого вычисления. Однако при большем количестве итераций (например, iter=4e+6) кажется, что в этом случае параллельные вычисления не работают. Когда я также отслеживаю использование ЦП, в вычислениях участвует только одно ядро ​​(100% загрузка).

Пример 1

require("doParallel")
require("foreach")
registerDoParallel(cores=8)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
iter=4e+6
ptime <- system.time({
    r <- foreach(i=1:iter, .combine=rbind) %dopar% {
        ind <- sample(100, 100, replace=TRUE)
        result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
        coefficients(result1)
    }
})[3]

Вы хоть представляете, в чем может быть причина? Может быть причина в памяти?

Я погуглил и нашел ЭТО имеет отношение к моему вопросу, но дело в том, что мне не сообщают о какой-либо ошибке, и ОП, похоже, нашел решение, предоставив необходимые пакеты внутри цикла foreach. Но, как видно, внутри моего цикла не используется никакой пакет.

ОБНОВЛЕНИЕ1

Моя проблема до сих пор не решена. Согласно моим экспериментам, я не думаю, что память может быть причиной. У меня есть 8 ГБ памяти в системе, в которой я запускаю следующую простую параллельную (по всем 8 логическим ядрам) итерацию:

Пример 2

require("doParallel")
require("foreach")

registerDoParallel(cores=8)
iter=4e+6
ptime <- system.time({
    r <- foreach(i=1:iter, .combine=rbind) %dopar% {
        i
    }
})[3]

У меня нет проблем с запуском этого кода, но когда я отслеживаю использование ЦП, только одно ядро ​​(из 8) составляет 100%.

ОБНОВЛЕНИЕ 2

Что касается Example2, @SteveWeston (спасибо за указание на это) заявил, что (в комментариях): «Пример в вашем обновлении страдает от крошечных задач. Только у мастера есть реальная работа, который состоит из отправки задач и обработки результатов. Это принципиально отличается от проблемы с исходным примером, который использовал несколько ядер при меньшем количестве итераций».

Однако Пример 1 по-прежнему остается нерешенным. Когда я его запускаю и слежу за процессами с htop, вот что происходит подробнее:

Назовем все 8 созданных процессов от p1 до p8. Статус (столбец S в htop) для p1 равен R, что означает, что он работает и остается неизменным. Однако для p2 до p8 через несколько минут статус меняется на D (т.е. непрерывный сон) и через несколько минут снова меняется на Z (т.е. завершен, но не получен его родителем). У вас есть идеи, почему это происходит?


person 989    schedule 10.06.2016    source источник
comment
Вы пытались явно создать и зарегистрировать кластер? Например cl <- makePSOCKcluster(8); registerDoParallel(cl)?   -  person Hack-R    schedule 10.06.2016
comment
Даже с iter=15e+6? Не могли бы вы прокомментировать характеристики вашего оборудования (процессор и память) и тип ОС, на которой вы запускаете код?   -  person 989    schedule 10.06.2016
comment
Да, я просто скопировал и вставил ваш код, и он работал, пока я его не убил. Я видел 8 процессов Rscript.exe в своем мониторе ресурсов, хотя они использовали лишь небольшую часть ядра ЦП, на котором работали (но это не обязательно проблема — он может колебаться из-за неэффективного разделения). Я запускал его на Windows Server 2008 с 24 физическими процессорами.   -  person Hack-R    schedule 10.06.2016
comment
Да, я также пытался создать и зарегистрировать кластеры, как вы указали, но безрезультатно. registerDoParallel() также подходит, как написано в виньетке, предоставленной авторами пакета. У меня такое ощущение, что это как-то связано с памятью.   -  person 989    schedule 10.06.2016
comment
Да, вероятно, это связано с памятью или вычислительными ресурсами. Так вам действительно нужно столько итераций? Если да, то можете ли вы добиться того же результата, разбив его на этапы с меньшим количеством итераций, а затем объединив (возможно, усреднив) результаты? Я не уверен, каков ваш вариант использования, но для машинного обучения вы часто можете сохранять прогресс модели в памяти хранения (т. е. контрольной точке), чтобы вы могли разбить обучение больших нейронных сетей и других моделей на несколько сеансов.   -  person Hack-R    schedule 10.06.2016
comment
Да, мне нужно. не так много, как 15e+6, но около 3.8e+6. На самом деле я пытался проверить на игрушечном примере (как показано выше), все ли идет хорошо, а затем провел свои эксперименты. В моих экспериментах мне нужно повторить foreach примерно 100 раз. Поэтому я думаю, что мне нужно разбить 100 повторений, а не foreach раз.   -  person 989    schedule 10.06.2016
comment
Я заставляю его запускать все 8 (после 10-15 секунд сортировки) в Windows с I7 4910, выполняющим приведенный выше код дословно.   -  person Bryan Goggin    schedule 10.06.2016
comment
Вы подтверждаете, что ваши работники зарегистрированы с getDoParRegistered() и getDoParWorkers()?   -  person Bryan Goggin    schedule 10.06.2016
comment
Да. За getDoParRegistered() я получаю TRUE и 8 за getDoParWorkers().   -  person 989    schedule 10.06.2016
comment
Пример в вашем обновлении страдает от крошечных задач. Только у мастера есть какая-то реальная работа, которая состоит в отправке задач и обработке результатов. Это принципиально отличается от проблемы с исходным примером, в котором использовалось несколько ядер при меньшем количестве итераций.   -  person Steve Weston    schedule 14.06.2016
comment
@SteveWeston Тогда, если это так, учитывая, что у меня есть 8 ГБ доступной памяти и с учетом размера объектов, созданных внутри и вне цикла, почему исходный пример не должен занимать все процессоры на 100%, например, для iter=3e+6?   -  person 989    schedule 14.06.2016
comment
@SteveWeston Вы можете снова проверить мое обновление.   -  person 989    schedule 14.06.2016
comment
Я начинаю думать, что для клиентских процессов есть тайм-аут. Это может быть в клиенте, или еще в чем-то сетевой безопасности, которая может через определенное время отключать нетипичные порты.   -  person EngrStudent    schedule 23.01.2019


Ответы (2)


Я думаю, у тебя мало памяти. Вот модифицированная версия этого примера, которая должна работать лучше, когда у вас много задач. Он использует doSNOW, а не doParallel, потому что doSNOW позволяет вам обрабатывать результаты с помощью функции комбинирования по мере того, как они возвращаются рабочими процессами. В этом примере эти результаты записываются в файл, чтобы использовать меньше памяти, однако он считывает результаты обратно в память в конце с помощью функции «.final», но вы можете пропустить это, если у вас недостаточно памяти.

library(doSNOW)
library(tcltk)
nw <- 4  # number of workers
cl <- makeSOCKcluster(nw)
registerDoSNOW(cl)

x <- iris[which(iris[,5] != 'setosa'), c(1,5)]
niter <- 15e+6
chunksize <- 4000  # may require tuning for your machine
maxcomb <- nw + 1  # this count includes fobj argument
totaltasks <- ceiling(niter / chunksize)

comb <- function(fobj, ...) {
  for(r in list(...))
    writeBin(r, fobj)
  fobj
}

final <- function(fobj) {
  close(fobj)
  t(matrix(readBin('temp.bin', what='double', n=niter*2), nrow=2))
}

mkprogress <- function(total) {
  pb <- tkProgressBar(max=total,
                      label=sprintf('total tasks: %d', total))
  function(n, tag) {
    setTkProgressBar(pb, n,
      label=sprintf('last completed task: %d of %d', tag, total))
  }
}
opts <- list(progress=mkprogress(totaltasks))
resultFile <- file('temp.bin', open='wb')

r <-
  foreach(n=idiv(niter, chunkSize=chunksize), .combine='comb',
          .maxcombine=maxcomb, .init=resultFile, .final=final,
          .inorder=FALSE, .options.snow=opts) %dopar% {
    do.call('c', lapply(seq_len(n), function(i) {
      ind <- sample(100, 100, replace=TRUE)
      result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
      coefficients(result1)
    }))
  }

Я включил индикатор выполнения, так как выполнение этого примера занимает несколько часов.

Обратите внимание, что в этом примере также используется функция idiv из пакета iterators для увеличения объема работы в каждой из задач. Этот метод называется разбиением на фрагменты и часто повышает производительность параллельных вычислений. Однако использование idiv искажает индексы задач, поскольку переменная i теперь является индексом для каждой задачи, а не глобальным индексом. Для глобального индекса вы можете написать собственный итератор, который обертывает idiv:

idivix <- function(n, chunkSize) {
  i <- 1
  it <- idiv(n, chunkSize=chunkSize)
  nextEl <- function() {
    m <- nextElem(it)  # may throw 'StopIterator'
    value <- list(i=i, m=m)
    i <<- i + m
    value
  }
  obj <- list(nextElem=nextEl)
  class(obj) <- c('abstractiter', 'iter')
  obj
}

Значения, выдаваемые этим итератором, представляют собой списки, каждый из которых содержит начальный индекс и счетчик. Вот простой цикл foreach, в котором используется этот пользовательский итератор:

r <- 
  foreach(a=idivix(10, chunkSize=3), .combine='c') %dopar% {
    do.call('c', lapply(seq(a$i, length.out=a$m), function(i) {
      i
    }))
  }

Конечно, если задачи требуют достаточно интенсивных вычислений, вам может не понадобиться разбиение на фрагменты, и вы можете использовать простой цикл foreach, как в исходном примере.

person Steve Weston    schedule 12.06.2016
comment
Спасибо за ваше решение, но когда я запускаю ваш код, я получаю: Warning: progress function failed: object 'pb' not found - person 989; 12.06.2016
comment
@ m0h3n Вы всегда можете отключить индикатор выполнения, не используя параметр foreach .options.snow. Определены ли pb и progress в глобальной среде? - person Steve Weston; 12.06.2016
comment
Да, я просто копирую/вставляю ваш код и запускаю его. Но посмотрите на функцию comb и чьи параметры и цикл for внутри. Это нормально? - person 989; 12.06.2016
comment
@ m0h3n Функция comb использует первый аргумент иначе, чем последующие аргументы, что требует использования аргумента foreach .init. Я включил примеры этого шаблона в качестве примеров в некоторые пакеты, связанные с foreach. - person Steve Weston; 12.06.2016
comment
Мне интересно, запускаете ли вы свой код перед публикацией здесь. Я получаю это: Error: object 'opts' not found. Пожалуйста, попробуйте очистить свою среду и запустить код, который вы разместили здесь. - person 989; 13.06.2016
comment
@ m0h3n Он работает на моем ноутбуке Mac и рабочем столе Linux, и у меня есть друг, чтобы проверить его, чтобы убедиться, что он работает не только у меня. Поскольку «opts» определяется после «mkprogress» и перед «resultFile», сообщение об ошибке, о котором вы сообщаете, не имеет для меня смысла. - person Steve Weston; 14.06.2016
comment
В ПОРЯДКЕ. мой плохой, извините. Поскольку я запускал ваш код через putty, я, видимо, из-за этого получал эту ошибку. Я попытался запустить его на другом компьютере (локальном), и теперь все в порядке. Как мы можем получить эти индексы (т.е. 1,2,3,4,...,niter), хранящиеся в r, в качестве конечного результата? Потому что в моих экспериментах мне нужно работать с этими индексами. Вопрос, изложенный здесь, является всего лишь игрушечным примером. - person 989; 14.06.2016
comment
Спасибо за ваше обновление. Вы также можете проверить мое обновление (исходный вопрос). - person 989; 14.06.2016

Сначала я подумал, что у вас проблемы с памятью, потому что при отправке многих задач действительно используется больше памяти, и это может в конечном итоге привести к зависанию главного процесса, поэтому мой первоначальный ответ показывает несколько методов использования меньшего объема памяти. Однако теперь это звучит так, как будто есть фаза запуска и остановки, когда занят только главный процесс, но рабочие заняты в течение некоторого периода времени в середине. Я думаю, проблема в том, что задачи в этом примере на самом деле не очень требовательны к вычислительным ресурсам, поэтому, когда у вас много задач, вы начинаете действительно замечать время запуска и завершения работы. Я рассчитал время реальных вычислений и обнаружил, что каждая задача занимает всего около 3 миллисекунд. В прошлом вы не получили бы никакой выгоды от параллельных вычислений с такими маленькими задачами, но теперь, в зависимости от вашей машины, вы можете получить некоторую выгоду, но накладные расходы значительны, поэтому, когда у вас очень много задач, вы действительно заметите, что накладные расходы.

Я все еще думаю, что мой другой ответ хорошо подходит для этой проблемы, но, поскольку у вас достаточно памяти, это излишне. Наиболее важная техника использования фрагментации. Вот пример, который использует фрагментацию с минимальными изменениями по сравнению с исходным примером:

require("doParallel")
nw <- 8
registerDoParallel(nw)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
niter <- 4e+6
r <- foreach(n=idiv(niter, chunks=nw), .combine='rbind') %dopar% {
  do.call('rbind', lapply(seq_len(n), function(i) {
    ind <- sample(100, 100, replace=TRUE)
    result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
    coefficients(result1)
  }))
}

Обратите внимание, что это немного отличается от моего другого ответа. Он использует только одну задачу для каждого работника с использованием параметра idiv chunks, а не параметра chunkSize. Это уменьшает объем работы, выполняемой мастером, и является хорошей стратегией, если у вас достаточно памяти.

person Steve Weston    schedule 15.06.2016
comment
Спасибо, проголосовал. Я скоро закончу этот вопрос. - person 989; 16.06.2016
comment
Если параллельный подход в Example1 не работает, потому что задачи не требуют интенсивных вычислений, то почему он работает (я имею в виду параллельный) для небольших итераций, а не для больших? - person 989; 16.06.2016