У меня есть входной CSV-файл с 4500 строками. Каждая строка имеет уникальный идентификатор, и для каждой строки я должен прочитать некоторые данные, выполнить некоторые вычисления и записать результат в файл csv, чтобы в моем каталоге вывода было записано 4500 файлов csv. Отдельный выходной CSV-файл содержит одну строку данных с 8 столбцами. Поскольку мне нужно выполнить одно и то же вычисление для каждой строки моего входного CSV-файла, я подумал, что могу распараллелить эту задачу, используя foreach
. Ниже приводится общая структура логики.
library(doSNOW)
library(foreach)
library(data.table)
input_csv <- fread('inputFile.csv'))
# to track the progres of the loop
iterations <- nrow(input_csv)
pb <- txtProgressBar(max = iterations, style = 3)
progress <- function(n) setTxtProgressBar(pb, n)
opts <- list(progress = progress)
myClusters <- makeCluster(6)
registerDoSNOW(myClusters)
results <-
foreach(i = 1:nrow(input_csv),
.packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
.errorhandling = 'remove',
.options.snow = opts) %dopar%
{
rowRef <- input_csv[i, ]
# read data for the unique location in `rowRef`
weather.path <- arrow(paste0(rowRef$locationID'_weather.parquet')))
# do some calculations
# save the results as csv
fwrite(temp_result, file.path(paste0('output_iter_',i,'.csv')))
return(temp_result)
}
Приведенный выше код работает нормально, но всегда застревает / неактивен / ничего не делает после завершения 25% или 30% строк в input_csv
. Я все время смотрю в свой выходной каталог, что после N% итераций файл не записывается. Я подозреваю, что цикл foreach переходит в какой-то спящий режим? Что меня больше сбивает с толку, так это то, что если я завершаю задание, повторно запускаю приведенный выше код, он показывает 16% или 30%, а затем снова становится неактивным, т.е. с каждым новым запуском он спит на разных уровнях выполнения.
Я не могу понять, как привести минимально воспроизводимый пример в этом случае, но подумал, что если кто-нибудь знает какой-либо контрольный список, который я должен пройти, или потенциальные проблемы, которые вызывают это, были бы действительно полезны. Спасибо
ИЗМЕНИТЬ. Я все еще не могу решить эту проблему. Если я могу предоставить дополнительную информацию, пожалуйста, дайте мне знать.
РЕДАКТИРОВАТЬ2
Мой исходный inputFile
содержит 213164 строки. Поэтому я разделил свой большой файл на 46 файлов меньшего размера, чтобы в каждом файле было 4634 строки.
library(foreach)
library(data.table)
library(doParallel)
myLs <- split(mydat, (as.numeric(rownames(mydat))-1) %/% 46))
Тогда я сделал это:
for(pr in 1:46){
input_csv <- myLs[[pr]]
myClusters <- parallel::makeCluster(6)
doParallel::registerDoParallel(myClusters)
results <-
foreach(i = 1:nrow(input_csv),
.packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
.errorhandling = 'remove',
.verbose = TRUE) %dopar%
{
rowRef <- input_csv[i, ]
# read data for the unique location in `rowRef`
weather.path <- arrow(paste0(rowRef$locationID'_weather.parquet')))
# do some calculations
# save the results as csv
fwrite(temp_result, file.path(paste0('output_iter_',i,'_',pr,'.csv')))
gc()
}
parallel::stopCluster(myClusters)
gc()
}
Это тоже работает до тех пор, пока, скажем, итерация pr = 7 или pr = 8, а затем не будет продолжена, а также не генерирует никаких сообщений об ошибках. Я так растерялся.
РЕДАКТИРОВАТЬ так выглядит загрузка моего ЦП. Я использовал только 4 ядра для создания этого изображения. Сможет ли кто-нибудь объяснить, есть ли на этом изображении что-нибудь, что могло бы ответить на мой вопрос.
temp_result
. Это проблема с памятью? - person F. Privé   schedule 28.07.2020fwrite()
, но похоже, что они удалили вопрос. Если я правильно помню, это было быстрее, например, для 50 файлов, но медленнее, например, для 500 файлов. Я не могу вспомнить величину разницы. Все это, чтобы сказать, возможно стоит попробовать поменятьfwrite()
наreadr::write_csv()
. Еще одна возможность заключается в том, что вы можете попытаться записать файлы на другом этапе, учитывая, что вы сохранили их все вresults
- person Andrew   schedule 06.08.2020readr
, но это тоже не сработало. Я также немного отредактировал свой вопрос, чтобы дать дополнительную информацию - person 89_Simple   schedule 06.08.2020gc()
попробуйтеrm(temp_result)
. Вы отслеживали использование памяти для отредактированного кода? В R этого делать не нужно, вы также можете использовать системные инструменты. У меня также было раньше, что сборщик мусора действительно не работал для параллельной обработки, поэтому это может быть немного сложно - person starja   schedule 06.08.2020sessionInfo()
и переменные среды для родительской среды и параллельной задачи? Я чувствую, что вы хотите попробовать запустить свой код вне RStudio. Почему? Если код запускается внутри RStudio, ваши системные вызовы будут проходить через RStudio и стать частью его памяти. Во-вторых, помните, что вы загружаете пакеты в каждой параллельной задаче. Является ли ваш пакетmyCustomPkg
параллельным безопасным? - person Technophobe01   schedule 09.08.2020# do some calculations
и# save the results as csv
. - person Technophobe01   schedule 09.08.2020