Hi Kostas,

As far as I know, you cannot just use Java classes from within the Python
API. I don't think the Python API provides a wrapper for the Kafka
connector. I am adding Chesnay to cc to correct me if I am wrong.
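
For what it's worth, the "Cannot load user class" error in your logs below
is thrown on the TaskManager side, so the fact that the connector classes
can be imported in the Jython script does not guarantee they are available
where the job actually runs. If you want to double-check what the client
JVM itself can see, a rough sketch using plain Java reflection (nothing
Flink-specific; note it checks the JVM classpath, not jars added only to
Jython's sys.path) would be:

    from java.lang import Class, ClassNotFoundException

    try:
        # class name taken from the stack trace in the logs
        Class.forName(
            "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09")
        print("FlinkKafkaConsumer09 is visible to the client JVM")
    except ClassNotFoundException:
        print("FlinkKafkaConsumer09 is NOT visible to the client JVM")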

Best,

Dawid


On 11/10/18 12:18, Kostas Evangelou wrote:
> Hey all, 
>
> Thank you so much for your efforts. I've already posted this question
> on stack overflow, but thought I should ask here as well.
>
> I am trying out Flink's new Python streaming API and attempting to run
> my script with |./flink-1.6.1/bin/pyflink-stream.sh
> examples/read_from_kafka.py|. The Python script is fairly
> straightforward: I am just trying to consume from an existing topic
> and send everything to stdout (or to the *.out file in the log directory,
> where the output method emits data by default).
>
> import glob
> import os
> import sys
>
> from java.util import Properties
> from org.apache.flink.streaming.api.functions.source import SourceFunction
> from org.apache.flink.streaming.api.collector.selector import OutputSelector
> from org.apache.flink.api.common.serialization import SimpleStringSchema
>
> directories = ['/home/user/flink/flink-1.6.1/lib']
>
> for directory in directories:
>     for jar in glob.glob(os.path.join(directory, '*.jar')):
>         sys.path.append(jar)
>
> from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09
>
> props = Properties()
>
> config = {"bootstrap_servers": "localhost:9092",
>           "group_id": "flink_test",
>           "topics": ["TopicCategory-TopicName"]}
>
> props.setProperty("bootstrap.servers", config['bootstrap_servers'])
> props.setProperty("group_id", config['group_id'])
> props.setProperty("zookeeper.connect", "localhost:2181")
>
> def main(factory):
>     consumer = FlinkKafkaConsumer09([config["topics"]],
>                                     SimpleStringSchema(),
>                                     props)
>
>     env = factory.get_execution_environment()
>     env.add_java_source(consumer) \
>         .output()
>     env.execute()
>
>
> I grabbed a handful of jar files from the Maven repos,
> namely |flink-connector-kafka-0.9_2.11-1.6.1.jar|,
> |flink-connector-kafka-base_2.11-1.6.1.jar| and |kafka-clients-0.9.0.1.jar|,
> and copied them into Flink's |lib| directory. Unless I misunderstood the
> documentation, this should suffice for Flink to load the Kafka
> connector. Indeed, if I remove any of these jars the import fails, but
> this doesn't seem to be enough to actually invoke the plan. Adding a
> for loop to dynamically append these jars to |sys.path| didn't work either.
> Here's what gets printed to the console:
>
> Starting execution of program
> Failed to run plan: null
>
> Traceback (most recent call last):
>   File "<string>", line 1, in <module>
>   File "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py", line 32, in main
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
>     at org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>
> org.apache.flink.client.program.ProgramInvocationException:
> org.apache.flink.client.program.ProgramInvocationException: Job
> failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)
>
> The program didn't contain a Flink job. Perhaps you forgot to call
> execute() on the execution environment.
>
>
> This is what I see in the logs:
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> load user class:
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
> ClassLoader info: URL ClassLoader:
>     file: '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887' (valid JAR)
> Class not resolvable through given classloader.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
>
>
> Is there a way to fix this and make the connector available to Python?
>
> Many thanks,
> Kostas
>
