Как закрыть все потоки сервера grpc с помощью gracefulStop?

Я пытаюсь остановить всех клиентов, подключенных к потоковому серверу со стороны сервера. На самом деле я использую метод GracefulStop, чтобы справиться с этим изящно.

Я жду сигнала os.Interrupt на канале, чтобы выполнить плавную остановку для gRPC. но он застревает на server.GracefulStop() при подключении клиента.

func (s *Service) Subscribe(_ *empty.Empty, srv clientapi.ClientApi_SubscribeServer) error {
    ctx := srv.Context()

    updateCh := make(chan *clientapi.Update, 100)
    stopCh := make(chan bool)
    defer func() {
        stopCh<-true
        close(updateCh)
    }

    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer func() {
            ticker.Stop()
            close(stopCh)
        }
        for {
            select {
            case <-stopCh:
                return
            case <-ticker.C:
                updateCh<- &clientapi.Update{Name: "notification": Payload: "sample notification every 1 second"}
            }
        }
    }()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()

        case notif := <-updateCh:
            err := srv.Send(notif)
            if err == io.EOF {
                return nil
            }

            if err != nil {
                s.logger.Named("Subscribe").Error("error", zap.Error(err))
                continue
            }
        }
    }
}

Я ожидал, что context в методе ctx.Done() сможет справиться с этим и прервать цикл for. Как закрыть все потоки ответов, подобные этому?


person rezam    schedule 21.10.2019    source источник
comment
не могли бы вы воспроизвести и поделиться минимальным примером, пожалуйста?   -  person mh-cbon    schedule 21.10.2019
comment
@ mh-cbon Я обновил свой код   -  person rezam    schedule 21.10.2019


Ответы (2)


Создайте глобальный context для своей службы gRPC. Итак, пройдемся по различным частям:

  • Каждый запрос службы gRPC будет использовать этот контекст (вместе с контекстом клиента) для выполнения этого запроса.
  • os.Interrupt обработчик отменит глобальный контекст; таким образом отменяя все текущие запросы
  • наконец, выдать server.GracefulStop() - который должен дождаться завершения всех активных вызовов gRPC (если они не сразу увидят отмену)

Так, например, при настройке службы gRPC:

pctx := context.Background()
globalCtx, globalCancel := context.WithCancel(pctx)

mysrv := MyService{
    gctx: globalCtx
}

s := grpc.NewServer()
pb.RegisterMyService(s, mysrv)

os.Interrupt обработчик инициирует и ожидает завершения работы:

globalCancel()
server.GracefulStop()

Методы gRPC:

func(s *MyService) SomeRpcMethod(ctx context.Context, req *pb.Request) error {

    // merge client and server contexts into one `mctx`
    // (client context will cancel if client disconnects)
    // (server context will cancel if service Ctrl-C'ed)

    mctx, mcancel := mergeContext(ctx, s.gctx)

    defer mcancel() // so we don't leak, if neither client or server context cancels

    // RPC WORK GOES HERE
    // RPC WORK GOES HERE
    // RPC WORK GOES HERE

    // pass mctx to any blocking calls:
    // - http REST calls
    // - SQL queries etc.
    // - or if running a long loop; status check the context occasionally like so:

    // Example long request (10s)
    for i:=0; i<10*1000; i++ {
        time.Sleep(1*time.Milliscond)

        // poll merged context
        select {
            case <-mctx.Done():
                return fmt.Errorf("request canceled: %s", mctx.Err())
            default:
        }
    }
}

А также:

func mergeContext(a, b context.Context) (context.Context, context.CancelFunc) {
    mctx, mcancel := context.WithCancel(a) // will cancel if `a` cancels

    go func() {
        select {
        case <-mctx.Done(): // don't leak go-routine on clean gRPC run
        case <-b.Done():
            mcancel() // b canceled, so cancel mctx 
        }
    }()

    return mctx, mcancel
}
person colm.anseo    schedule 21.10.2019
comment
FWIW проверка s.gctx в верхней части обработчика службы не требуется. GracefulStop приведет к тому, что сервер полностью перестанет принимать новые запросы. - person Doug Fawley; 21.10.2019
comment
@DougFawley хорошее замечание. Обновленный ответ. Также исправлена ​​потенциальная утечка контекста в mergeContext. - person colm.anseo; 21.10.2019
comment
@colminator а что, если я не объединю контексты и не получу case для каждого? это вызывает утечку контекста? - person rezam; 22.10.2019
comment
Причина объединения их в единый контекст заключается в том, что вы можете передать единый контекст любому другому потенциально блокирующему вызову, например HTTP-запросу или SQL-запросу. Эти типы вызовов могут принимать только одно значение контекста. И вы хотите, чтобы эти вызовы обнаруживали любые отмены, которые может получить ваш RPC - клиент или сервер. - person colm.anseo; 22.10.2019
comment
@colminator Я думаю, что ваш выбор внутри горутины в mergeContext также нуждается в футляре для mctx.Done(). В противном случае эта горутина будет протекать даже при вызове mcancel. - person Doug Fawley; 22.10.2019
comment
@DougFawley Я думаю, что контекст клиента будет восстановлен при вызовах, которые вызовут очистку mctx. Но на всякий случай обновлю ваше предложение. Я также немного сжал его - как я заметил, я могу сделать a родительским контекстом для mctx - и тогда нужно будет только отслеживать b (и, конечно, mctx). - person colm.anseo; 22.10.2019
comment
@colminator - вы правы в отношении отмены контекста обработчика службы, но если mergeContext предназначена для использования в качестве общей функции (для использования вне gRPC), то, возможно, дочерние элементы никогда не будут отменены. Обновление выглядит неплохо, и идея сделать mctx дочерним элементом a. С этим изменением вы можете обновить обработчик службы, чтобы он выполнял: ctx, cancel := mergeContext(ctx, s.gctx), чтобы избежать ошибочной блокировки непосредственно для неслитого ctx. (Любые значения в ctx также будут доступны через mctx.) - person Doug Fawley; 24.10.2019

Обычно клиенты должны предполагать, что RPC могут завершиться (например, из-за ошибок подключения или сбоя питания сервера) в любой момент. Итак, что мы делаем - это GracefulStop, бездействуем на короткий период времени, чтобы позволить RPC в полете завершиться естественным образом, а затем жестко Stop сервер. Если вам действительно нужно использовать этот сигнал завершения для завершения ваших RPC, то ответ от @colminator, вероятно, будет лучшим выбором. Но эта ситуация должна быть необычной, и вы можете потратить некоторое время на анализ своего проекта, если обнаружите, что необходимо вручную завершить потоковую передачу RPC при выключении сервера.

person Doug Fawley    schedule 23.10.2019