Я новичок в Haskell и подумал, что это будет хорошим упражнением. У меня есть задание, в котором мне нужно прочитать файл в потоке A, обработать строки файла в потоках B_i, а затем вывести результаты в потоке C.
Я уже реализовал это, но одно из требований заключается в том, что мы не можем быть уверены, что весь файл помещается в память. Я надеялся, что ленивый ввод-вывод и сборщик мусора сделают это за меня, но, увы, использование памяти продолжает расти и расти.
Поток чтения (A) читает файл с readFile
, который затем архивируется с номерами строк и запаковывается в Just. Эти заархивированные строки затем записываются в Control.Concurrent.Chan
. Каждый потребительский поток B имеет свой собственный канал.
Каждый потребитель считывает свой собственный канал, когда у него есть данные, и если регулярное выражение совпадает, оно выводится в их собственный соответствующий выходной канал, заключенный в Maybe (состоящий из списков).
Принтер проверяет выходной канал каждого из потоков B. Если ни один из результатов (строка) не равен Nothing, строка печатается. Так как в этот момент не должно быть ссылок на более старые строки, я думал, что сборщик мусора сможет освободить эти строки, но, увы, здесь я, похоже, не прав.
Файл .lhs находится здесь: http://gitorious.org/hajautettujen-sovellusten-muodostamistekniikat/hajautettujen-sovellusten-muodostamistekniikat/blobs/master/mgrep.lhs
Итак, вопрос в том, как ограничить использование памяти или разрешить сборщику мусора удалять строки.
Фрагменты по запросу. Надеюсь, отступы не слишком сильно разрушены :)
data Global = Global {done :: MVar Bool, consumers :: Consumers}
type Done = Bool
type Linenum = Int
type Line = (Linenum, Maybe String)
type Output = MVar [Line]
type Input = Chan Line
type Consumers = MVar (M.Map ThreadId (Done, (Input, Output)))
type State a = ReaderT Global IO a
producer :: [Input] -> FilePath -> State ()
producer c p = do
liftIO $ Main.log "Starting producer"
d <- asks done
f <- liftIO $ readFile p
mapM_ (\l -> mapM_
(liftIO . flip writeChan l) c)
$ zip [1..] $ map Just $ lines f
liftIO $ modifyMVar_ d (return . not)
printer :: State ()
printer = do
liftIO $ Main.log "Starting printer"
c <- (fmap (map (snd . snd) . M.elems)
(asks consumers >>= liftIO . readMVar))
uniq' c
where head' :: Output -> IO Line
head' ch = fmap head (readMVar ch)
tail' = mapM_ (liftIO . flip modifyMVar_
(return . tail))
cont ch = tail' ch >> uniq' ch
printMsg ch = readMVar (head ch) >>=
liftIO . putStrLn . fromJust . snd . head
cempty :: [Output] -> IO Bool
cempty ch = fmap (any id)
(mapM (fmap ((==) 0 . length) . readMVar ) ch)
{- Return false unless none are Nothing -}
uniq :: [Output] -> IO Bool
uniq ch = fmap (any id . map (isNothing . snd))
(mapM (liftIO . head') ch)
uniq' :: [Output] -> State ()
uniq' ch = do
d <- consumersDone
e <- liftIO $ cempty ch
if not e
then do
u <- liftIO $ uniq ch
if u then cont ch else do
liftIO $ printMsg ch
cont ch
else unless d $ uniq' ch