GoLang: распаковать bz2 в горутине, использовать в другой горутине

Я новичок в SWE, изучаю Go (и люблю его).

Я создаю парсер для файлов дампа Википедии - в основном огромный XML-файл, сжатый bzip2 (~ 50 ГБ без сжатия).

Я хочу выполнять как потоковую декомпрессию, так и синтаксический анализ, что звучит достаточно просто. Для декомпрессии я делаю:

inputFilePath := flag.Arg(0) inputReader := bzip2.NewReader(inputFile)

А затем передайте считыватель парсеру XML:

decoder := xml.NewDecoder(inputFile)

Однако, поскольку распаковка и синтаксический анализ являются дорогостоящими операциями, я хотел бы, чтобы они выполнялись в отдельных подпрограммах Go, чтобы использовать дополнительные ядра. Как мне это сделать в Go?

Единственное, о чем я могу думать, это обернуть файл в chan []byte и реализовать интерфейс io.Reader, но я предполагаю, что может быть встроенный (и более чистый) способ сделать это.

Кто-нибудь когда-нибудь делал что-то подобное?

Спасибо! Мануэль


person Manuel Menzella    schedule 25.03.2016    source источник


Ответы (2)


Вы можете использовать io.Pipe, а затем использовать io.Copy, чтобы поместить распакованные данные в конвейер и прочитать их в другой горутине:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "sync"
)

func main() {

    rawJson := []byte(`{
            "Foo": {
                "Bar": "Baz"
            }
        }`)

    bzip2Reader := bytes.NewReader(rawJson) // this stands in for the bzip2.NewReader

    var wg sync.WaitGroup
    wg.Add(2)

    r, w := io.Pipe()

    go func() {
        // write everything into the pipe. Decompression happens in this goroutine.
        io.Copy(w, bzip2Reader)
        w.Close()
        wg.Done()
    }()

    decoder := json.NewDecoder(r)

    go func() {
        for {
            t, err := decoder.Token()
            if err != nil {
                break
            }
            fmt.Println(t)
        }
        wg.Done()
    }()

    wg.Wait()
}

http://play.golang.org/p/fXLnfnaWYA

person user1431317    schedule 25.03.2016
comment
Это именно то, что мне было нужно, спасибо! К сожалению, кажется, что производительность декомпрессора stardard lib bzip2 не велика, так что это все еще ограничивающий фактор. Я могу переключиться на этот компрессор: godoc.org/github.com/dsnet/compress/bzip2 Однако он все еще примерно на 33% медленнее, чем что-то вроде pbzip2. - person Manuel Menzella; 26.03.2016
comment
Насколько сильно ты ускорился в итоге, @ManuelMenzella? Мне нравится внешний вид этого кода — кажется, что он должен работать, но в моем тестировании он лишь незначительно быстрее, чем выполнение всего однопоточного (67 секунд против 72 секунд для 1M записей). Есть идеи, что я мог делать неправильно, @user1431317? - person EM0; 12.07.2017
comment
Возможно, он все еще ограничен тем, насколько быстро распаковка bzip2 может передавать данные, а декодирование xml не требует такой большой мощности процессора. Канал, вероятно, добавляет некоторые накладные расходы, хотя io.Copy имеет оптимизацию для случаев, когда один или оба конца являются io.Reader/io.Writer. Возможно, он выделяет много небольших временных буферов, и это вызывает слишком много мусора. Может быть, поможет буферизованный ридер или писатель. Вы должны профилировать свое приложение (как процессор, так и профиль памяти - профиль памяти может помочь вам найти много ненужных распределений). - person user1431317; 13.07.2017
comment
Декомпрессор и парсер bzip2 занимают примерно одинаковое количество времени — я проверил это, запустив их по отдельности. Я также пытался буферизовать все, что только можно вообразить, и это не помогло. Ну, все, кроме io.Pipe, и я подозреваю, что это могло быть проблемой. Я разместил отдельный вопрос об этом: stackoverflow.com/questions/45089248/ - person EM0; 13.07.2017
comment
Буферизация PipeReader и PipeWriter также не помогла. Хм... Пробовали ли вы (или кто-либо другой) это и обнаружили значительное ускорение по сравнению с однопоточной версией? - person EM0; 13.07.2017
comment
io.Copy фактически использует буфер размером 32 КБ. Пока этот буфер не заполнится, он будет заблокирован в ожидании чтения bzip2. Возможно, проблема в том, что этот буфер слишком велик. Вы хотите передать декодеру xml данные, как только они станут доступны. Вот смоделированный пример, который использует крошечный буфер и на самом деле показывает значительное улучшение: play.golang.org/p/ lkwwBBR8jx Конечно, вы не хотите, чтобы буфер был слишком маленьким, потому что это добавляет слишком много накладных расходов (слишком частое переключение между горутинами). В примере используется крошечный json, поэтому вам придется настроить размер буфера в соответствии с вашими фактическими данными. - person user1431317; 14.07.2017
comment
Я пробовал это, но меньший буфер делает его медленнее. 32K кажется оптимальным. Интересно, что пакет стандартной библиотеки bzip2 работает медленнее, чем пакет dsnet в одном потоке, но быстрее при использовании двух горутин! stdlib занимает 44,6 секунды в однопоточном режиме, 34,6 секунды с горутинами. dsnet занимает 40,7 секунды в однопоточном режиме, 38,6 секунды с горутинами. (Проверено это несколько раз.) - person EM0; 14.07.2017

Простое решение — использовать пакет readahead, который я создал некоторое время назад: https://github.com/klauspost/readahead

inputReader := bzip2.NewReader(inputFile)
ra := readahead.NewReader(input)
defer ra.Close()

А затем передайте считыватель парсеру XML:

decoder := xml.NewDecoder(ra)

С настройками по умолчанию он будет декодировать до 4 МБ заранее в 4 буфера.

person sh0dan    schedule 11.05.2019