Исходной задаче Debezium не удается повторно подключиться к базе данных postgresql при повторном создании контейнера БД

У нас есть кластер kubernetes, в котором Debezium работает как исходная задача из Postgresql и пишет в kafka. Debezium, postgres и kafka работают в отдельных модулях. Когда модуль postgres удаляется и kubernetes повторно создает модуль, модуль debezium не может повторно подключиться. Журналы из стручка дебезиума:

    2018-07-17 08:31:38,311 ERROR  ||  WorkerSourceTask{id=inventory-connector-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
    2018-07-17 08:31:38,311 INFO   ||  [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 30000 ms.   [org.apache.kafka.clients.producer.KafkaProducer]

Debezium продолжает попытки периодически удалять невыполненные сообщения, но дает следующее исключение:

    2018-07-17 08:32:38,167 ERROR  ||  WorkerSourceTask{id=inventory-connector-0} Exception thrown while calling task.commit()   [org.apache.kafka.connect.runtime.WorkerSourceTask]
    org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Database connection failed when writing to copy
    at io.debezium.connector.postgresql.RecordsStreamProducer.commit(RecordsStreamProducer.java:151)
    at io.debezium.connector.postgresql.PostgresConnectorTask.commit(PostgresConnectorTask.java:138)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:437)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:378)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:108)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:45)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:82)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
    at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:942)
    at org.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:23)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:176)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.forceUpdateStatus(V3PGReplicationStream.java:99)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.doFlushLsn(PostgresReplicationConnection.java:246)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.flushLsn(PostgresReplicationConnection.java:239)
    at io.debezium.connector.postgresql.RecordsStreamProducer.commit(RecordsStreamProducer.java:146)
    ... 13 more
    Caused by: java.net.SocketException: Broken pipe (Write failed)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at org.postgresql.core.PGStream.flush(PGStream.java:553)
    at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:939)
    ... 19 more

Есть ли способ восстановить соединение debezium с postgres, когда оно станет доступным? Или мне не хватает какой-то конфигурации?

  • Дебезиум версия 0.8
  • kubernetes версия 1.10.3
  • postgres версии 9.6

person Finbarr    schedule 17.07.2018    source источник
comment
Не могли бы вы поделиться более подробной информацией о кубернетах, например о файлах yaml? Было бы полезно, если бы вы предоставили более подробную информацию об окружающей среде.   -  person Crou    schedule 17.07.2018


Ответы (1)


Похоже, это обычная проблема, и у нее есть открытые запросы функций как в debezium, так и в kafka.

https://issues.jboss.org/browse/DBZ-248

https://issues.apache.org/jira/browse/KAFKA-5352

Пока они открыты, похоже, что это ожидаемое поведение

В качестве обходного пути я добавил этот зонд живучести в развертывание.

    livenessProbe:
        exec:
          command:
          - sh
          - -ec
          - ipaddress=$(ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1  -d'/'); reply=$(curl -s $ipaddress:8083/connectors/inventory-connector/status | grep -o RUNNING | wc -l); if [ $reply -lt 2 ]; then exit 1; fi;
        initialDelaySeconds: 30
        periodSeconds: 5

Первое предложение получает IP-адрес контейнера:

    ipaddress=$(ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/');

Второе предложение выполняет запрос и подсчитывает экземпляры RUNNING в ответе json:

    reply=$(curl -s $ipaddress:8083/connectors/inventory-connector/status | grep -o RUNNING | wc -l);

Третье предложение возвращает код выхода 1, если "RUNNING" появляется менее двух раз.

    if [ $reply -lt 2 ]; then exit 1; fi

Кажется, он работает над начальными тестами - то есть перезапуск БД postgres вызывает перезапуск контейнера debezium. Я предполагаю, что сценарий чего-то вроде этого (хотя, возможно, «робастизированный») может быть включен в изображение для облегчения исследования.

person Finbarr    schedule 17.07.2018
comment
Эй, да, в настоящее время у нас нет идеи о повторном подключении, реализованной в самих разъемах. Идея состоит в том, что это может быть сделано с помощью уровня оркестровки (Kubernetes в вашем случае), который будет отслеживать статус коннектора через REST API и перезапускать его, если задача переходит в состояние FAILED. Однако это не высечено в камне, и мы можем в конечном итоге предоставить внутреннее решение, но мы еще не дошли до него. - person Gunnar; 17.07.2018
comment
Хорошо, спасибо @Gunnar. Приятно знать, что я чего-то не упускаю и этого ожидаемо. Я попытаюсь добавить зонды живучести и готовности кубернетов в модуль Debezium для обработки этого сценария. - person Finbarr; 18.07.2018
comment
Да, это была бы идея. Вы можете получить статус коннектора из REST API Kafka Connect по следующему URL-адресу: connect- хост: 8083 / connector / your-connector / status. При этом, если вам удастся реализовать указанное исследование, возможно, вам будет интересно написать гостевой пост по этой теме в блоге на debezium.io? Несомненно, это было бы полезно и другим людям. - person Gunnar; 18.07.2018
comment
@Gunnar - похоже, что api возвращает 200 OK как когда задача ВЫПОЛНЯЕТСЯ, так и когда она НЕ УДАЕТСЯ. kubelet полагается на код состояния, чтобы определить, готов ли модуль к работе или нет. Возможно, можно выполнить команду и проанализировать ответ json - я попробую - person Finbarr; 18.07.2018
comment
Вы можете использовать jq для получения точного состояния задачи: curl -s $ ipaddress: 8083 / connector / коннектор для пеших прогулок / статус | jq '.tasks [0] .state' - person Gunnar; 19.07.2018
comment
Еще одна вещь, о которой следует помнить при подходе к проверке живучести, заключается в том, что AFAIU перезапускает весь узел Kafka Connect. Это может быть не идеально при развертывании нескольких соединителей в одном экземпляре (на всякий случай, если вы это делаете). - person Gunnar; 20.07.2018
comment
Вы имеете в виду, когда debezium подключен более чем к одному источнику? Например. несколько баз данных? Если да, то да, этот подход не подходит, поскольку kubernetes перезапускает контейнер, в котором размещено приложение debezium. В нашем случае пока только один источник. Возможно, это еще одна причина рассмотреть возможность внутренней обработки перезапуска задач? - person Finbarr; 20.07.2018