Hi,

How are you bundling your program for execution? Are you, for example, building 
a fat-jar using Maven? How are you executing the program? Using bin/flink or by 
executing the program using mvn exec? Also, which Beam/Flink versions are you 
using?

Best,
Aljoscha
> On 12. Jun 2017, at 11:28, 基勇 <[email protected]> wrote:
> 
> Hi,
>     I used beam API to write code to read Kafka data and run with Flink, but 
> run to throw the following exception:
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: 
> Cannot load user class: 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> ClassLoader info: URL ClassLoader:
>     file: 
> '/var/folders/vq/887txtm92w508tj7gknjpflc0000gp/T/blobStore-78ed2c9c-03e5-438b-b2ec-514ac394eb11/cache/blob_7123d3372a6822ba044c5ea153ee6801cd5377ff'
>  (valid JAR)
> Class not resolvable through given classloader.
>       at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:210)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:81)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
>       at java.lang.Thread.run(Thread.java:745)
> 
> code:
> KafkaOptions options = 
> PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaOptions.class);
>               options.setJobName("KafkaExample - WindowSize: " + 
> options.getWindowSize() + " seconds");
>               options.setStreaming(true);
>               options.setCheckpointingInterval(1000L);
>               options.setNumberOfExecutionRetries(5);
>               options.setExecutionRetryDelay(3000L);
>               options.setRunner(FlinkRunner.class);
> 
>               Pipeline pipeline = Pipeline.create(options);
> 
>               pipeline.apply(KafkaIO.<String, String>read()
>                              .withBootstrapServers("localhost:9092")
>                              .withTopic(KAFKA_TOPIC)  // use 
> withTopics(List<String>) to read from multiple topics.
>                              .withKeyDeserializer(StringDeserializer.class)
>                              .withValueDeserializer(StringDeserializer.class)
>                              .withoutMetadata() // PCollection<KV<String, 
> String>>
>               ).apply(Values.<String> create());
> //                            .apply(KafkaIO.<Void, String>write()
> //                                    .withBootstrapServers("localhost:9092")
> //                                    .withTopic(KAFKA_OUTPUT_TOPIC)
> //                                    
> .withValueSerializer(StringSerializer.class)
> //                                    .values());
>                              
>               pipeline.run();//.waitUntilFinish();
> 
> How to fix it ?
> 
> Thank you!

Reply via email to