Hi,
Your first example doesn't work because SimpleStringSchema does not work
for sinks. You can use this modified serialization schema instead:
https://gist.github.com/aljoscha/e131fa8581f093915582. It works for both
sources and sinks. (I think the current SimpleStringSchema is not correct
and should be changed in the next release.)
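
For illustration only (this is not the contents of the gist), a schema that
serves both a source and a sink could look roughly like the sketch below. It
assumes the sink needs the schema to produce byte[] and the source needs it
to read byte[]. The two interfaces are hypothetical stand-ins modelled on
Flink's DeserializationSchema and SerializationSchema so that the snippet
compiles without Flink on the classpath, and the class name Utf8StringSchema
is made up.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Flink's DeserializationSchema (assumption, simplified).
interface DeserializationSchema<T> {
    T deserialize(byte[] message);
    boolean isEndOfStream(T nextElement);
}

// Hypothetical stand-in for Flink's SerializationSchema (assumption, simplified).
interface SerializationSchema<T, R> {
    R serialize(T element);
}

// Made-up name: one schema that converts String <-> byte[] in both directions,
// so the same instance can be handed to a source and to a sink.
final class Utf8StringSchema
        implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {

    // Source side: decode the raw message bytes as UTF-8 text.
    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    // The stream is treated as unbounded, so no element marks its end.
    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    // Sink side: encode the text as UTF-8 bytes.
    @Override
    public byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }
}
```

With the real Flink interfaces in place of the stand-ins, an instance of such
a class would be passed wherever the example currently passes
SimpleStringSchema.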

Cheers,
Aljoscha

On Thu, 16 Jul 2015 at 08:37 Anwar Rizal <anriza...@gmail.com> wrote:

> The compilation error occurs because you haven't declared a dependency on
> flink-streaming-scala.
> In SBT, you would define something like:
>
> libraryDependencies += "org.apache.flink" % "flink-streaming-scala" % "0.9.0"
>
>
>
> On Thu, Jul 16, 2015 at 6:36 AM, Wendong <wendong....@gmail.com> wrote:
>
>> I tried, but got an error:
>>
>> [error] TestKafka.scala:11: object scala is not a member of package org.apache.flink.streaming.api
>> [error] import org.apache.flink.streaming.api.scala._
>>
>> So I switched back to my original import statements.
>>
>> I then changed SimpleStringSchema to JavaDefaultStringSchema in
>> addSink(new KafkaSink(...)), and the compilation error went away.
>>
>> The problem is that there is now a runtime error:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
>>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>         at org.apache.flink.client.program.Client.run(Client.java:315)
>>         at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
>>         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
>>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
>> Caused by: java.lang.RuntimeException: Data stream sinks cannot be copied
>>         at org.apache.flink.streaming.api.datastream.DataStreamSink.copy(DataStreamSink.java:43)
>>         at org.apache.flink.streaming.api.datastream.DataStreamSink.copy(DataStreamSink.java:30)
>>         at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1341)
>>         at org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:1029)
>> ...........
>>
>> I googled the error message but didn't find any useful information.
>>
>> Can anyone shed some light?
>>
>> Thanks!
>>
>> Wendong
>>
>>
>>
>>
>>
>
>
