Дождитесь завершения n горутин

Мне нужно запустить огромное количество горутин и дождаться их завершения. Интуитивный способ, кажется, использует канал, чтобы дождаться, пока все они не будут завершены:

package main

type Object struct {
    //data
}

func (obj *Object) Update(channel chan int) {
    //update data
    channel <- 1
    return
}

func main() {

    channel := make(chan int, n)
    list := make([]Object, n, m)
    for {
        for _, object := range list {
            go object.Update(channel)
        }
        for i := 0; i < n; i++ {
            <-channel
        }
        //now everything has been updated. start again
    }
}

Но проблема в том, что количество объектов и, следовательно, количество горутин может меняться. Можно ли изменить размер буфера канала?

Может быть, есть более элегантный способ сделать это?


person lhk    schedule 16.05.2013    source источник
comment
Вы можете перераспределять его на каждой итерации, но вы можете посмотреть на WaitGroup.   -  person beatgammit    schedule 16.05.2013
comment
tjameson, спасибо за быструю помощь. Это выглядит действительно хорошо. Возможно, вы захотите сделать это ответом.   -  person lhk    schedule 16.05.2013
comment
Готово, с примером =D   -  person beatgammit    schedule 16.05.2013
comment


Ответы (3)


Я использовал WaitGroup как решение этой проблемы. Перевод вашего текущего кода с некоторыми журналами, чтобы было понятно, что происходит:

package main

import "sync"
import "fmt"
import "time"

type Object struct {
    //data
}

func (obj *Object) Update(wg *sync.WaitGroup) {
    //update data
    time.Sleep(time.Second)
    fmt.Println("Update done")
    wg.Done()
    return
}

func main() {
    var wg sync.WaitGroup
    list := make([]Object, 5)
    for {
        for _, object := range list {
            wg.Add(1)
            go object.Update(&wg)
        }
        //now everything has been updated. start again
        wg.Wait()
        fmt.Println("Group done")
    }
}
person beatgammit    schedule 16.05.2013
comment
Хороший ответ! Я бы, наверное, поставил defer wg.Done() в начале Update, хотя на тот случай, если функция разрастется и получит раннее возвращение в будущем. - person Nick Craig-Wood; 17.05.2013
comment
Или на случай паники или чего-то подобного. - person beatgammit; 18.05.2013

Эта задача не совсем тривиальна, довольно легко написать глючную. Рекомендую использовать готовое решение в stdlib - sync.WaitGroup. Цитата из ссылки:

Группа ожидания ожидает завершения набора горутин. Основная горутина вызывает Add, чтобы установить количество ожидаемых горутин. Затем запускается каждая горутина и по завершении вызывает Done. В то же время, Wait можно использовать для блокировки, пока не закончатся все горутины.

person zzzz    schedule 16.05.2013
comment
А если количество горутин для ожидания заранее неизвестно? - person Dfr; 24.04.2014
comment
@Dfr вы увеличиваете счетчик при запуске каждой горутины, поэтому это решение по-прежнему является лучшим решением, когда вы не знаете, сколько горутин вы будете запускать. - person Awn; 26.03.2017

@tjameson проделал отличную работу, объяснив, как использовать WaitGroup, как передать ссылку на ваш объект WaitGroup в вашу функцию. Единственное изменение, которое я бы внес в его пример, — это рычаг defer, когда вам Done. Я думаю, что это defer ws.Done() должно быть первым оператором в вашей функции.

Мне нравится простота WaitGroup. Однако мне не нравится, что нам нужно передавать ссылку на горутину, потому что это будет означать, что логика параллелизма будет смешана с вашей бизнес-логикой.

Поэтому я придумал эту общую функцию, чтобы решить эту проблему для меня:

// Parallelize parallelizes the function calls
func Parallelize(functions ...func()) {
    var waitGroup sync.WaitGroup
    waitGroup.Add(len(functions))

    defer waitGroup.Wait()

    for _, function := range functions {
        go func(copy func()) {
            defer waitGroup.Done()
            copy()
        }(function)
    }
}

Итак, ваш пример можно решить следующим образом:

type Object struct {
    //data
}

func (obj *Object) Update() {
    //update data
    time.Sleep(time.Second)
    fmt.Println("Update done")
    return
}

func main() {
    functions := []func(){}
    list := make([]Object, 5)
    for _, object := range list {
        function := func(obj Object){ object.Update() }(object)
        functions = append(functions, function)
    }

    Parallelize(functions...)        

    fmt.Println("Group done")
}

Если вы хотите использовать его, вы можете найти его здесь https://github.com/shomali11/util

person Raed Shomali    schedule 09.06.2017