Apache Flink: коннектор Kafka в потоковом API Python, не удается загрузить пользовательский класс

Я пробую новый потоковый API Python от Flink и пытаюсь запустить свой скрипт с ./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py. Сценарий python довольно прост, я просто пытаюсь использовать существующую тему и отправить все в stdout (или файл * .out в каталоге журнала, где метод вывода по умолчанию выдает данные).

import glob
import os
import sys
from java.util import Properties
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.streaming.api.collector.selector import OutputSelector
from org.apache.flink.api.common.serialization import SimpleStringSchema

directories=['/home/user/flink/flink-1.6.1/lib']
for directory in directories:
    for jar in glob.glob(os.path.join(directory,'*.jar')):
                sys.path.append(jar)

from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09

props = Properties()
config = {"bootstrap_servers": "localhost:9092",
          "group_id": "flink_test",
          "topics": ["TopicCategory-TopicName"]}
props.setProperty("bootstrap.servers", config['bootstrap_servers'])
props.setProperty("group_id", config['group_id'])
props.setProperty("zookeeper.connect", "localhost:2181")

def main(factory):
    consumer = FlinkKafkaConsumer09([config["topics"]], SimpleStringSchema(), props)

    env = factory.get_execution_environment()
    env.add_java_source(consumer) \
        .output()
    env.execute()

Я взял несколько файлов jar из репозиториев maven, а именно flink-connector-kafka-0.9_2.11-1.6.1.jar, flink-connector-kafka-base_2.11-1.6.1.jar и kafka-clients-0.9.0.1.jar, и скопировал их в каталог lib Flink. Если я не понял документацию неправильно, этого должно хватить, чтобы Flink загрузил коннектор kafka. Действительно, если я удалю любую из этих jar-файлов, импорт завершится неудачно, но этого, похоже, недостаточно, чтобы активировать план. Добавление цикла for для динамического добавления их в sys.path тоже не сработало. Вот что печатается в консоли:

Starting execution of program
Failed to run plan: null
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py", line 32, in main
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
    at org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)

org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)

The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment.

Вот что я вижу в журналах:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class:    org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
    file: '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887' (valid JAR)
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

Есть ли способ исправить это и сделать коннектор доступным для Python? Я подозреваю, что это проблема загрузчика классов с Jython, но я не знаю, как продолжить расследование (также учитывая, что я ничего не знаю о Java). Большое спасибо.


person adaris    schedule 10.10.2018    source источник


Ответы (2)


Здесь вы используете неправильного потребителя Kafka. В вашем коде это FlinkKafkaConsumer09, но используемая вами библиотека flink-connector-kafka-0.11_2.11-1.6.1.jar, то есть для FlinkKafkaConsumer011. Попробуйте заменить FlinkKafkaConsumer09 этим FlinkKafkaConsumer011 или используйте файл библиотеки flink-connector-kafka-0.9_2.11-1.6.1.jar вместо текущего.

person BrightFlow    schedule 10.10.2018
comment
Извините, у меня действительно был flink-connector-kafka-0.9_2.11-1.6.1.jar в моем lib каталоге, а не 0.11, это была всего лишь опечатка в моем сообщении (теперь отредактированном). Первоначальная проблема остается. - person adaris; 11.10.2018

Я гость, файл jar может иметь встроенный импорт или зависимости, поэтому трех файлов jar недостаточно. Что касается того, как узнать зависимые отношения java jar, это то, что делает java maven. Вы можете обратиться за помощью на официальном сайте «Настройка сборки проекта». В моем случае я следую официальной настройке java-проекта, использую «from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer» и добавляю зависимость «org.apache.flink
flink-clients_2.11
1.8. 0 "в pom.xml, то теперь я могу выводить записи kafka на стандартный вывод с помощью Python API.

person zenglong chen    schedule 28.06.2019