BrokenPipeError: [Errno 32] Сломанный канал Neo4j и Python с использованием многопроцессорной обработки

Я пытаюсь использовать neo4j с помощью драйвера python. Реализована программа, которая часто обменивается данными между neo4j и Python, и каждая из итераций программы независима. Программа работает отлично, когда не используется параллельная обработка. Затем я пытаюсь реализовать это, используя параллельную обработку в python, где я распараллеливаю эти независимые итерации. У меня машина с 24 ядрами. Таким образом, я могу запустить довольно много процессов. Даже при параллельном выполнении программа выполняется до тех пор, пока количество процессов не станет равным 5. Для любого числа, превышающего это 90% раз, я получаю следующую ошибку.

---------------------------------------------------------------------------
RemoteTraceback                           Traceback (most recent call last)
RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/neo4j/__init__.py", line 828, in close
    self.sync()
  File "/usr/local/lib/python3.7/site-packages/neo4j/__init__.py", line 793, in sync
    self.session.sync()
  File "/usr/local/lib/python3.7/site-packages/neo4j/__init__.py", line 538, in sync
    detail_count, _ = self._connection.sync()
  File "/usr/local/lib/python3.7/site-packages/neobolt/direct.py", line 526, in sync
    self.send()
  File "/usr/local/lib/python3.7/site-packages/neobolt/direct.py", line 388, in send
    self._send()
  File "/usr/local/lib/python3.7/site-packages/neobolt/direct.py", line 408, in _send
    self.socket.sendall(data)
  File "/usr/local/lib/python3.7/ssl.py", line 1015, in sendall
    v = self.send(byte_view[count:])
  File "/usr/local/lib/python3.7/ssl.py", line 984, in send
    return self._sslobj.write(data)
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "<ipython-input-4-616f793afd51>", line 9, in func
    d2=run_query(streaming_query)
  File "<ipython-input-2-01a2f4205218>", line 6, in run_query
    result = session.read_transaction(lambda tx: tx.run(streaming_query))
  File "/usr/local/lib/python3.7/site-packages/neo4j/__init__.py", line 710, in read_transaction
    return self._run_transaction(READ_ACCESS, unit_of_work, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/neo4j/__init__.py", line 686, in _run_transaction
    tx.close()
  File "/usr/local/lib/python3.7/site-packages/neo4j/__init__.py", line 835, in close
    self.session.commit_transaction()
  File "/usr/local/lib/python3.7/site-packages/neo4j/__init__.py", line 630, in commit_transaction
    self._connection.sync()
  File "/usr/local/lib/python3.7/site-packages/neobolt/direct.py", line 526, in sync
    self.send()
  File "/usr/local/lib/python3.7/site-packages/neobolt/direct.py", line 388, in send
    self._send()
  File "/usr/local/lib/python3.7/site-packages/neobolt/direct.py", line 408, in _send
    self.socket.sendall(data)
  File "/usr/local/lib/python3.7/ssl.py", line 1015, in sendall
    v = self.send(byte_view[count:])
  File "/usr/local/lib/python3.7/ssl.py", line 984, in send
    return self._sslobj.write(data)
BrokenPipeError: [Errno 32] Broken pipe
"""

The above exception was the direct cause of the following exception:

BrokenPipeError                           Traceback (most recent call last)
<ipython-input-5-da15b33c8ad4> in <module>
      7 pool = multiprocessing.Pool(processes=num_processes)
      8 start = time.time()
----> 9 result = pool.map(func, chunks)
     10 end = time.time()
     11 print(end-start)

/usr/local/lib/python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    288         in a list that is returned.
    289         '''
--> 290         return self._map_async(func, iterable, mapstar, chunksize).get()
    291 
    292     def starmap(self, func, iterable, chunksize=None):

/usr/local/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
    681             return self._value
    682         else:
--> 683             raise self._value
    684 
    685     def _set(self, i, obj):

BrokenPipeError: [Errno 32] Broken pipe

Кроме того, я получаю следующие предупреждения

Failed to read from defunct connection Address(host='127.0.0.1', port=7687) (Address(host='127.0.0.1', port=7687))
Failed to read from defunct connection Address(host='127.0.0.1', port=7687) (Address(host='127.0.0.1', port=7687))
Failed to read from defunct connection Address(host='127.0.0.1', port=7687) (Address(host='127.0.0.1', port=7687))
Failed to read from defunct connection Address(host='127.0.0.1', port=7687) (Address(host='127.0.0.1', port=7687))
Failed to read from defunct connection Address(host='127.0.0.1', port=7687) (Address(host='127.0.0.1', port=7687))
Failed to write data to connection Address(host='127.0.0.1', port=7687) (Address(host='127.0.0.1', port=7687)); ("32; 'Broken pipe'")
Transaction failed and will be retried in 1.1551515321361832s (Failed to write to closed connection Address(host='127.0.0.1', port=7687) (Address(host='127.0.0.1', port=7687)))

Журнал отладки сервера neo4j выглядит следующим образом

2020-04-09 13:07:26.033+0000 INFO [o.n.l.i.StoreLogService] Rotated internal log file
2020-04-09 13:08:16.724+0000 ERROR [o.n.b.t.p.HouseKeeper] Fatal error occurred when handling a client connection: [id: 0xdb5b2521, L:/127.0.0.1:7687 ! R:/127.0.0.1:58086] javax.net.ssl.SSLException: bad record MAC
io.netty.handler.codec.DecoderException: javax.net.ssl.SSLException: bad record MAC
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:472)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:433)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:330)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    at java.lang.Thread.run(Thread.java:748)
Caused by: javax.net.ssl.SSLException: bad record MAC
    at sun.security.ssl.Alerts.getSSLException(Alerts.java:208)
    at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1709)
    at sun.security.ssl.SSLEngineImpl.readRecord(SSLEngineImpl.java:970)
    at sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:896)
    at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:766)
    at javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:624)
    at io.netty.handler.ssl.SslHandler$SslEngineType$3.unwrap(SslHandler.java:295)
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1301)
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1203)
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1247)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
    ... 17 more
Caused by: javax.crypto.BadPaddingException: bad record MAC
    at sun.security.ssl.EngineInputRecord.decrypt(EngineInputRecord.java:238)
    at sun.security.ssl.SSLEngineImpl.readRecord(SSLEngineImpl.java:963)
    ... 26 more

Несколько моментов, которые я хотел бы упомянуть

  • Пытался использовать один драйвер для всей программы и настраивать сеанс по мере необходимости.
  • Я пытался использовать драйвер для каждого процесса, и я все еще сталкиваюсь с проблемой.
  • Это звучит странно, но я попытался настроить драйвер, когда требуется вызов БД, и закрыл его сразу после получения данных. Здесь я не сталкиваюсь с ошибкой сломанной трубы, но в соединении отказывается после достижения предела.

Я хочу знать, каков идеальный способ установки драйвера. Также, пожалуйста, помогите мне в решении этих проблем.

Спасибо


person Pradeep Kumar Nalluri    schedule 09.04.2020    source источник
comment
На данный момент это решено с помощью блока try-catch и повторной инициализации драйвера. Решает вопросы.   -  person Pradeep Kumar Nalluri    schedule 10.04.2020
comment
Судя по журналам, основной причиной проблемы является неверная запись MAC-адреса в библиотеке SSL.   -  person wovano    schedule 21.08.2020
comment
Отвечает ли это на ваш вопрос? Как устранить ошибку MAC-адреса неверной записи SSL   -  person wovano    schedule 21.08.2020