цикл foreach становится неактивным для больших итераций в R

У меня есть входной CSV-файл с 4500 строками. Каждая строка имеет уникальный идентификатор, и для каждой строки я должен прочитать некоторые данные, выполнить некоторые вычисления и записать результат в файл csv, чтобы в моем каталоге вывода было записано 4500 файлов csv. Отдельный выходной CSV-файл содержит одну строку данных с 8 столбцами. Поскольку мне нужно выполнить одно и то же вычисление для каждой строки моего входного CSV-файла, я подумал, что могу распараллелить эту задачу, используя foreach. Ниже приводится общая структура логики.

 library(doSNOW)
 library(foreach)
 library(data.table)
  
 input_csv <- fread('inputFile.csv')) 

 # to track the progres of the loop
 iterations <- nrow(input_csv)
 pb <- txtProgressBar(max = iterations, style = 3)
 progress <- function(n) setTxtProgressBar(pb, n)
 opts <- list(progress = progress)

 myClusters <- makeCluster(6)
 registerDoSNOW(myClusters)

 results <- 

     foreach(i = 1:nrow(input_csv), 
     .packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
     .errorhandling = 'remove',
     .options.snow = opts) %dopar% 
      
  {
        
       rowRef <- input_csv[i, ]
        
       # read data for the unique location in `rowRef`  
         weather.path <-  arrow(paste0(rowRef$locationID'_weather.parquet')))

       # do some calculations
        
       # save the results as csv
        fwrite(temp_result, file.path(paste0('output_iter_',i,'.csv')))
        
       return(temp_result)
 }
  

Приведенный выше код работает нормально, но всегда застревает / неактивен / ничего не делает после завершения 25% или 30% строк в input_csv. Я все время смотрю в свой выходной каталог, что после N% итераций файл не записывается. Я подозреваю, что цикл foreach переходит в какой-то спящий режим? Что меня больше сбивает с толку, так это то, что если я завершаю задание, повторно запускаю приведенный выше код, он показывает 16% или 30%, а затем снова становится неактивным, т.е. с каждым новым запуском он спит на разных уровнях выполнения.

Я не могу понять, как привести минимально воспроизводимый пример в этом случае, но подумал, что если кто-нибудь знает какой-либо контрольный список, который я должен пройти, или потенциальные проблемы, которые вызывают это, были бы действительно полезны. Спасибо

ИЗМЕНИТЬ. Я все еще не могу решить эту проблему. Если я могу предоставить дополнительную информацию, пожалуйста, дайте мне знать.

РЕДАКТИРОВАТЬ2
Мой исходный inputFile содержит 213164 строки. Поэтому я разделил свой большой файл на 46 файлов меньшего размера, чтобы в каждом файле было 4634 строки.

 library(foreach)
 library(data.table)
 library(doParallel)

myLs <- split(mydat, (as.numeric(rownames(mydat))-1) %/% 46))
 

Тогда я сделал это:

for(pr in 1:46){

    input_csv <- myLs[[pr]]

    myClusters <- parallel::makeCluster(6)
    doParallel::registerDoParallel(myClusters)


 results <- 

  foreach(i = 1:nrow(input_csv), 
 .packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
 .errorhandling = 'remove',
 .verbose = TRUE) %dopar% 

 {

   rowRef <- input_csv[i, ]

   # read data for the unique location in `rowRef`  
     weather.path <-  arrow(paste0(rowRef$locationID'_weather.parquet')))

   # do some calculations

   # save the results as csv
    fwrite(temp_result, file.path(paste0('output_iter_',i,'_',pr,'.csv')))
    gc()
 }

 parallel::stopCluster(myClusters)
 gc()
 }

Это тоже работает до тех пор, пока, скажем, итерация pr = 7 или pr = 8, а затем не будет продолжена, а также не генерирует никаких сообщений об ошибках. Я так растерялся.

РЕДАКТИРОВАТЬ так выглядит загрузка моего ЦП. Я использовал только 4 ядра для создания этого изображения. Сможет ли кто-нибудь объяснить, есть ли на этом изображении что-нибудь, что могло бы ответить на мой вопрос.

введите описание изображения здесь


person 89_Simple    schedule 28.07.2020    source источник
comment
Похоже, ты возвращаешься temp_result. Это проблема с памятью?   -  person F. Privé    schedule 28.07.2020
comment
да, я возвращаю temp_result. Есть ли способ проверить, действительно ли это вызвано проблемой с памятью, поскольку ошибки не возникает. Сценарий просто останавливается на 25%, 30% или 10% и не перемещается. Если я убью задание, ошибка все равно не появится.   -  person 89_Simple    schedule 28.07.2020
comment
Вы должны открыть какой-то системный монитор.   -  person F. Privé    schedule 28.07.2020
comment
Пару месяцев назад у кого-то были проблемы с экспортом тонны файлов, и они также использовали fwrite(), но похоже, что они удалили вопрос. Если я правильно помню, это было быстрее, например, для 50 файлов, но медленнее, например, для 500 файлов. Я не могу вспомнить величину разницы. Все это, чтобы сказать, возможно стоит попробовать поменять fwrite() на readr::write_csv(). Еще одна возможность заключается в том, что вы можете попытаться записать файлы на другом этапе, учитывая, что вы сохранили их все в results   -  person Andrew    schedule 06.08.2020
comment
Хорошо. Спасибо за ваш комментарий. Я прочитаю функцию чтения и проверю, помогает ли она   -  person 89_Simple    schedule 06.08.2020
comment
Я пробовал использовать readr, но это тоже не сработало. Я также немного отредактировал свой вопрос, чтобы дать дополнительную информацию   -  person 89_Simple    schedule 06.08.2020
comment
В строке перед gc() попробуйте rm(temp_result). Вы отслеживали использование памяти для отредактированного кода? В R этого делать не нужно, вы также можете использовать системные инструменты. У меня также было раньше, что сборщик мусора действительно не работал для параллельной обработки, поэтому это может быть немного сложно   -  person starja    schedule 06.08.2020
comment
Я добавил образ системных инструментов, чтобы показать использование памяти   -  person 89_Simple    schedule 07.08.2020
comment
@ 89_Simple Можете ли вы поделиться информацией о настройке машины? sessionInfo() и переменные среды для родительской среды и параллельной задачи? Я чувствую, что вы хотите попробовать запустить свой код вне RStudio. Почему? Если код запускается внутри RStudio, ваши системные вызовы будут проходить через RStudio и стать частью его памяти. Во-вторых, помните, что вы загружаете пакеты в каждой параллельной задаче. Является ли ваш пакет myCustomPkg параллельным безопасным?   -  person Technophobe01    schedule 09.08.2020
comment
@ 89_Simple В дополнение к вышесказанному, я считаю, что вы не можете экспортировать переменную в каждый параллельный экземпляр. Использует ли какой-либо ваш код глобальную или локальную переменную среды, которая не передается в каждую параллельную задачу? Надеюсь, что это поможет.   -  person Technophobe01    schedule 09.08.2020
comment
@ 89_Simple И последнее наблюдение, на мой взгляд, вам может понадобиться посмотреть, как lock () ваши записи в параллельных задачах. Сложно сказать без воспроизводимого примера, но это похоже на то, что может вызвать проблемы, особенно в свете # do some calculations и # save the results as csv.   -  person Technophobe01    schedule 09.08.2020
comment
Вы должны отслеживать эволюцию вашей оперативной памяти. У меня была очень похожая проблема с рабочими, которые засыпали по мере выполнения задачи. Я узнал, что они действительно были убиты, когда память достигла 100%, пока не осталось ни одной.   -  person David Gauthier    schedule 12.06.2021


Ответы (3)


Вы можете использовать пакет progressr, чтобы следовать -up использование памяти в интерактивном режиме.
Например, с пакетом furrr:

library(furrr)
library(pryr)
plan(multisession,workers=6)

library(progressr)
handlers("progress")

#input_csv <- fread('inputFile.csv')) 
#filesID <- as.list(1:nrow(input_csv))
filesID <- as.list(1:12)

with_progress({
  p <- progressor(along = filesID)
  result <- future_map(filesID, function(fileID) {
    #rowRef <- input_csv[fileID, ]
    
    # read data for the unique location in `rowRef`  
    #weather.path <-  arrow(paste0(rowRef$locationID'_weather.parquet')))
  
  # do some calculations : simulate memory increase
  temp_result <- rnorm(2e7)
  # save the results as csv
  #fwrite(temp_result, file.path(paste0('output_iter_',fileID,'.csv')))
  
  Sys.sleep(2)
  p(sprintf("memory used=%g", pryr::mem_used()))
  
  
  return(object.size(temp_result))
  },.options=future_options(packages=c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr")))
})

[====================================================>-------]  90% memory used=6.75075e+08

Тот же метод применяется к foreach.

Другое предложение - не возвращать результаты в основной процесс, поскольку вы уже сохраняете их в файле. Вместо return(temp_result) вы можете вывести сводку, например object.size, зная, что полные результаты можно найти в связанном файле.

person Waldi    schedule 03.08.2020
comment
быстрый вопрос: какова цель Sys.sleep(3) в вашем коде? - person 89_Simple; 04.08.2020
comment
Просто чтобы было достаточно времени, чтобы увидеть индикатор выполнения, поскольку мой код не обрабатывает данные - person Waldi; 04.08.2020
comment
@ 89_Simple, помогло ли это получить больше информации о причине неожиданного зависания? furrr в порядке или вы предпочитаете использовать foreach / doSNOW? - person Waldi; 05.08.2020
comment
Сейчас я это тестирую. Приносим извинения за задержку. Я скоро вернусь к вам с дополнительной информацией - person 89_Simple; 06.08.2020
comment
Я предпочитаю doSNOW, поскольку это то, с чем я знаком. Я пытаюсь запустить ваш пример с прошлой ночи для моих фактических данных, которые содержат около 200000 строк, а также я записываю отдельные temp_result, а не возвращаюсь в виде списка. Кажется, теперь на это уходит много времени. - person 89_Simple; 06.08.2020
comment
Также при запуске вашего решения, поскольку теперь я не возвращаю никакого temp_result, а скорее записываю его, я вижу, что память потребляется только на 0–1%, но со временем она выросла до 32%. Я не мог понять это поведение - person 89_Simple; 06.08.2020
comment
Увеличение памяти - интересная информация, и 32% по-прежнему обнадеживают: на этот раз можно было бы добиться успеха, но никогда бы не удалось без обобщения результатов. Вы проверяли, работает ли ожидаемое количество ядер? Как далеко находится индикатор выполнения? Перед запуском операции 200000 строк я бы сделал некоторую статистику для меньшего подмножества (возможно, 1000 строк), чтобы убедиться, что все рабочие работают и работают, и получить среднее время расчета для одной строки, чтобы оценить время полного расчета. - person Waldi; 06.08.2020
comment
Вы также можете попробовать запустить gc() в конце каждого вычисления, чтобы проверить, помогает ли сборка мусора уменьшить увеличение памяти. - person Waldi; 06.08.2020

Из вашего кода не совсем возможно понять, почему он должен остановиться. Может быть, некоторые части вашего foreach цикла не являются потокобезопасными (например, data.table использует многопоточность для подмножества)?

В настоящее время очень мало что нужно изменить, чтобы помочь, и ответ @Walldi, вероятно, хорош для диагностики реальной проблемы. Единственное, что кажется очевидным для изменения здесь, - это избегать повторения отдельных строк вашего data.frame за счет использования внутренних функций foreach.

foreach выполняет параллельное программирование путем создания итератора над объектом. При параллельном программировании между каждой итерацией возникают некоторые накладные расходы, поскольку потоку / ядру потребуется запрашивать новую информацию. По сути, это выгодно минимизировать эти накладные расходы за счет минимизации количества итераций. Мы можем сделать это, разделив наш набор данных на части или вручную создав итератор с помощью пакета iterators.
У меня нет доступа к вашим данным, поэтому ниже приведен воспроизводимый пример с использованием набора данных mtcars. Я разделил его на блоки setup и foreach для облегчения чтения. Обратите внимание, что files в моем примере является простым вектором, поэтому требуется некоторое минимальное изменение для фактического кода, показанного в вопросе, поскольку files внутри цикла foreach теперь становится data.frame, а не вектором.

Настраивать

library(iterators)
library(foreach)
library(data.table)
library(arrow)
library(doParallel)
# Set up reproducible example:
data(mtcars)
files <- replicate(100, tempfile())
lapply(files, function(x)write_parquet(mtcars, x))

# Split the files into chunks for the iterator
nc <- parallel::detectCores()
sfiles <- split(files, seq_len(length(files)) %% nc + 1)
# Set up backend
th <- parallel::makeCluster(nc)
registerDoParallel(th)

Для каждого

foreach(files = sfiles, #Note the iterator will name each chunk 'files' within the loop. 
        .packages = c('data.table', 'arrow', 'dplyr'), 
        .combine = c, # Because I return the resulting file names
        .multicombine = TRUE) %dopar% {
  # Iterate over each chunk within foreach
  # Reduces loop overhead
  outF <- character(length(files))
  for(i in seq_along(files)){
    tib <- arrow::read_parquet(files[i])
    # Do some stuff
    tib <- tib %>% select(mpg, hp)
    # Save output
    outF[i] <- tempfile(fileext = '.csv')
    fwrite(tib, outF[i])
  }
  # Return list of output files
  return(outF)
}

Я не верю, что это решит проблему, но это может немного снизить ваши накладные расходы.

person Oliver    schedule 09.08.2020

Вам нужно отвлечься от каждого файлового цикла, поскольку проблема не в этом. Проблема связана с обработкой содержимого в файле. Проблема в том, что когда вы пытаетесь создать файл для каждой строки, вы не фиксируете запись после каждой строки, и поэтому весь процесс для одного файла и строки за строкой накапливается в памяти. Вам нужно очистить память при записи файла и закрыть соединение.

Если возможно, попробуйте использовать приложение, как показано в приведенном ниже примере.

Для каждой строки в кадре данных R

Попробуйте закрыть соединение с файлом, как написано в Справочнике ниже:

https://stat.ethz.ch/R-manual/R-devel/library/base/html/connections.html.

person A Modgil    schedule 10.08.2020