@Aljoscha Krettek
The execution command is:
mvn exec:java -Dexec.mainClass=com.exmind.beam.App \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner \
      --flinkMaster=localhost:6123 \
      --filesToStage=target/com.exmind.beam-0.0.1-SNAPSHOT.jar"





------------------ Original Message ------------------
From: <[email protected]>
Date: Monday, June 12, 2017, 11:24 AM
To: "user" <[email protected]>

Subject: Re: Input/Output data to kafka exception



Yes!
I build the jar file with "mvn clean install -DskipTests", and then execute it through "mvn exec:java"!
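For reference, this is the kind of fat-jar setup being asked about: a minimal maven-shade-plugin sketch (the plugin version and transformer choice are assumptions, not taken from the actual pom) that bundles all dependencies, including the Beam Flink runner classes, into the jar produced by "mvn clean install" so that --filesToStage hands Flink a self-contained jar:

```xml
<!-- Hypothetical pom.xml fragment: shades all dependencies (including
     beam-runners-flink) into the artifact built during "package",
     so the staged jar contains UnboundedSourceWrapper and friends. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.1.0</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Merge META-INF/services files so runner/IO discovery keeps working -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```

With a shaded jar in place, the same mvn exec:java command can stage it unchanged via --filesToStage.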




------------------ Original Message ------------------
From: "Aljoscha Krettek" <[email protected]>
Date: Monday, June 12, 2017, 5:42 PM
To: "user" <[email protected]>

Subject: Re: Input/Output data to kafka exception



Hi,

How are you bundling your program for execution? Are you, for example, building 
a fat-jar using Maven? How are you executing the program? Using bin/flink or by 
executing the program using mvn exec? Also, which Beam/Flink versions are you 
using?


Best,
Aljoscha
On 12. Jun 2017, at 11:28, <[email protected]> wrote:

Hi,

I used the Beam API to write code that reads Kafka data and runs with Flink, but running it throws the following exception:
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
ClassLoader info: URL ClassLoader:
    file: '/var/folders/vq/887txtm92w508tj7gknjpflc0000gp/T/blobStore-78ed2c9c-03e5-438b-b2ec-514ac394eb11/cache/blob_7123d3372a6822ba044c5ea153ee6801cd5377ff' (valid JAR)
Class not resolvable through given classloader.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:210)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:81)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
        at java.lang.Thread.run(Thread.java:745)



code:

KafkaOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(KafkaOptions.class);
options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
options.setStreaming(true);
options.setCheckpointingInterval(1000L);
options.setNumberOfExecutionRetries(5);
options.setExecutionRetryDelay(3000L);
options.setRunner(FlinkRunner.class);

Pipeline pipeline = Pipeline.create(options);

pipeline.apply(KafkaIO.<String, String>read()
                .withBootstrapServers("localhost:9092")
                .withTopic(KAFKA_TOPIC) // use withTopics(List<String>) to read from multiple topics
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata()) // PCollection<KV<String, String>>
        .apply(Values.<String>create());
//        .apply(KafkaIO.<Void, String>write()
//                .withBootstrapServers("localhost:9092")
//                .withTopic(KAFKA_OUTPUT_TOPIC)
//                .withValueSerializer(StringSerializer.class)
//                .values());

pipeline.run(); //.waitUntilFinish();


How can I fix it?


Thank you!
