Hi,

Did you also specify the “-Pflink-runner” flag when building? From your mail 
it seems you are building with just “mvn clean install -DskipTests”. You need 
to specify the flag at build time as well, so that the Flink dependencies are 
bundled into the resulting fat jar.
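
After rebuilding, one way to check that the fat jar really contains the runner 
classes is to look for the class named in the exception. The following is only 
a minimal sketch: the names FatJarCheck and jarContainsClass are made up for 
illustration, and the jar path is passed as an argument because it depends on 
your project layout.

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class FatJarCheck {

    // Class reported as not loadable in the exception from this thread.
    static final String WRAPPER_CLASS =
        "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper";

    /**
     * Returns true if the jar at jarPath has a .class entry for the given
     * fully qualified class name. Hypothetical helper, not part of Beam or Flink.
     */
    static boolean jarContainsClass(String jarPath, String className) throws IOException {
        // Jar entries use '/' as the package separator and end in ".class".
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: java FatJarCheck <path-to-fat-jar>");
            return;
        }
        boolean found = jarContainsClass(args[0], WRAPPER_CLASS);
        System.out.println(found
            ? "Runner class is bundled in the jar."
            : "Runner class is missing from the jar.");
    }
}
```

If the class is reported as missing, the jar was most likely built without the 
runner profile.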

Best,
Aljoscha
> On 13. Jun 2017, at 10:25, 基勇 <[email protected]> wrote:
> 
> Beam 2.0.0 and flink 1.2.1
> 
> 
> ------------------ Original Message ------------------
> From: "Aljoscha Krettek";<[email protected]>;
> Sent: Monday, June 12, 2017, 5:42 PM
> To: "user"<[email protected]>;
> Subject: Re: Input/Output data to kafka exception
> 
> Hi,
> 
> How are you bundling your program for execution? Are you, for example, 
> building a fat-jar using Maven? How are you executing the program? Using 
> bin/flink or by executing the program using mvn exec? Also, which Beam/Flink 
> versions are you using?
> 
> Best,
> Aljoscha
>> On 12. Jun 2017, at 11:28, 基勇 <[email protected]> wrote:
>> 
>> Hi,
>>     I used the Beam API to write code that reads data from Kafka and runs on 
>> Flink, but running it throws the following exception:
>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
>> ClassLoader info: URL ClassLoader:
>>     file: '/var/folders/vq/887txtm92w508tj7gknjpflc0000gp/T/blobStore-78ed2c9c-03e5-438b-b2ec-514ac394eb11/cache/blob_7123d3372a6822ba044c5ea153ee6801cd5377ff' (valid JAR)
>> Class not resolvable through given classloader.
>>      at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:210)
>>      at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:81)
>>      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
>>      at java.lang.Thread.run(Thread.java:745)
>> 
>> code:
>> KafkaOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaOptions.class);
>> options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
>> options.setStreaming(true);
>> options.setCheckpointingInterval(1000L);
>> options.setNumberOfExecutionRetries(5);
>> options.setExecutionRetryDelay(3000L);
>> options.setRunner(FlinkRunner.class);
>> 
>> Pipeline pipeline = Pipeline.create(options);
>> 
>> pipeline.apply(KafkaIO.<String, String>read()
>>                 .withBootstrapServers("localhost:9092")
>>                 .withTopic(KAFKA_TOPIC)  // use withTopics(List<String>) to read from multiple topics.
>>                 .withKeyDeserializer(StringDeserializer.class)
>>                 .withValueDeserializer(StringDeserializer.class)
>>                 .withoutMetadata() // PCollection<KV<String, String>>
>>         ).apply(Values.<String> create());
>> //              .apply(KafkaIO.<Void, String>write()
>> //                      .withBootstrapServers("localhost:9092")
>> //                      .withTopic(KAFKA_OUTPUT_TOPIC)
>> //                      .withValueSerializer(StringSerializer.class)
>> //                      .values());
>> 
>> pipeline.run();//.waitUntilFinish();
>> 
>> How can I fix it?
>> 
>> Thank you!
> 
