I am trying to use the Flink Kafka connector. Because the default Flink
installation does not contain the Kafka connector jars, I have specified the
Kafka connector dependency and built a fat jar. I have made sure that
flink-streaming-demo-0.1.jar contains
kafka.javaapi.consumer.SimpleConsumer.class, but I still see a
NoClassDefFoundError (caused by a ClassNotFoundException) for that class.

The code that uses the Kafka connector in Flink:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val prop:Properties = new Properties()
prop.setProperty("zookeeper.connect","somezookeer:2181")
prop.setProperty("group.id","some-group")
prop.setProperty("bootstrap.servers","somebroker:9092")

val stream = env
  .addSource(new FlinkKafkaConsumer082[String]("location", new SimpleStringSchema, prop))

jar tvf flink-streaming-demo-0.1.jar | grep kafka.javaapi.consumer.SimpleConsumer

  5111 Fri Mar 11 14:18:36 UTC 2016 kafka/javaapi/consumer/SimpleConsumer.class
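
The jar listing only shows that the class file is packaged; it does not show which class loader can see it at run time. The stack trace in this message goes through sun.misc.Launcher$AppClassLoader in the client JVM, so a runtime check may help narrow things down. The following is a minimal sketch, not part of the original job: ClasspathCheck and locate are hypothetical names introduced here for illustration, and the check assumes it is called from the same JVM (for example at the top of main) that throws the error.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical diagnostic helper; not part of Flink or of the original job.
object ClasspathCheck {

  // Returns the location (jar or directory URL) a class is loaded from,
  // or a "NOT FOUND" message if the given class loader cannot find it.
  def locate(className: String,
             loader: ClassLoader = Thread.currentThread().getContextClassLoader): String =
    Try(Class.forName(className, false, loader)) match {
      case Success(cls) =>
        // Classes loaded by the bootstrap loader have no code source.
        Option(cls.getProtectionDomain.getCodeSource)
          .flatMap(cs => Option(cs.getLocation))
          .map(_.toString)
          .getOrElse("(bootstrap or unknown location)")
      case Failure(e) =>
        s"NOT FOUND: ${e.getClass.getSimpleName}"
    }

  def main(args: Array[String]): Unit = {
    // Prints the jar that provides the class, or a NOT FOUND message.
    println(locate("kafka.javaapi.consumer.SimpleConsumer"))
  }
}
```

If this prints a NOT FOUND message even though the class file is in the fat jar, the jar is probably not on the classpath of the loader doing the lookup; if it prints a jar location, that location shows which jar actually supplies the class.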

flink.version = 0.10.2
kafka.version = 0.8.2
flink.kafka.connection.version = 0.9.1

The command I use to run the Flink program on the YARN cluster is:

HADOOP_CONF_DIR=/etc/hadoop/conf /usr/share/flink/bin/flink run \
  -c com.dataartisans.flink_demo.examples.DriverEventConsumer \
  -m yarn-cluster -yn 2 \
  /home/balajirajagopalan/flink-streaming-demo-0.1.jar

java.lang.NoClassDefFoundError: kafka/javaapi/consumer/SimpleConsumer
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:691)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:281)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.<init>(FlinkKafkaConsumer082.java:49)
    at com.dataartisans.flink_demo.examples.DriverEventConsumer$.main(DriverEventConsumer.scala:53)
    at com.dataartisans.flink_demo.examples.DriverEventConsumer.main(DriverEventConsumer.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
Caused by: java.lang.ClassNotFoundException: kafka.javaapi.consumer.SimpleConsumer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 16 more


Any help is appreciated.


balaji
