Как запретить ndb пакетировать вызов put_async() и заставить его немедленно выполнить RPC?

У меня есть обработчик запросов, который обновляет объект, сохраняет его в хранилище данных, а затем должен выполнить дополнительную работу перед возвратом (например, постановка в очередь фоновой задачи и json-сериализация некоторых результатов). Я хочу распараллелить этот код, чтобы дополнительная работа выполнялась во время сохранения объекта.

Вот к чему сводится мой код обработчика:

class FooHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def post(self):
        foo = yield Foo.get_by_id_async(some_id)

        # Do some work with foo

        # Don't yield, as I want to perform the code that follows
        # while foo is being saved to the datastore.
        # I'm in a toplevel, so the handler will not exit as long as
        # this async request is not finished.
        foo.put_async()

        taskqueue.add(...)
        json_result = generate_result()
        self.response.headers["Content-Type"] = "application/json; charset=UTF-8"
        self.response.write(json_result)

Однако Appstats показывает, что datastore.Put RPC выполняется последовательно после taskqueue.Add:

Скриншот статистики приложений

Небольшое копание в ndb.context.py показывает, что вызов put_async() в конечном итоге добавляется к AutoBatcher вместо немедленного выполнения RPC.

Итак, я предполагаю, что _put_batcher в конечном итоге сбрасывается, когда toplevel ожидает завершения всех асинхронных вызовов.

Я понимаю, что пакетная установка put имеет реальные преимущества в определенных сценариях, но в моем случае здесь я действительно хочу, чтобы RPC put был отправлен немедленно, чтобы я мог выполнять другую работу, пока сущность сохраняется.

Если я делаю yield foo.put_async(), то получаю тот же водопад в Appstats, но datastore.Put выполняется раньше остальных:

2-й скриншот статистики приложений

Этого следовало ожидать, так как yield заставляет мой обработчик ждать завершения вызова put_async() перед выполнением остального кода.

Я также пытался добавить вызов ndb.get_context().flush() сразу после foo.put_async(), но вызовы datastore.Put и taskqueue.BulkAdd по-прежнему не выполняются параллельно, согласно Appstats.

Итак, мой вопрос: как я могу заставить вызов put_async() обходить автоматический дозатор и немедленно выдавать RPC?


person Pascal Bourque    schedule 21.02.2013    source источник
comment
Это производство или местное?   -  person Lipis    schedule 21.02.2013


Ответы (2)


Нет поддерживаемого способа сделать это. Может и должно быть. Можете ли вы попробовать, если это работает?

loop - ndb.eventloop.get_event_loop()
while loop.run_idle():
    pass

Возможно, вам придется просмотреть исходный код ndb/eventloop.py, чтобы увидеть, что еще вы можете попробовать — в основном вы хотите попробовать большую часть того, что делает run0(), кроме ожидания RPC. В частности, возможно, вам придется сделать это:

while loop.current:
    loop.run0()
while loop.run_idle():
    pass

(Это все еще не поддерживается, потому что есть и другие условия, которые вам, возможно, придется обрабатывать, но они, похоже, не встречаются в вашем примере.)

person Guido van Rossum    schedule 22.02.2013
comment
Я заставил его работать, вызвав ndb.get_context().flush(), а затем 2 цикла, которые вы предложили сразу после моего вызова foo.put_async(). Я считаю, что должен быть официально поддерживаемый способ сделать это, так как я не думаю, что мой сценарий использования является необычным (сохранить объект, а затем завершить оставшуюся работу обработчика, пока объект сохраняется). Я подал запрос на эту функцию: code.google.com/p/ googleappengine/issues/detail?id=8863 - person Pascal Bourque; 22.02.2013
comment
Я думаю, что действительно необходимо, это taskqueue.add_async чтобы получить очередь задач rpc в циклы idle/rpc в цикле событий. code.google.com/p/appengine-ndb- эксперимент/проблемы/деталь?id=180 - person tesdal; 23.02.2013
comment
Какой именно код сработал у вас? flush() - это тасклет, поэтому вам придется выдать его, что может привести к большей задержке, чем вам хотелось бы. В любом случае, согласился, что это будет полезная функция. Возможно, это привлечет больше внимания, если вы зарегистрируете его в трекере NDB? - person Guido van Rossum; 24.02.2013
comment
Если put также включает в себя набор memcache, run_idle настроит только rpc memcache, а не rpc хранилища данных. RPC хранилища данных будет выполняться после добавления очереди задач. - person tesdal; 25.02.2013
comment
@GuidovanRossum, я отредактировал ваш ответ, указав подробности о том, что сработало для меня, но редактирование не было одобрено по некоторым причинам ... Как я сказал в предыдущем комментарии: вызов flush(), а затем выполнение двух циклов, которые вы предложили. - person Pascal Bourque; 25.02.2013
comment
И я отправил запрос функции (я предоставил ссылку в предыдущем комментарии). - person Pascal Bourque; 25.02.2013
comment
@tesdal Я согласен с тем, что для очереди задач требуется асинхронный API. Я уже пометил этот запрос функции. Что касается вашего комментария к кэшу памяти: в моем случае объект, сохраняемый асинхронно, не использует кэш памяти (я отключил политику кэширования памяти для этой модели). - person Pascal Bourque; 25.02.2013

Попробуйте это, я не уверен на 100%, что это поможет:

foo = yield Foo.get_by_id_async(some_id)
future = foo.put_async()
future.done()

Запросы ndb помещаются в автобатчер, пакет отправляется в RPC, когда вам нужен результат. Поскольку вам не нужен результат foo.put_async(), он не будет отправлен, пока вы не сделаете еще один вызов ndb (вы этого не сделаете) или пока не закончится @ndb.toplevel.

Вызов future.done() не блокируется, но я предполагаю, что это может вызвать запрос.

Еще одна вещь, чтобы попытаться заставить операцию:

ndb.get_context().flush()
person dragonx    schedule 21.02.2013
comment
Спасибо, но это ничего не делает. Future.done() делает только return self._done без какой-либо обработки, а Context.flush() я уже пробовал. - person Pascal Bourque; 21.02.2013