Ограничение использования памяти при чтении файлов

Я новичок в Haskell и подумал, что это будет хорошим упражнением. У меня есть задание, в котором мне нужно прочитать файл в потоке A, обработать строки файла в потоках B_i, а затем вывести результаты в потоке C.

Я уже реализовал это, но одно из требований заключается в том, что мы не можем быть уверены, что весь файл помещается в память. Я надеялся, что ленивый ввод-вывод и сборщик мусора сделают это за меня, но, увы, использование памяти продолжает расти и расти.

Поток чтения (A) читает файл с readFile, который затем архивируется с номерами строк и запаковывается в Just. Эти заархивированные строки затем записываются в Control.Concurrent.Chan. Каждый потребительский поток B имеет свой собственный канал.

Каждый потребитель считывает свой собственный канал, когда у него есть данные, и если регулярное выражение совпадает, оно выводится в их собственный соответствующий выходной канал, заключенный в Maybe (состоящий из списков).

Принтер проверяет выходной канал каждого из потоков B. Если ни один из результатов (строка) не равен Nothing, строка печатается. Так как в этот момент не должно быть ссылок на более старые строки, я думал, что сборщик мусора сможет освободить эти строки, но, увы, здесь я, похоже, не прав.

Файл .lhs находится здесь: http://gitorious.org/hajautettujen-sovellusten-muodostamistekniikat/hajautettujen-sovellusten-muodostamistekniikat/blobs/master/mgrep.lhs

Итак, вопрос в том, как ограничить использование памяти или разрешить сборщику мусора удалять строки.

Фрагменты по запросу. Надеюсь, отступы не слишком сильно разрушены :)

data Global = Global {done :: MVar Bool, consumers :: Consumers}
type Done = Bool
type Linenum = Int
type Line = (Linenum, Maybe String)
type Output = MVar [Line]
type Input = Chan Line
type Consumers = MVar (M.Map ThreadId (Done, (Input, Output)))
type State a = ReaderT Global IO a


producer :: [Input] -> FilePath -> State ()
producer c p = do
  liftIO $ Main.log "Starting producer"
  d <- asks done
  f <- liftIO $ readFile p
  mapM_ (\l -> mapM_
    (liftIO . flip writeChan l) c)
    $ zip [1..] $ map Just $ lines f
  liftIO $ modifyMVar_ d (return . not)

printer :: State ()
printer = do
  liftIO $ Main.log "Starting printer"
  c <- (fmap (map (snd . snd) . M.elems)
    (asks consumers >>= liftIO . readMVar))
  uniq' c
  where head' :: Output -> IO Line
    head' ch = fmap head (readMVar ch)

    tail' = mapM_ (liftIO . flip modifyMVar_
        (return . tail))

    cont ch = tail' ch >> uniq' ch

    printMsg ch = readMVar (head ch) >>=
        liftIO . putStrLn . fromJust . snd . head

    cempty :: [Output] -> IO Bool
    cempty ch = fmap (any id)
        (mapM (fmap ((==) 0 . length) . readMVar ) ch)

    {- Return false unless none are Nothing -}
    uniq :: [Output] -> IO Bool
    uniq ch = fmap (any id . map (isNothing . snd))
        (mapM (liftIO . head') ch)

    uniq' :: [Output] -> State ()
    uniq' ch = do
      d <- consumersDone
      e <- liftIO $ cempty ch
      if not e
        then  do
          u <- liftIO $ uniq ch
          if u then cont ch else do
        liftIO $ printMsg ch
        cont ch
          else unless d $ uniq' ch

person Masse    schedule 19.09.2010    source источник


Ответы (1)


Параллельное программирование не предлагает определенного порядка выполнения, если вы сами не применяете его с помощью mvars и тому подобного. Так что вполне вероятно, что поток-производитель вставляет все/большинство строк в чан до того, как какой-либо потребитель прочитает их и передаст. Другая архитектура, которая должна соответствовать требованиям, состоит в том, что поток A вызывает ленивый файл чтения и вставляет результат в mvar. Затем каждый поток-потребитель берет mvar, читает строку, затем заменяет mvar, прежде чем приступить к обработке строки. Даже в этом случае, если выходной поток не справляется, количество совпадающих строк, хранящихся в канале, может увеличиваться произвольно.

У вас есть push-архитектура. Чтобы действительно заставить его работать в постоянном пространстве, думайте с точки зрения спроса. Найдите механизм, при котором выходной поток сигнализирует обрабатывающим потокам о том, что они должны что-то сделать, а обрабатывающие потоки сигнализируют потоку чтения о том, что они должны что-то сделать.

Другой способ сделать это состоит в том, чтобы вместо этого иметь каналы ограниченного размера, поэтому поток чтения блокируется, когда потоки процессора не догоняют, и поэтому потоки процессора блокируются, когда поток вывода не догоняет.

В целом проблема по сути напоминает мне бенчмарк широкоформатника Тима Брея, хотя требования несколько другие. В любом случае это привело к широкому обсуждению наилучшего способа реализации многоядерного grep. Основная изюминка заключалась в том, что проблема связана с вводом-выводом, и вам нужно несколько потоков чтения поверх mmapped-файлов.

Смотрите здесь больше, чем вы когда-либо хотели знать: http://www.tbray.org/ongoing/When/200x/2007/09/20/Wide-Finder

person sclv    schedule 19.09.2010
comment
BoundedChan взломан именно для этого типа использования. - person Thomas M. DuBuisson; 19.09.2010
comment
Спасибо, Том и sciv. Я попробую реализовать его и отмечу как ответ, если он работает - person Masse; 20.09.2010