Я пробую новый потоковый API Python от Flink и пытаюсь запустить свой скрипт с ./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py
. Сценарий python довольно прост, я просто пытаюсь использовать существующую тему и отправить все в stdout (или файл * .out в каталоге журнала, где метод вывода по умолчанию выдает данные).
import glob
import os
import sys
from java.util import Properties
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.streaming.api.collector.selector import OutputSelector
from org.apache.flink.api.common.serialization import SimpleStringSchema
directories=['/home/user/flink/flink-1.6.1/lib']
for directory in directories:
for jar in glob.glob(os.path.join(directory,'*.jar')):
sys.path.append(jar)
from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09
props = Properties()
config = {"bootstrap_servers": "localhost:9092",
"group_id": "flink_test",
"topics": ["TopicCategory-TopicName"]}
props.setProperty("bootstrap.servers", config['bootstrap_servers'])
props.setProperty("group_id", config['group_id'])
props.setProperty("zookeeper.connect", "localhost:2181")
def main(factory):
consumer = FlinkKafkaConsumer09([config["topics"]], SimpleStringSchema(), props)
env = factory.get_execution_environment()
env.add_java_source(consumer) \
.output()
env.execute()
Я взял несколько файлов jar из репозиториев maven, а именно flink-connector-kafka-0.9_2.11-1.6.1.jar
, flink-connector-kafka-base_2.11-1.6.1.jar
и kafka-clients-0.9.0.1.jar
, и скопировал их в каталог lib
Flink. Если я не понял документацию неправильно, этого должно хватить, чтобы Flink загрузил коннектор kafka. Действительно, если я удалю любую из этих jar-файлов, импорт завершится неудачно, но этого, похоже, недостаточно, чтобы активировать план. Добавление цикла for для динамического добавления их в sys.path
тоже не сработало. Вот что печатается в консоли:
Starting execution of program
Failed to run plan: null
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py", line 32, in main
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
at org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)
The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment.
Вот что я вижу в журналах:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
file: '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887' (valid JAR)
Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Есть ли способ исправить это и сделать коннектор доступным для Python? Я подозреваю, что это проблема загрузчика классов с Jython, но я не знаю, как продолжить расследование (также учитывая, что я ничего не знаю о Java). Большое спасибо.