Re: Flink Kafka example in Scala

2015-07-21 Thread Till Rohrmann
Glad to hear that it finally worked :-)

On Tue, Jul 21, 2015 at 2:21 AM, Wendong wrote:
> Hi Till,
>
> Thanks for your suggestion! I did a fat jar and the runtime error of
> ClassNotFoundException was finally gone. I wish I had tried fat jar earlier
> and it would have saved me 4 days.
>
> Wendo

Re: Flink Kafka example in Scala

2015-07-20 Thread Wendong
Hi Till, Thanks for your suggestion! I did a fat jar and the runtime error of ClassNotFoundException was finally gone. I wish I had tried fat jar earlier and it would have saved me 4 days. Wendong

Re: Flink Kafka example in Scala

2015-07-20 Thread Stephan Ewen
For other issues (hadoop versions), we used a shell script that did a search and replace on the variables. Maybe you can do the same trick here...

On Mon, Jul 20, 2015 at 10:37 AM, Anwar Rizal wrote:
> Coz I don't like it :-)
>
> No, seriously, sure, I can do it with maven. It worked indeed wi

Re: Flink Kafka example in Scala

2015-07-20 Thread Anwar Rizal
Coz I don't like it :-)

No, seriously, sure, I can do it with maven. It worked indeed with maven. But the rest of our ecosystem uses sbt. That's why.

-Anwar

On Mon, Jul 20, 2015 at 10:28 AM, Till Rohrmann wrote:
> Why not try Maven instead?
>
> On Mon, Jul 20, 2015 at 10:23 AM, Anwar Riz

Re: Flink Kafka example in Scala

2015-07-20 Thread Till Rohrmann
Why not try Maven instead?

On Mon, Jul 20, 2015 at 10:23 AM, Anwar Rizal wrote:
> I do the same trick as Wendong to avoid the sbt compilation error
> (excluding kafka_${scala.binary.version}).
>
> I still haven't managed to make sbt pass scala.binary.version to maven.
>
> Anwar.
>
> On Mon, Jul 2

Re: Flink Kafka example in Scala

2015-07-20 Thread Anwar Rizal
I do the same trick as Wendong to avoid the sbt compilation error (excluding kafka_${scala.binary.version}).

I still haven't managed to make sbt pass scala.binary.version to maven.

Anwar.

On Mon, Jul 20, 2015 at 9:42 AM, Till Rohrmann wrote:
> Hi Wendong,
>
> why do you exclude the kafka dependenc

Re: Flink Kafka example in Scala

2015-07-20 Thread Till Rohrmann
Hi Wendong,

why do you exclude the kafka dependency from `flink-connector-kafka`? Do you want to use your own kafka version?

I'd recommend building a fat jar instead of trying to put the right dependencies in `/lib`. Here [1] you can see how to build a fat jar with sbt.

Cheers,
Till

[1]
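As an illustration of the fat-jar route suggested above, here is a minimal sketch that assumes the sbt-assembly plugin is used. The plugin choice and its version are assumptions and do not come from this thread; the link in [1] may describe a different setup.

```scala
// project/plugins.sbt
// Assumption: sbt-assembly is the fat-jar plugin; the version shown is illustrative.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
```

With the plugin in place, running `sbt assembly` should produce a single jar under `target/scala-<version>/` that bundles the job together with its dependencies, so the connector classes travel with the job instead of having to be copied into Flink's `lib` directory on every machine. Dependencies that the cluster already provides can be marked `% "provided"` in build.sbt to keep them out of the assembled jar.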

Re: Flink Kafka example in Scala

2015-07-17 Thread Wendong
Hi Till,

Thanks for the information. I'm using sbt and I have the following line in build.sbt:

libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")

Also, I copied flink-connector-kafka-0.9.0.jar under /lib/, bu
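A note on the exclusion in the message above: `${scala.binary.version}` is Maven property syntax, and inside an ordinary Scala string sbt treats it as literal text. The exclusion therefore matches the unresolved artifact name, not a real Kafka artifact. If Kafka is excluded this way, it presumably has to be added back explicitly so that it is on the classpath at runtime. A sketch under that assumption follows; the Kafka version is a guess and does not come from this thread.

```scala
// build.sbt
// Exclude the dependency whose name sbt cannot resolve.
// The placeholder is a literal string here, exactly as in the message above.
libraryDependencies += ("org.apache.flink" % "flink-connector-kafka" % "0.9.0")
  .exclude("org.apache.kafka", "kafka_${scala.binary.version}")

// Add Kafka back explicitly. %% appends the project's Scala binary
// version to the artifact name (for example kafka_2.10).
// Assumption: the Kafka version below is illustrative only.
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"
```

The design choice is to let sbt compute the Scala suffix through `%%` instead of trying to pass `scala.binary.version` through to the Maven metadata.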

Re: Flink Kafka example in Scala

2015-07-17 Thread Wendong
Hi Aljoscha,

Yes, the flink-connector-kafka jar file is under Flink lib directory:

flink-0.9.0/lib/flink-connector-kafka-0.9.0.jar

and it shows KafkaSink class exists:

$ jar tf lib/flink-connector-kafka-0.9.0.jar | grep KafkaSink
org/apache/flink/streaming/connectors/kafka/api/KafkaSink.c

Re: Flink Kafka example in Scala

2015-07-17 Thread Till Rohrmann
These two links [1, 2] might help to get your job running. The first link describes how to set up a job using Flink's machine learning library, but it works also for the flink-connector-kafka library. Cheers, Till [1] http://stackoverflow.com/a/31455068/4815083 [2] https://ci.apache.org/projects/

Re: Flink Kafka example in Scala

2015-07-16 Thread Aljoscha Krettek
Hi,

this looks like the flink-connector-kafka jar is not available where the job is running? Did you put it in the library folder of flink on all the machines or did you submit it with the job?

On Thu, Jul 16, 2015, 21:05 Wendong wrote:
> Hi Gyula,
>
> Cool. I removed .print and the error was go

Re: Flink Kafka example in Scala

2015-07-16 Thread Wendong
Hi Gyula,

Cool. I removed .print and the error was gone. However, env.execute failed with errors:

.
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504)
...
Caused by: org.apache.fli

Re: Flink Kafka example in Scala

2015-07-16 Thread Gyula Fóra
Hey,

You are getting that error because you are calling print after adding a sink, which is an invalid operation. Remove either addSink or print :)

Cheers,
Gyula

On Thu, Jul 16, 2015 at 7:37 PM Wendong wrote:
> Thanks! I tried your updated MySimpleStringSchema and it works for both

Re: Flink Kafka example in Scala

2015-07-16 Thread Wendong
Thanks! I tried your updated MySimpleStringSchema and it works for both source and sink. However, my problem is the runtime error "Data stream sinks cannot be copied" as listed in previous post. I hope someone ran into the problem before and can give a hint. Wendong

Re: Flink Kafka example in Scala

2015-07-16 Thread Aljoscha Krettek
Hi, your first example doesn't work because the SimpleStringSchema does not work for sinks. You can use this modified serialization schema: https://gist.github.com/aljoscha/e131fa8581f093915582. This works for both source and sink (I think the current SimpleStringSchema is not correct and should be

Re: Flink Kafka example in Scala

2015-07-15 Thread Anwar Rizal
The compilation error occurs because you haven't declared a dependency on flink-streaming-scala. In sbt, you define something like:

libraryDependencies += "org.apache.flink" % "flink-streaming-scala" % "0.9.0"

On Thu, Jul 16, 2015 at 6:36 AM, Wendong wrote:
> I tried, but got error:
>
> [error] Test

Re: Flink Kafka example in Scala

2015-07-15 Thread Wendong
I tried, but got error:

[error] TestKafka.scala:11: object scala is not a member of package org.apache.flink.streaming.api
[error] import org.apache.flink.streaming.api.scala._

So I switched back to my original import statements. Now I changed SimpleStringSchema to JavaDefaultStringSchema in add

Re: Flink Kafka example in Scala

2015-07-15 Thread Anwar Rizal
Have you tried to replace

import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

With

import org.apache.flink.streaming.api.scala._
imp