Есть ли у appengine-mapreduce ограничение на количество операций?

Я работаю над проектом, который требует создания большой базы знаний на основе совпадений слов в тексте. Как я выяснил, подобный подход не применялся в appengine. Я хотел бы использовать гибкость и масштабируемость appengine, чтобы иметь возможность обслуживать базу знаний и рассуждать о ней для широкого круга пользователей.

До сих пор я придумал реализацию mapreduce на основе демонстрационного приложения для конвейера. Исходные тексты хранятся в blobstore в виде заархивированных файлов, содержащих один XML-документ, каждый из которых содержит переменное количество статей (до 30000).

Первым шагом было адаптировать текущий BlobstoreZipLineInputReader, чтобы он анализировал файл xml, извлекая из него соответствующую информацию. Класс XMLParser использует подход lxml iterparse для получения элементов xml для обработки с http://www.ibm.com/developerworks/xml/library/x-hiperfparse/ и возвращает итератор.

Модифицированный класс BlobstoreXMLZipLineInputReader имеет немного другую функцию next:

def next(self):
  if not self._filestream:
    if not self._zip:
      self._zip = zipfile.ZipFile(self._reader(self._blob_key))
      self._entries = self._zip.infolist()[self._start_file_index:
                                           self._end_file_index]
      self._entries.reverse()
    if not self._entries:
      raise StopIteration()
    entry = self._entries.pop()

    parser = XMLParser()
    # the result here is an iterator with the individual articles
    self._filestream = parser.parseXML(self._zip.open(entry.filename))

  try:
      article = self._filestream.next()
      self._article_index += 1    
  except StopIteration:
      article = None

  if not article:
    self._filestream.close()
    self._filestream = None
    self._start_file_index += 1
    self._initial_offset = 0
    return self.next()

  return ((self._blob_key, self._start_file_index, self._article_index),
          article)

Затем функция карты будет получать каждую из этих статей, разделенных на предложения, а затем разделенные на слова:

def map_function(data):
  """Word count map function."""
  (entry, article) = data
  for s in split_into_sentences(article.body):
    for w in split_into_words(s.lower()):
      if w not in STOPWORDS:
        yield (w, article.id)

Редуктор объединяет слова и объединяет идентификаторы статей, в которых они появляются:

def reduce_function(key, values):
  """Word count reduce function."""
  yield "%s: %s\n" % (key, list(set(values)))

Это прекрасно работает как на сервере разработки, так и на живой установке до 10000 текстов (на них не так много слов). Обычно это занимает не более 10 секунд. Проблема в том, что когда это немного превышает это, и кажется, что mapreduce постоянно зависает, обрабатывая задание. Количество обработанных элементов на сегмент просто увеличивается, и мои лимиты операций записи вскоре достигаются.

Q1. Есть ли какое-то ограничение на количество операций с картами, которые может выполнить конвейер mapreduce, прежде чем он начнет «плохо себя вести»?
Q2. Есть ли лучший подход к моей проблеме?
Q3. Я знаю, что об этом уже спрашивали, но могу ли я обойти запись временного хранилища данных mapreduce? они убивают меня...

P.S.: вот мой основной вызов mapreduce:

class XMLArticlePipeline(base_handler.PipelineBase):
  def run(self, filekey, blobkey):
    output = yield mapreduce_pipeline.MapreducePipeline(
        "process_xml",
        "backend.build_knowledgebase.map_function",
        "backend.build_knowledgebase.reduce_function",
        "backend.build_knowledgebase.BlobstoreXMLZipLineInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={
            "blob_keys": [blobkey],
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=12)
    yield StoreOutput(filekey, output)

РЕДАКТИРОВАТЬ.: Я получаю странные ошибки на сервере разработки при выполнении бесконечной работы:

[App Instance] [0] [dev_appserver_multiprocess.py:821] INFO Exception in HandleRequestThread
Traceback (most recent call last):
  File "/Applications/GoogleAppEngineLauncher.app/Contents/Resources/GoogleAppEngine-default.bundle/Contents/Resources/google_appengine/google/appengine/tools/dev_appserver_multiprocess.py", line 819, in run
    HandleRequestDirectly(request, client_address)
  File "/Applications/GoogleAppEngineLauncher.app/Contents/Resources/GoogleAppEngine-default.bundle/Contents/Resources/google_appengine/google/appengine/tools/dev_appserver_multiprocess.py", line 957, in HandleRequestDirectly
    HttpServer(), request, client_address)
  File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 310, in process_request
    self.finish_request(request, client_address)
  File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 323, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Applications/GoogleAppEngineLauncher.app/Contents/Resources/GoogleAppEngine-default.bundle/Contents/Resources/google_appengine/google/appengine/tools/dev_appserver.py", line 2579, in __init__
    BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
  File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 641, in __init__
    self.finish()
  File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 694, in finish
    self.wfile.flush()
  File "/usr/local/Cellar/python/2.7.2/lib/python2.7/socket.py", line 303, in flush
    self._sock.sendall(view[write_offset:write_offset+buffer_size])
error: [Errno 32] Broken pipe

person Francisco Roque    schedule 15.05.2012    source источник