It should work. Do you have logs from one of the workers by any chance? I will send you another Kafka endpoint you could try running against.
Raghu. On Wed, Mar 23, 2016 at 3:12 PM, William McCarthy <[email protected] > wrote: > Raghu, > > Sorry, I may have spoken too soon when I said that it “worked”. > > The code did not throw any exceptions, and seemed to start up happily. I > then tried injecting a number of “tweets” (with hashtags) into my test_in > Kafka topic. I’ve been waiting for more than 10 mins now, and I see > nothing on my test_out Kafka topic. I expected the top hashtags to > show up there within a 10 minute window. > > Am I misunderstanding something? > > To help debug the issue: Previously, I noticed that if I injected a random > string into the test_in topic, the Beam job fell over with a Jackson > JsonMappingException, so it would appear that the job is getting my > messages. But when I restarted the Beam job and injected 3 correctly formed > tweet messages, something is silently being dropped between there and the > output Kafka topic. > > Bill > > On Mar 23, 2016, at 5:53 PM, Raghu Angadi <[email protected]> wrote: > > Great news! Thanks for trying the multiple fixes, and thanks to Max and Aljoscha > for them. > > Raghu > > On Wed, Mar 23, 2016 at 2:51 PM, William McCarthy < > [email protected]> wrote: > >> Thanks Max, >> >> This command now works, thanks: >> >> $ flink run -c >> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample >> target/beam-1.0-SNAPSHOT.jar >> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner >> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out >> --streaming=true >> >> Note that I’m still unable to use --runner=FlinkPipelineRunner (i.e. >> without the package scope). That behaviour is unexpected to you, given the >> test that you put in a few days ago, right? i.e. >> >> $ flink run -c >> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample >> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner >> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out >> >> ------------------------------------------------------------ >> The program finished with the following exception: >> >> org.apache.flink.client.program.ProgramInvocationException: The main >> method caused an error.
>> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) >> at >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) >> at >> org.apache.flink.client.program.Client.runBlocking(Client.java:248) >> at >> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) >> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) >> at >> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) >> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) >> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified >> 'FlinkPipelineRunner', supported pipeline runners >> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, >> DirectPipelineRunner] >> at >> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486) >> at >> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101) >> at >> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286) >> at >> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) >> ... 6 more >> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner >> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >> at java.lang.Class.forName0(Native Method) >> at java.lang.Class.forName(Class.java:264) >> at >> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473) >> ... 14 more >> >> >> > On Mar 23, 2016, at 2:42 PM, Maximilian Michels <[email protected]> wrote: >> > >> > Hi William, >> > >> > It has been merged. Feel free to try again. >> > >> > Cheers, >> > Max >> > >> > On Wed, Mar 23, 2016 at 5:09 PM, Maximilian Michels <[email protected]> >> wrote: >> >> The pull request is here: >> https://github.com/apache/incubator-beam/pull/69 >> >> >> >> I'll merge it after the tests have passed and it has been reviewed. >> >> >> >> On Wed, Mar 23, 2016 at 3:12 PM, Maximilian Michels <[email protected]> >> wrote: >> >>> Hi William, >> >>> >> >>> I started working on a fix and better testing of the >> >>> UnboundedSourceWrapper. I'll get back to you shortly with a pull >> >>> request that we should be able to merge soon. >> >>> >> >>> - Max >> >>> >> >>> On Wed, Mar 23, 2016 at 1:33 PM, Aljoscha Krettek < >> [email protected]> wrote: >> >>>> Hi, >> >>>> as far as I can see from a quick glance the problem is that >> UnboundedSourceWrapper stores an instance of Reader that it gets from the >> Source. The Reader is not Serializable while the UnboundedSource is. I >> think the Reader should be initialized when actually running the source, in >> the run() method. 
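As a rough illustration of the fix Aljoscha describes, a Flink source can hold only the serializable source description as a field and create the non-serializable reader inside run(), on the task manager. The DemoSource and DemoReader types below are hypothetical stand-ins, not the actual UnboundedSourceWrapper or Beam interfaces; this is a simplified sketch of the idea, not the real code:

    import java.io.Serializable;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class LazyReaderSource implements SourceFunction<String> {

      /** Hypothetical serializable source description (plays the role of an UnboundedSource). */
      public interface DemoSource extends Serializable {
        DemoReader createReader();
      }

      /** Hypothetical reader; it holds live connections, so it is not Serializable. */
      public interface DemoReader {
        boolean advance();
        String getCurrent();
        void close();
      }

      private final DemoSource source;   // serializable, safe to ship with the job graph
      private volatile boolean running = true;

      public LazyReaderSource(DemoSource source) {
        this.source = source;
      }

      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        // The reader is created here, on the task manager, after deserialization,
        // instead of being stored as a (non-serializable) field at construction time.
        DemoReader reader = source.createReader();
        try {
          while (running) {
            if (reader.advance()) {
              ctx.collect(reader.getCurrent());
            }
          }
        } finally {
          reader.close();
        }
      }

      @Override
      public void cancel() {
        running = false;
      }
    }
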
>> >>>> >> >>>> Cheers, >> >>>> Aljoscha >> >>>>> On 23 Mar 2016, at 13:07, William McCarthy < >> [email protected]> wrote: >> >>>>> >> >>>>> The first stack trace was from the latest runner, yes. I’ve pulled >> the very latest, just now, and still get the same thing. >> >>>>> >> >>>>> When I write ‘pulled the very latest’, here’s what I mean (all of >> the following commands, except the last, finished with success): >> >>>>> >> >>>>> $ cd incubator-beam >> >>>>> $ git pull >> >>>>> …some output, then success >> >>>>> $ git branch >> >>>>> * master >> >>>>> $ mvn -DskipTests clean install >> >>>>> …some output, then success >> >>>>> $ cd <my project> >> >>>>> $ mvn -DskipTests clean install >> >>>>> …some output, then success >> >>>>> $ <command as output below> >> >>>>> >> >>>>> Bill >> >>>>> >> >>>>>> On Mar 22, 2016, at 5:29 AM, Maximilian Michels <[email protected]> >> wrote: >> >>>>>> >> >>>>>> Hi William, >> >>>>>> >> >>>>>> Is the first stack trace from the latest master? Using only the >> simple >> >>>>>> class name of the Runner should actually work now. I've also added >> a >> >>>>>> test to explicitly test that. >> >>>>>> >> >>>>>> The latter error, as Aljoscha pointed out, is due to batch >> >>>>>> execution. Perhaps we could make it explicit during startup which >> mode >> >>>>>> we're executing in. Filed a JIRA issue for that: >> >>>>>> https://issues.apache.org/jira/browse/BEAM-139 >> >>>>>> >> >>>>>> - Max >> >>>>>> >> >>>>>> On Tue, Mar 22, 2016 at 10:00 AM, Aljoscha Krettek < >> [email protected]> wrote: >> >>>>>>> Hi, >> >>>>>>> for Flink, the runner can internally translate to either a batch >> job or a streaming job. Unbounded sources are not supported when running in >> batch mode, so you have to somehow specify that you want streaming >> mode. StreamingOptions has a setStreaming method, maybe you can specify >> “--streaming true” on the command line to set that flag. >> >>>>>>> >> >>>>>>> Cheers, >> >>>>>>> Aljoscha >> >>>>>>>> On 21 Mar 2016, at 23:39, William McCarthy < >> [email protected]> wrote: >> >>>>>>>> >> >>>>>>>> I’ve attempted to run the TopHashtagsExample again, using both >> ‘FlinkPipelineRunner’ and its fully qualified name. Both bomb out, though >> the latter gets further. I hope this helps; here is the output of both: >> >>>>>>>> >> >>>>>>>> $ flink run -c >> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample >> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner >> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out >> >>>>>>>> >> >>>>>>>> ------------------------------------------------------------ >> >>>>>>>> The program finished with the following exception: >> >>>>>>>> >> >>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The >> main method caused an error.
>> >>>>>>>> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) >> >>>>>>>> at >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) >> >>>>>>>> at >> org.apache.flink.client.program.Client.runBlocking(Client.java:248) >> >>>>>>>> at >> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) >> >>>>>>>> at >> org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) >> >>>>>>>> at >> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) >> >>>>>>>> at >> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) >> >>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' >> specified 'FlinkPipelineRunner', supported pipeline runners >> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, >> DirectPipelineRunner] >> >>>>>>>> at >> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486) >> >>>>>>>> at >> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101) >> >>>>>>>> at >> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286) >> >>>>>>>> at >> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117) >> >>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> >>>>>>>> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> >>>>>>>> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> >>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >> >>>>>>>> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) >> >>>>>>>> ... 6 more >> >>>>>>>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner >> >>>>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >> >>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >> >>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >> >>>>>>>> at java.lang.Class.forName0(Native Method) >> >>>>>>>> at java.lang.Class.forName(Class.java:264) >> >>>>>>>> at >> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473) >> >>>>>>>> ... 14 more >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> $ flink run -c >> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample >> target/beam-1.0-SNAPSHOT.jar >> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner >> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out >> >>>>>>>> >> >>>>>>>> ------------------------------------------------------------ >> >>>>>>>> The program finished with the following exception: >> >>>>>>>> >> >>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The >> main method caused an error. 
>> >>>>>>>> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) >> >>>>>>>> at >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) >> >>>>>>>> at >> org.apache.flink.client.program.Client.runBlocking(Client.java:248) >> >>>>>>>> at >> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) >> >>>>>>>> at >> org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) >> >>>>>>>> at >> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) >> >>>>>>>> at >> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) >> >>>>>>>> Caused by: java.lang.UnsupportedOperationException: The >> transform Read(UnboundedKafkaSource) is currently not supported. >> >>>>>>>> at >> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitTransform(FlinkBatchPipelineTranslator.java:111) >> >>>>>>>> at >> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) >> >>>>>>>> at >> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) >> >>>>>>>> at >> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) >> >>>>>>>> at >> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) >> >>>>>>>> at >> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259) >> >>>>>>>> at >> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34) >> >>>>>>>> at >> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130) >> >>>>>>>> at >> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109) >> >>>>>>>> at >> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50) >> >>>>>>>> at >> com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180) >> >>>>>>>> at >> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140) >> >>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> >>>>>>>> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> >>>>>>>> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> >>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >> >>>>>>>> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) >> >>>>>>>> ... 6 more >> >>>>>>>> >> >>>>>>>> >> >>>>>>>>> On Mar 21, 2016, at 12:23 PM, Raghu Angadi <[email protected]> >> wrote: >> >>>>>>>>> >> >>>>>>>>> Thanks Max. >> >>>>>>>>> >> >>>>>>>>> Bill McCarthy, >> >>>>>>>>> I know you are unblocked and KafkaWriter is good enough. Please >> try KafkaIO source from my branch with Flink runner if you get a chance. >> >>>>>>>>> >> >>>>>>>>> thanks, >> >>>>>>>>> Raghu. >> >>>>>>>>> >> >>>>>>>>> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré < >> [email protected]> wrote: >> >>>>>>>>> Thanks for the update Max ! >> >>>>>>>>> >> >>>>>>>>> Regards >> >>>>>>>>> JB >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> On 03/21/2016 02:39 PM, Maximilian Michels wrote: >> >>>>>>>>> FYI: The Runner registration has been fixed. The Flink runner >> >>>>>>>>> explicitly registers as of [1]. Also, the SDK tries to look up >> the >> >>>>>>>>> PipelineRunner class in case it has not been registered [2]. 
>> >>>>>>>>> >> >>>>>>>>> [1] https://github.com/apache/incubator-beam/pull/40 >> >>>>>>>>> [2] https://github.com/apache/incubator-beam/pull/61 >> >>>>>>>>> >> >>>>>>>>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels < >> [email protected]> wrote: >> >>>>>>>>> Great to see such a lively discussion here. >> >>>>>>>>> >> >>>>>>>>> I think we'll support sinks through the Write interface (like in >> >>>>>>>>> batched execution) and also have a dedicated wrapper for the >> Flink >> >>>>>>>>> sinks. This is a very pressing but easy-to-solve issue of the >> Flink >> >>>>>>>>> runner. Expect it to land next week. >> >>>>>>>>> >> >>>>>>>>> Also, the proper registration of the runner is about to be >> merged. >> >>>>>>>>> We just need an ok from the contributor to merge the changes. >> >>>>>>>>> >> >>>>>>>>> Best, >> >>>>>>>>> Max >> >>>>>>>>> >> >>>>>>>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin < >> [email protected]> wrote: >> >>>>>>>>> Thanks Bill! >> >>>>>>>>> >> >>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm >> glad it's not >> >>>>>>>>> blocking you! >> >>>>>>>>> >> >>>>>>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy >> >>>>>>>>> <[email protected]> wrote: >> >>>>>>>>> >> >>>>>>>>> I tried that, but still no dice. Just to be clear, it’s not a >> blocker for >> >>>>>>>>> me, given that I have my example running, but for your >> information the >> >>>>>>>>> exception is below. >> >>>>>>>>> >> >>>>>>>>> I’ll watch the commit log on the beam incubator and look >> forward to >> >>>>>>>>> deleting my copy of Raghu’s contributions when they’re merged >> to master. >> >>>>>>>>> >> >>>>>>>>> Thanks again for everyone’s help, >> >>>>>>>>> >> >>>>>>>>> Bill >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> Command followed by exception: >> >>>>>>>>> >> >>>>>>>>> $ flink run -c >> >>>>>>>>> >> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample >> >>>>>>>>> target/beam-1.0-SNAPSHOT.jar >> >>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner >> >>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in >> --outputTopic=test_out >> >>>>>>>>> >> >>>>>>>>> ------------------------------------------------------------ >> >>>>>>>>> The program finished with the following exception: >> >>>>>>>>> >> >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The >> main >> >>>>>>>>> method caused an error.
>> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) >> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) >> >>>>>>>>> at >> org.apache.flink.client.program.Client.runBlocking(Client.java:248) >> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) >> >>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) >> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) >> >>>>>>>>> at >> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) >> >>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' >> specified >> >>>>>>>>> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported >> pipeline >> >>>>>>>>> runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, >> >>>>>>>>> DirectPipelineRunner] >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117) >> >>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> >>>>>>>>> at >> >>>>>>>>> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> >>>>>>>>> at >> >>>>>>>>> >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> >>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) >> >>>>>>>>> ... 6 more >> >>>>>>>>> >> >>>>>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected]> >> wrote: >> >>>>>>>>> >> >>>>>>>>> I don't believe the FlinkPipelineRunner is registered the same >> way the >> >>>>>>>>> Dataflow & Direct Pipeline runners are registered; using >> >>>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work >> >>>>>>>>> >> >>>>>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy >> >>>>>>>>> <[email protected]> wrote: >> >>>>>>>>> >> >>>>>>>>> Thanks Dan, >> >>>>>>>>> >> >>>>>>>>> I tried that, but getting the below. Note that the jar contains >> the >> >>>>>>>>> FlinkPipelineRunner. 
>> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline >> >>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class >> >>>>>>>>> >> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class >> >>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class >> >>>>>>>>> >> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class >> >>>>>>>>> >> >>>>>>>>> % flink run -c >> >>>>>>>>> >> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample >> >>>>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner >> >>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in >> --outputTopic=test_out >> >>>>>>>>> >> >>>>>>>>> ------------------------------------------------------------ >> >>>>>>>>> The program finished with the following exception: >> >>>>>>>>> >> >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The >> main >> >>>>>>>>> method caused an error. >> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) >> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) >> >>>>>>>>> at >> org.apache.flink.client.program.Client.runBlocking(Client.java:248) >> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) >> >>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) >> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) >> >>>>>>>>> at >> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) >> >>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' >> specified >> >>>>>>>>> 'FlinkPipelineRunner', supported pipeline runners >> >>>>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, >> >>>>>>>>> DirectPipelineRunner] >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117) >> >>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> >>>>>>>>> at >> >>>>>>>>> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> >>>>>>>>> at >> >>>>>>>>> >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> >>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) >> >>>>>>>>> ... 6 more >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]> >> wrote: >> >>>>>>>>> >> >>>>>>>>> Thanks for catching that, Aljoscha! >> >>>>>>>>> >> >>>>>>>>> Note that the Flink runner should be available via a >> command-line option >> >>>>>>>>> as well: --runner=FlinkPipelineRunner. 
>> >>>>>>>>> >> >>>>>>>>> The list of valid values for that flag is computed by walking >> the >> >>>>>>>>> classpath at runtime, so as long as the Flink jar is present >> it'll work. >> >>>>>>>>> >> >>>>>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek < >> [email protected]> >> >>>>>>>>> wrote: >> >>>>>>>>> >> >>>>>>>>> Hi, >> >>>>>>>>> looks like the example is being executed with the >> DirectPipelineRunner >> >>>>>>>>> which does not seem to be able to cope with UnboundedSource. >> You need to set >> >>>>>>>>> the runner to the FlinkRunner in the example code as described >> here: >> >>>>>>>>> >> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example >> >>>>>>>>> >> >>>>>>>>> The Flink runner should be able to deal with UnboundedSource >> but has the >> >>>>>>>>> limitation that sources are always parallelism=1 (this is being >> worked on, >> >>>>>>>>> however). >> >>>>>>>>> >> >>>>>>>>> Cheers, >> >>>>>>>>> Aljoscha >> >>>>>>>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> >> wrote: >> >>>>>>>>> >> >>>>>>>>> Looks like the Flink runner may not yet support arbitrary code >> written >> >>>>>>>>> with the UnboundedSource API. That is, it looks like the Flink >> runner >> >>>>>>>>> expects the sources to get translated away. >> >>>>>>>>> >> >>>>>>>>> Max? >> >>>>>>>>> >> >>>>>>>>> Dan >> >>>>>>>>> >> >>>>>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy >> >>>>>>>>> <[email protected]> wrote: >> >>>>>>>>> Thanks Raghu, >> >>>>>>>>> >> >>>>>>>>> When I try to run it on flink using the incubator-beam code, >> i.e. >> >>>>>>>>> >> >>>>>>>>> flink run -c >> >>>>>>>>> >> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample >> >>>>>>>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092 >> >>>>>>>>> --topics=test_in --outputTopic=test_out >> >>>>>>>>> >> >>>>>>>>> I get this: >> >>>>>>>>> >> >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The >> main >> >>>>>>>>> method caused an error. 
>> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) >> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) >> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.program.Client.runBlocking(Client.java:248) >> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) >> >>>>>>>>> at >> org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) >> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) >> >>>>>>>>> at >> >>>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) >> >>>>>>>>> Caused by: java.lang.IllegalStateException: no evaluator >> registered >> >>>>>>>>> for Read(UnboundedKafkaSource) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96) >> >>>>>>>>> at >> com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180) >> >>>>>>>>> at >> >>>>>>>>> >> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140) >> >>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native >> Method) >> >>>>>>>>> at >> >>>>>>>>> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> >>>>>>>>> at >> >>>>>>>>> >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> >>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >> >>>>>>>>> at >> >>>>>>>>> >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) >> >>>>>>>>> ... 6 more >> >>>>>>>>> >> >>>>>>>>> Any ideas? >> >>>>>>>>> >> >>>>>>>>> Bill >> >>>>>>>>> >> >>>>>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> >> wrote: >> >>>>>>>>> >> >>>>>>>>> Thanks for trying it. >> >>>>>>>>> >> >>>>>>>>> I fixed the CheckStyle error (not sure why my build is not >> failing). >> >>>>>>>>> Let me know if you see any issues running with Beam. I haven't >> tried it. I >> >>>>>>>>> should. In fact Daniel Halperin says my patch should be against >> Beam.. >> >>>>>>>>> >> >>>>>>>>> Raghu. >> >>>>>>>>> >> >>>>>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy >> >>>>>>>>> <[email protected]> wrote: >> >>>>>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and >> Raghu for >> >>>>>>>>> pointing me to working code. 
>> >>>>>>>>> >> >>>>>>>>> I’m in the middle of a hack day at the moment, so the speed of >> your >> >>>>>>>>> responses has been very welcome. >> >>>>>>>>> >> >>>>>>>>> In the first instance, I’ll try using your changes, Raghu. I’ve >> >>>>>>>>> cloned your repo, switched to the kafka branch and built both >> contrib/kafka >> >>>>>>>>> and contrib/examples/kafka. The contrib/kafka build initially failed >> with a >> >>>>>>>>> CheckStyle error >> >>>>>>>>> >> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12: >> >>>>>>>>> 'private' modifier out of order with the JLS suggestions)… I’ve >> fixed that >> >>>>>>>>> in my local clone and now it’s building fine. I hope to be able >> to run your >> >>>>>>>>> contrib unchanged on top of the incubator-beam codebase, which >> will be what >> >>>>>>>>> I attempt to do now. >> >>>>>>>>> >> >>>>>>>>> Thanks again to all for your swift help. >> >>>>>>>>> >> >>>>>>>>> Bill >> >>>>>>>>> >> >>>>>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]> >> >>>>>>>>> wrote: >> >>>>>>>>> >> >>>>>>>>> Hi Bill, >> >>>>>>>>> >> >>>>>>>>> We have a fairly well tested patch for KafkaIO (pr #121). It will >> be >> >>>>>>>>> merged soon. The example there keeps track of top hashtags in a >> 10 minute >> >>>>>>>>> sliding window and writes the results to another Kafka topic. >> Please try it >> >>>>>>>>> if you can. It is well tested on Google Cloud Dataflow. I have >> not run it >> >>>>>>>>> using the Flink runner. >> >>>>>>>>> >> >>>>>>>>> Raghu. >> >>>>>>>>> >> >>>>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas >> >>>>>>>>> <[email protected]> wrote: >> >>>>>>>>> Hello Bill, >> >>>>>>>>> >> >>>>>>>>> This is a known limitation of the Flink Runner. >> >>>>>>>>> There is a JIRA issue for that: >> >>>>>>>>> https://issues.apache.org/jira/browse/BEAM-127 >> >>>>>>>>> >> >>>>>>>>> A wrapper for Flink sinks will come soon, and as Beam evolves, >> >>>>>>>>> a more Beam-y solution will come as well. >> >>>>>>>>> >> >>>>>>>>> Kostas >> >>>>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy >> >>>>>>>>> <[email protected]> wrote: >> >>>>>>>>> >> >>>>>>>>> Hi, >> >>>>>>>>> >> >>>>>>>>> I’m trying to write a proof-of-concept which takes messages from >> >>>>>>>>> Kafka, transforms them using Beam on Flink, then pushes the >> results onto a >> >>>>>>>>> different Kafka topic. >> >>>>>>>>> >> >>>>>>>>> I’ve used the KafkaWindowedWordCountExample as a starting point, >> >>>>>>>>> and that’s doing the first part of what I want to do, but it >> outputs to text >> >>>>>>>>> files as opposed to Kafka. FlinkKafkaProducer08 looks >> promising, but I can’t >> >>>>>>>>> figure out how to plug it into the pipeline. I was thinking >> that it would be >> >>>>>>>>> wrapped with an UnboundedFlinkSink, or some such, but that >> doesn’t seem to >> >>>>>>>>> exist. >> >>>>>>>>> >> >>>>>>>>> Any advice or thoughts on what I’m trying to do? >> >>>>>>>>> >> >>>>>>>>> I’m running the latest incubator-beam (as of last night from >> >>>>>>>>> GitHub), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on >> Google >> >>>>>>>>> Compute Engine (Debian Jessie).
>> >>>>>>>>> >> >>>>>>>>> Thanks, >> >>>>>>>>> >> >>>>>>>>> Bill McCarthy >> >>>>>>>>> >> >>>>>>>>> -- >> >>>>>>>>> Jean-Baptiste Onofré >> >>>>>>>>> [email protected] >> >>>>>>>>> http://blog.nanthrax.net >> >>>>>>>>> Talend - http://www.talend.com
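For reference, a minimal sketch of configuring the runner and streaming mode in code rather than on the command line, as an alternative to the --runner=... and --streaming=true flags discussed in this thread. It assumes the Dataflow SDK options API and the FlinkPipelineOptions and FlinkPipelineRunner classes shown in the jar listing above; the exact interfaces may differ in a given checkout:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.options.StreamingOptions;
    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkPipelineRunner;

    public class FlinkRunnerSetup {
      public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        // Setting the runner class directly sidesteps the name-based lookup that
        // produced the "Unknown 'runner' specified" errors earlier in the thread.
        options.setRunner(FlinkPipelineRunner.class);
        // Unbounded (Kafka) sources only work in streaming mode; this is the
        // programmatic equivalent of passing --streaming=true.
        options.as(StreamingOptions.class).setStreaming(true);

        Pipeline p = Pipeline.create(options);
        // ... build the Kafka read -> hashtag windowing -> Kafka write here ...
        p.run();
      }
    }

And a small sketch of publishing a test message to the test_in topic with the plain Kafka 0.9 producer API, for reproducing the "nothing shows up on test_out" debugging step at the top of the thread. The JSON field names are assumptions and need to match whatever the example's Jackson model actually expects:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TestTweetProducer {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cl-pu4p:9092");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        // The field names below are guesses; a message that does not match the
        // example's expected schema will fail with the JsonMappingException mentioned above.
        String tweet = "{\"text\": \"testing #beam #flink\", \"timestamp_ms\": \""
            + System.currentTimeMillis() + "\"}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
          // send(...).get() blocks until the broker acknowledges the record.
          producer.send(new ProducerRecord<String, String>("test_in", tweet)).get();
        }
      }
    }
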
