Raghu,

Sorry, I may have spoken too soon when I said that it “worked”.

The code did not throw any exceptions and seemed to start up happily. I then 
tried injecting a number of “tweets” (with hashtags) into my test_in Kafka 
topic. I’ve now been waiting for more than 10 minutes and see nothing on my 
test_out Kafka topic. I expected the top hashtags to show up there within a 
10-minute window.

Am I misunderstanding something?

To help debug the issue: previously, I noticed that if I injected a random 
string into the test_in topic, the Beam job fell over with a Jackson 
JsonMappingException, so the job does appear to be receiving my messages. But 
when I restarted the Beam job and injected three correctly formed tweet 
messages, they seem to be silently dropped somewhere between the source and 
the output Kafka topic.
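
In case it helps, here’s roughly how I’m planning to tap the stream right 
after the Kafka read, to confirm in the task manager logs whether elements 
are actually flowing. This is only a sketch I haven’t wired into 
TopHashtagsExample yet; the String element type and the “tweets” collection 
name are just my guesses:

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Passes every element through unchanged, logging it first, so I can see how
// far the tweets get in the pipeline.
class LogElements extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(LogElements.class);

  @Override
  public void processElement(ProcessContext c) {
    LOG.info("Saw element: {}", c.element());
    c.output(c.element());
  }
}

// Hypothetical usage, right after the read:
//   PCollection<String> tapped = tweets.apply(ParDo.of(new LogElements()));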

Bill

> On Mar 23, 2016, at 5:53 PM, Raghu Angadi <[email protected]> wrote:
> 
> Great news! Thanks for trying the multiple fixes, and thanks to Max and 
> Aljoscha for those fixes.
> 
> Raghu
> 
> On Wed, Mar 23, 2016 at 2:51 PM, William McCarthy <[email protected] 
> <mailto:[email protected]>> wrote:
> Thanks Max,
> 
> This command now works, thanks:
> 
> $ flink run -c 
> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
> target/beam-1.0-SNAPSHOT.jar 
> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner 
> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out 
> --streaming=true
> 
> Note that I’m still unable to use --runner=FlinkPipelineRunner (i.e. without 
> the package scope). That behaviour is unexpected to you, given the test that 
> you put in a few days ago, right? i.e.
> 
> $ flink run -c 
> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner 
> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
> 
> ------------------------------------------------------------
>  The program finished with the following exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>         at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>         at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>         at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>         at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>         at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 
> 'FlinkPipelineRunner', supported pipeline runners 
> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
>         at 
> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>         at 
> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>         at 
> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>         at 
> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>         ... 6 more
> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:264)
>         at 
> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>         ... 14 more
> 
> 
> > On Mar 23, 2016, at 2:42 PM, Maximilian Michels <[email protected] 
> > <mailto:[email protected]>> wrote:
> >
> > Hi William,
> >
> > It has been merged. Feel free to try again.
> >
> > Cheers,
> > Max
> >
> > On Wed, Mar 23, 2016 at 5:09 PM, Maximilian Michels <[email protected] 
> > <mailto:[email protected]>> wrote:
> >> The pull request is here: https://github.com/apache/incubator-beam/pull/69 
> >> <https://github.com/apache/incubator-beam/pull/69>
> >>
> >> I'll merge it after the tests have passed and it has been reviewed.
> >>
> >> On Wed, Mar 23, 2016 at 3:12 PM, Maximilian Michels <[email protected] 
> >> <mailto:[email protected]>> wrote:
> >>> Hi William,
> >>>
> >>> I started working on a fix and better testing of the
> >>> UnboundedSourceWrapper. I'll get back to you shortly with a pull
> >>> request that we should be able to merge soon.
> >>>
> >>> - Max
> >>>
> >>> On Wed, Mar 23, 2016 at 1:33 PM, Aljoscha Krettek <[email protected] 
> >>> <mailto:[email protected]>> wrote:
> >>>> Hi,
> >>>> as far as I can see from a quick glance the problem is that 
> >>>> UnboundedSourceWrapper stores an instance of Reader that it gets from 
> >>>> the Source. The Reader is not Serializable while the UnboundedSource is. 
> >>>> I think the Reader should be initialized when actually running the 
> >>>> source, in the run() method.
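> >>>>
> >>>> Roughly, the wrapper would keep only the (serializable) UnboundedSource as a
> >>>> field and create the reader lazily once the source is running. A sketch of the
> >>>> idea (not the actual wrapper code; checkpointing, timestamps and watermarks
> >>>> are left out):
> >>>>
> >>>> import com.google.cloud.dataflow.sdk.io.UnboundedSource;
> >>>> import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
> >>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> >>>>
> >>>> class UnboundedSourceWrapper<T> implements SourceFunction<T> {
> >>>>   // Serializable, so it is safe to keep as a field and ship to the workers.
> >>>>   private final UnboundedSource<T, ?> source;
> >>>>   // Not Serializable, so it is only created inside run(), on the worker.
> >>>>   private transient UnboundedSource.UnboundedReader<T> reader;
> >>>>   private volatile boolean running = true;
> >>>>
> >>>>   UnboundedSourceWrapper(UnboundedSource<T, ?> source) {
> >>>>     this.source = source;
> >>>>   }
> >>>>
> >>>>   @Override
> >>>>   public void run(SourceContext<T> ctx) throws Exception {
> >>>>     // The real code would pass the pipeline options through; default options
> >>>>     // keep the sketch short.
> >>>>     reader = source.createReader(PipelineOptionsFactory.create(), null);
> >>>>     boolean available = reader.start();
> >>>>     while (running) {
> >>>>       if (available) {
> >>>>         ctx.collect(reader.getCurrent());
> >>>>       }
> >>>>       available = reader.advance();
> >>>>     }
> >>>>   }
> >>>>
> >>>>   @Override
> >>>>   public void cancel() {
> >>>>     running = false;
> >>>>   }
> >>>> }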
> >>>>
> >>>> Cheers,
> >>>> Aljoscha
> >>>>> On 23 Mar 2016, at 13:07, William McCarthy <[email protected] 
> >>>>> <mailto:[email protected]>> wrote:
> >>>>>
> >>>>> The first stack trace was from the latest runner, yes. I’ve pulled the 
> >>>>> very latest, just now, and still get the same thing.
> >>>>>
> >>>>> When I write ‘pulled the very latest’, here’s what I mean (all of the 
> >>>>> following commands, except the last, finished with success):
> >>>>>
> >>>>> $ cd incubator-beam
> >>>>> $ git pull
> >>>>> …some output, then success
> >>>>> $ git branch
> >>>>> * master
> >>>>> $ mvn -DskipTests clean install
> >>>>> …some output, then success
> >>>>> $ cd <my project>
> >>>>> $ mvn -DskipTests clean install
> >>>>> …some output, then success
> >>>>> $ <command as output below>
> >>>>>
> >>>>> Bill
> >>>>>
> >>>>>> On Mar 22, 2016, at 5:29 AM, Maximilian Michels <[email protected] 
> >>>>>> <mailto:[email protected]>> wrote:
> >>>>>>
> >>>>>> Hi William,
> >>>>>>
> >>>>>> Is the first stack trace from the latest master? Using only the simple
> >>>>>> class name of the Runner should actually work now. I've also added a
> >>>>>> test to explicitly test that.
> >>>>>>
> >>>>>> The latter error is, as Aljoscha pointed out, due to batch
> >>>>>> execution. Perhaps we could make it explicit during startup which mode
> >>>>>> we're executing in. I filed a JIRA issue for that:
> >>>>>> https://issues.apache.org/jira/browse/BEAM-139 
> >>>>>> <https://issues.apache.org/jira/browse/BEAM-139>
> >>>>>>
> >>>>>> - Max
> >>>>>>
> >>>>>> On Tue, Mar 22, 2016 at 10:00 AM, Aljoscha Krettek 
> >>>>>> <[email protected] <mailto:[email protected]>> wrote:
> >>>>>>> Hi,
> >>>>>>> for Flink the runner can internally either translate to a batch job 
> >>>>>>> or a streaming job. Unbounded sources are not supported when running 
> >>>>>>> in batch mode so you have to somehow specify that you want to have 
> >>>>>>> streaming mode. StreamingOptions has method setStreaming, maybe you 
> >>>>>>> can specify “--streaming true” on the command line to set that flag.
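> >>>>>>>
> >>>>>>> For example, something along these lines inside the example's main method
> >>>>>>> should force streaming mode (just a sketch, assuming the options are parsed
> >>>>>>> with PipelineOptionsFactory as in the other examples):
> >>>>>>>
> >>>>>>> import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
> >>>>>>> import com.google.cloud.dataflow.sdk.options.StreamingOptions;
> >>>>>>>
> >>>>>>> // Parse the command-line flags and explicitly switch to streaming mode, so
> >>>>>>> // the Flink runner translates the pipeline as a streaming job, not batch.
> >>>>>>> StreamingOptions options = PipelineOptionsFactory.fromArgs(args)
> >>>>>>>     .withValidation()
> >>>>>>>     .as(StreamingOptions.class);
> >>>>>>> options.setStreaming(true);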
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Aljoscha
> >>>>>>>> On 21 Mar 2016, at 23:39, William McCarthy 
> >>>>>>>> <[email protected] <mailto:[email protected]>> wrote:
> >>>>>>>>
> >>>>>>>> I’ve attempted to run the TopHashtagsExample again, using both 
> >>>>>>>> ‘FlinkPipelineRunner’ and its fully qualified name. Both bomb out, 
> >>>>>>>> though the latter gets further. I hope this helps; here is the 
> >>>>>>>> output of both:
> >>>>>>>>
> >>>>>>>> $ flink run -c 
> >>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
> >>>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner 
> >>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in 
> >>>>>>>> --outputTopic=test_out
> >>>>>>>>
> >>>>>>>> ------------------------------------------------------------
> >>>>>>>> The program finished with the following exception:
> >>>>>>>>
> >>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main 
> >>>>>>>> method caused an error.
> >>>>>>>>    at 
> >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
> >>>>>>>>    at 
> >>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >>>>>>>>    at 
> >>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> >>>>>>>>    at 
> >>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> >>>>>>>>    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> >>>>>>>>    at 
> >>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> >>>>>>>>    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> >>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' 
> >>>>>>>> specified 'FlinkPipelineRunner', supported pipeline runners 
> >>>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, 
> >>>>>>>> DirectPipelineRunner]
> >>>>>>>>    at 
> >>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
> >>>>>>>>    at 
> >>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
> >>>>>>>>    at 
> >>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
> >>>>>>>>    at 
> >>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
> >>>>>>>>    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>>>    at 
> >>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>>>>>>>    at 
> >>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>>>>>>    at java.lang.reflect.Method.invoke(Method.java:498)
> >>>>>>>>    at 
> >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> >>>>>>>>    ... 6 more
> >>>>>>>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
> >>>>>>>>    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >>>>>>>>    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >>>>>>>>    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >>>>>>>>    at java.lang.Class.forName0(Native Method)
> >>>>>>>>    at java.lang.Class.forName(Class.java:264)
> >>>>>>>>    at 
> >>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
> >>>>>>>>    ... 14 more
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> $ flink run -c 
> >>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
> >>>>>>>> target/beam-1.0-SNAPSHOT.jar 
> >>>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner 
> >>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in 
> >>>>>>>> --outputTopic=test_out
> >>>>>>>>
> >>>>>>>> ------------------------------------------------------------
> >>>>>>>> The program finished with the following exception:
> >>>>>>>>
> >>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main 
> >>>>>>>> method caused an error.
> >>>>>>>>    at 
> >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
> >>>>>>>>    at 
> >>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >>>>>>>>    at 
> >>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> >>>>>>>>    at 
> >>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> >>>>>>>>    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> >>>>>>>>    at 
> >>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> >>>>>>>>    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> >>>>>>>> Caused by: java.lang.UnsupportedOperationException: The transform 
> >>>>>>>> Read(UnboundedKafkaSource) is currently not supported.
> >>>>>>>>    at 
> >>>>>>>> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitTransform(FlinkBatchPipelineTranslator.java:111)
> >>>>>>>>    at 
> >>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
> >>>>>>>>    at 
> >>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
> >>>>>>>>    at 
> >>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
> >>>>>>>>    at 
> >>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
> >>>>>>>>    at 
> >>>>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
> >>>>>>>>    at 
> >>>>>>>> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
> >>>>>>>>    at 
> >>>>>>>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130)
> >>>>>>>>    at 
> >>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109)
> >>>>>>>>    at 
> >>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50)
> >>>>>>>>    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
> >>>>>>>>    at 
> >>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
> >>>>>>>>    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>>>    at 
> >>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>>>>>>>    at 
> >>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>>>>>>    at java.lang.reflect.Method.invoke(Method.java:498)
> >>>>>>>>    at 
> >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> >>>>>>>>    ... 6 more
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> On Mar 21, 2016, at 12:23 PM, Raghu Angadi <[email protected] 
> >>>>>>>>> <mailto:[email protected]>> wrote:
> >>>>>>>>>
> >>>>>>>>> Thanks Max.
> >>>>>>>>>
> >>>>>>>>> Bill McCarthy,
> >>>>>>>>> I know you are unblocked and KafkaWriter is good enough. Please try the 
> >>>>>>>>> KafkaIO source from my branch with the Flink runner if you get a chance.
> >>>>>>>>>
> >>>>>>>>> thanks,
> >>>>>>>>> Raghu.
> >>>>>>>>>
> >>>>>>>>> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré 
> >>>>>>>>> <[email protected] <mailto:[email protected]>> wrote:
> >>>>>>>>> Thanks for the update Max !
> >>>>>>>>>
> >>>>>>>>> Regards
> >>>>>>>>> JB
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
> >>>>>>>>> FYI: The Runner registration has been fixed. The Flink runner
> >>>>>>>>> explicitly registers as of [1]. Also, the SDK tries to look up the
> >>>>>>>>> PipelineRunner class in case it has not been registered [2].
> >>>>>>>>>
> >>>>>>>>> [1] https://github.com/apache/incubator-beam/pull/40 
> >>>>>>>>> <https://github.com/apache/incubator-beam/pull/40>
> >>>>>>>>> [2] https://github.com/apache/incubator-beam/pull/61 
> >>>>>>>>> <https://github.com/apache/incubator-beam/pull/61>
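> >>>>>>>>>
> >>>>>>>>> For reference, the explicit registration in [1] is essentially a
> >>>>>>>>> ServiceLoader-based registrar shipped in the runner jar. A rough sketch of
> >>>>>>>>> the pattern (interface and method names quoted from memory here, so they
> >>>>>>>>> may not match the SDK exactly):
> >>>>>>>>>
> >>>>>>>>> package org.apache.beam.runners.flink;
> >>>>>>>>>
> >>>>>>>>> import com.google.auto.service.AutoService;
> >>>>>>>>> import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
> >>>>>>>>> import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
> >>>>>>>>> import com.google.common.collect.ImmutableList;
> >>>>>>>>>
> >>>>>>>>> // Makes FlinkPipelineRunner discoverable via ServiceLoader, so that
> >>>>>>>>> // --runner=FlinkPipelineRunner (the simple class name) resolves at runtime.
> >>>>>>>>> @AutoService(PipelineRunnerRegistrar.class)
> >>>>>>>>> public class FlinkRunnerRegistrar implements PipelineRunnerRegistrar {
> >>>>>>>>>   @Override
> >>>>>>>>>   public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
> >>>>>>>>>     return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
> >>>>>>>>>         FlinkPipelineRunner.class);
> >>>>>>>>>   }
> >>>>>>>>> }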
> >>>>>>>>>
> >>>>>>>>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels 
> >>>>>>>>> <[email protected] <mailto:[email protected]>> wrote:
> >>>>>>>>> Great to see such a lively discussion here.
> >>>>>>>>>
> >>>>>>>>> I think we'll support sinks through the Write interface (like in
> >>>>>>>>> batched execution) and also have a dedicated wrapper for the Flink
> >>>>>>>>> sinks. This is a very pressing but easy-to-solve issue in the Flink
> >>>>>>>>> runner. Expect it to land next week.
> >>>>>>>>>
> >>>>>>>>> Also, the proper registration of the runner is about to be 
> >>>>>>>>> merged.
> >>>>>>>>> We just need an ok from the contributor to merge the changes.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Max
> >>>>>>>>>
> >>>>>>>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <[email protected] 
> >>>>>>>>> <mailto:[email protected]>> wrote:
> >>>>>>>>> Thanks Bill!
> >>>>>>>>>
> >>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-136 
> >>>>>>>>> <https://issues.apache.org/jira/browse/BEAM-136>, but I'm glad it's 
> >>>>>>>>> not
> >>>>>>>>> blocking you!
> >>>>>>>>>
> >>>>>>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy
> >>>>>>>>> <[email protected] <mailto:[email protected]>> 
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> I tried that, but still no dice. Just to be clear, it’s not a 
> >>>>>>>>> blocker for
> >>>>>>>>> me, given that I have my example running, but for your information 
> >>>>>>>>> the
> >>>>>>>>> exception is below.
> >>>>>>>>>
> >>>>>>>>> I’ll watch the commit log on the beam incubator and look forward to
> >>>>>>>>> deleting my copy of Raghu’s contributions when they’re merged to 
> >>>>>>>>> master.
> >>>>>>>>>
> >>>>>>>>> Thanks again for everyone’s help,
> >>>>>>>>>
> >>>>>>>>> Bill
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Command followed by exception:
> >>>>>>>>>
> >>>>>>>>> $ flink run -c
> >>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
> >>>>>>>>> target/beam-1.0-SNAPSHOT.jar
> >>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
> >>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in 
> >>>>>>>>> --outputTopic=test_out
> >>>>>>>>>
> >>>>>>>>> ------------------------------------------------------------
> >>>>>>>>> The program finished with the following exception:
> >>>>>>>>>
> >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
> >>>>>>>>> method caused an error.
> >>>>>>>>> at
> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
> >>>>>>>>> at
> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >>>>>>>>> at 
> >>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> >>>>>>>>> at
> >>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> >>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> >>>>>>>>> at
> >>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> >>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> >>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' 
> >>>>>>>>> specified
> >>>>>>>>> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported 
> >>>>>>>>> pipeline
> >>>>>>>>> runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
> >>>>>>>>> DirectPipelineRunner]
> >>>>>>>>> at
> >>>>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
> >>>>>>>>> at
> >>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
> >>>>>>>>> at
> >>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
> >>>>>>>>> at
> >>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
> >>>>>>>>> at
> >>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
> >>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>>>> at
> >>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>>>>>>>> at
> >>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
> >>>>>>>>> at
> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> >>>>>>>>> ... 6 more
> >>>>>>>>>
> >>>>>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected] 
> >>>>>>>>> <mailto:[email protected]>> wrote:
> >>>>>>>>>
> >>>>>>>>> I don't believe the FlinkPipelineRunner is registered the same way 
> >>>>>>>>> the
> >>>>>>>>> Dataflow & Direct Pipeline runners are registered; using
> >>>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work
> >>>>>>>>>
> >>>>>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy
> >>>>>>>>> <[email protected] <mailto:[email protected]>> 
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Thanks Dan,
> >>>>>>>>>
> >>>>>>>>> I tried that, but getting the below. Note that the jar contains the
> >>>>>>>>> FlinkPipelineRunner.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
> >>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
> >>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
> >>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
> >>>>>>>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
> >>>>>>>>>
> >>>>>>>>> % flink run -c
> >>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
> >>>>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
> >>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in 
> >>>>>>>>> --outputTopic=test_out
> >>>>>>>>>
> >>>>>>>>> ------------------------------------------------------------
> >>>>>>>>> The program finished with the following exception:
> >>>>>>>>>
> >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
> >>>>>>>>> method caused an error.
> >>>>>>>>> at
> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
> >>>>>>>>> at
> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >>>>>>>>> at 
> >>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> >>>>>>>>> at
> >>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> >>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> >>>>>>>>> at
> >>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> >>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> >>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' 
> >>>>>>>>> specified
> >>>>>>>>> 'FlinkPipelineRunner', supported pipeline runners
> >>>>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
> >>>>>>>>> DirectPipelineRunner]
> >>>>>>>>> at
> >>>>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
> >>>>>>>>> at
> >>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
> >>>>>>>>> at
> >>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
> >>>>>>>>> at
> >>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
> >>>>>>>>> at
> >>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
> >>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>>>> at
> >>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>>>>>>>> at
> >>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
> >>>>>>>>> at
> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> >>>>>>>>> ... 6 more
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected] 
> >>>>>>>>> <mailto:[email protected]>> wrote:
> >>>>>>>>>
> >>>>>>>>> Thanks for catching that, Aljoscha!
> >>>>>>>>>
> >>>>>>>>> Note that the Flink runner should be available via a command-line 
> >>>>>>>>> option
> >>>>>>>>> as well: --runner=FlinkPipelineRunner.
> >>>>>>>>>
> >>>>>>>>> The list of valid values for that flag is computed by walking the
> >>>>>>>>> classpath at runtime, so as long as the Flink jar is present it'll 
> >>>>>>>>> work.
> >>>>>>>>>
> >>>>>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek 
> >>>>>>>>> <[email protected] <mailto:[email protected]>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>> looks like the example is being executed with the 
> >>>>>>>>> DirectPipelineRunner
> >>>>>>>>> which does not seem to be able to cope with UnboundedSource. You 
> >>>>>>>>> need to set
> >>>>>>>>> the runner to the FlinkRunner in the example code as described here:
> >>>>>>>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
> >>>>>>>>>  
> >>>>>>>>> <https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example>
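> >>>>>>>>>
> >>>>>>>>> Concretely, setting the runner in code looks roughly like this (a sketch;
> >>>>>>>>> I'm assuming the example builds its options with PipelineOptionsFactory):
> >>>>>>>>>
> >>>>>>>>> import org.apache.beam.runners.flink.FlinkPipelineOptions;
> >>>>>>>>> import org.apache.beam.runners.flink.FlinkPipelineRunner;
> >>>>>>>>> import com.google.cloud.dataflow.sdk.Pipeline;
> >>>>>>>>> import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
> >>>>>>>>>
> >>>>>>>>> // Hard-wire the Flink runner instead of relying on the --runner flag.
> >>>>>>>>> FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
> >>>>>>>>>     .withValidation()
> >>>>>>>>>     .as(FlinkPipelineOptions.class);
> >>>>>>>>> options.setRunner(FlinkPipelineRunner.class);
> >>>>>>>>> Pipeline p = Pipeline.create(options);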
> >>>>>>>>>
> >>>>>>>>> The Flink runner should be able to deal with UnboundedSource but 
> >>>>>>>>> has the
> >>>>>>>>> limitation that sources are always parallelism=1 (this is being 
> >>>>>>>>> worked on,
> >>>>>>>>> however).
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Aljoscha
> >>>>>>>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected] 
> >>>>>>>>> <mailto:[email protected]>> wrote:
> >>>>>>>>>
> >>>>>>>>> Looks like the Flink runner may not yet support arbitrary code 
> >>>>>>>>> written
> >>>>>>>>> with the UnboundedSource API. That is, it looks like the Flink 
> >>>>>>>>> runner
> >>>>>>>>> expects the sources to get translated away.
> >>>>>>>>>
> >>>>>>>>> Max?
> >>>>>>>>>
> >>>>>>>>> Dan
> >>>>>>>>>
> >>>>>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy
> >>>>>>>>> <[email protected] <mailto:[email protected]>> 
> >>>>>>>>> wrote:
> >>>>>>>>> Thanks Raghu,
> >>>>>>>>>
> >>>>>>>>> When I try to run it on flink using the incubator-beam code, i.e.
> >>>>>>>>>
> >>>>>>>>> flink run -c
> >>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
> >>>>>>>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
> >>>>>>>>> --topics=test_in --outputTopic=test_out
> >>>>>>>>>
> >>>>>>>>> I get this:
> >>>>>>>>>
> >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
> >>>>>>>>> method caused an error.
> >>>>>>>>>     at
> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
> >>>>>>>>>     at
> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >>>>>>>>>     at
> >>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> >>>>>>>>>     at
> >>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> >>>>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> >>>>>>>>>     at
> >>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> >>>>>>>>>     at
> >>>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> >>>>>>>>> Caused by: java.lang.IllegalStateException: no evaluator registered
> >>>>>>>>> for Read(UnboundedKafkaSource)
> >>>>>>>>>     at
> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
> >>>>>>>>>     at
> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
> >>>>>>>>>     at
> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
> >>>>>>>>>     at
> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
> >>>>>>>>>     at
> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
> >>>>>>>>>     at
> >>>>>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
> >>>>>>>>>     at
> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
> >>>>>>>>>     at
> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
> >>>>>>>>>     at
> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
> >>>>>>>>>     at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
> >>>>>>>>>     at
> >>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
> >>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>>>>     at
> >>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>>>>>>>>     at
> >>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
> >>>>>>>>>     at
> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> >>>>>>>>>     ... 6 more
> >>>>>>>>>
> >>>>>>>>> Any ideas?
> >>>>>>>>>
> >>>>>>>>> Bill
> >>>>>>>>>
> >>>>>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected] 
> >>>>>>>>> <mailto:[email protected]>> wrote:
> >>>>>>>>>
> >>>>>>>>> Thanks for trying it.
> >>>>>>>>>
> >>>>>>>>> I fixed the CheckStyle error  (not sure why my build is not 
> >>>>>>>>> failing).
> >>>>>>>>> Let me know if you see any issues running with Beam. I haven't 
> >>>>>>>>> tried it. I
> >>>>>>>>> should. In fact Daniel Halperin says my patch should be against 
> >>>>>>>>> Beam.
> >>>>>>>>>
> >>>>>>>>> Raghu.
> >>>>>>>>>
> >>>>>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy
> >>>>>>>>> <[email protected] <mailto:[email protected]>> 
> >>>>>>>>> wrote:
> >>>>>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
> >>>>>>>>> pointing me to working code.
> >>>>>>>>>
> >>>>>>>>> I’m in the middle of a hack day at the moment, so the speed of your
> >>>>>>>>> responses has been very welcome.
> >>>>>>>>>
> >>>>>>>>> In the first instance, I’ll try using your changes, Raghu. I’ve
> >>>>>>>>> cloned your repo, switched to the kafka branch and built both 
> >>>>>>>>> contrib/kafka
> >>>>>>>>> and contrib/examples/kafka. The contrib/kafka initially failed with 
> >>>>>>>>> a
> >>>>>>>>> CheckStyle error
> >>>>>>>>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
> >>>>>>>>> 'private' modifier out of order with the JLS suggestions)… I’ve 
> >>>>>>>>> fixed that
> >>>>>>>>> in my local clone and now it’s building fine. I hope to be able to 
> >>>>>>>>> run your
> >>>>>>>>> contrib unchanged on top of the incubator-beam codebase, which will 
> >>>>>>>>> be what
> >>>>>>>>> I attempt to do now.
> >>>>>>>>>
> >>>>>>>>> Thanks again to all, for your swift help.
> >>>>>>>>>
> >>>>>>>>> Bill
> >>>>>>>>>
> >>>>>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected] 
> >>>>>>>>> <mailto:[email protected]>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Bill,
> >>>>>>>>>
> >>>>>>>>> We have a fairly well tested patch for KafkaIO (PR #121). It will be
> >>>>>>>>> merged soon. The example there keeps track of top hashtags in 10 
> >>>>>>>>> minute
> >>>>>>>>> sliding window and writes the results to another Kafka topic. 
> >>>>>>>>> Please try it
> >>>>>>>>> if you can. It is well tested on Google Cloud Dataflow. I have not 
> >>>>>>>>> run it
> >>>>>>>>> using Flink runner.
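> >>>>>>>>>
> >>>>>>>>> For a feel of the shape of the example (illustrative only; the PR's actual
> >>>>>>>>> transforms, window period and types may differ):
> >>>>>>>>>
> >>>>>>>>> import com.google.cloud.dataflow.sdk.transforms.Count;
> >>>>>>>>> import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
> >>>>>>>>> import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
> >>>>>>>>> import com.google.cloud.dataflow.sdk.values.KV;
> >>>>>>>>> import com.google.cloud.dataflow.sdk.values.PCollection;
> >>>>>>>>> import org.joda.time.Duration;
> >>>>>>>>>
> >>>>>>>>> // Counts hashtags over a 10-minute sliding window (the 1-minute period is
> >>>>>>>>> // just an assumption); the top counts per window then go to the output topic.
> >>>>>>>>> static PCollection<KV<String, Long>> windowedHashtagCounts(
> >>>>>>>>>     PCollection<String> hashtags) {
> >>>>>>>>>   return hashtags
> >>>>>>>>>       .apply(Window.<String>into(
> >>>>>>>>>           SlidingWindows.of(Duration.standardMinutes(10))
> >>>>>>>>>               .every(Duration.standardMinutes(1))))
> >>>>>>>>>       .apply(Count.<String>perElement());
> >>>>>>>>> }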
> >>>>>>>>>
> >>>>>>>>> Raghu.
> >>>>>>>>>
> >>>>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas
> >>>>>>>>> <[email protected] <mailto:[email protected]>> 
> >>>>>>>>> wrote:
> >>>>>>>>> Hello Bill,
> >>>>>>>>>
> >>>>>>>>> This is a known limitation of the Flink Runner.
> >>>>>>>>> There is a JIRA issue for that
> >>>>>>>>> https://issues.apache.org/jira/browse/BEAM-127 
> >>>>>>>>> <https://issues.apache.org/jira/browse/BEAM-127>
> >>>>>>>>>
> >>>>>>>>> A wrapper for Flink sinks will come soon and as Beam evolves,
> >>>>>>>>> a more Beam-y solution will come as well.
> >>>>>>>>>
> >>>>>>>>> Kostas
> >>>>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy
> >>>>>>>>> <[email protected] <mailto:[email protected]>> 
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> I’m trying to write a proof-of-concept which takes messages from
> >>>>>>>>> Kafka, transforms them using Beam on Flink, then pushes the results 
> >>>>>>>>> onto a
> >>>>>>>>> different Kafka topic.
> >>>>>>>>>
> >>>>>>>>> I’ve used the KafkaWindowedWordCountExample as a starting point,
> >>>>>>>>> and that’s doing the first part of what I want to do, but it 
> >>>>>>>>> outputs to text
> >>>>>>>>> files as opposed to Kafka. FlinkKafkaProducer08 looks promising, 
> >>>>>>>>> but I can’t
> >>>>>>>>> figure out how to plug it into the pipeline. I was thinking that it 
> >>>>>>>>> would be
> >>>>>>>>> wrapped with an UnboundedFlinkSink, or some such, but that doesn’t 
> >>>>>>>>> seem to
> >>>>>>>>> exist.
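> >>>>>>>>>
> >>>>>>>>> In case it clarifies what I'm after, the fallback I had in mind is just a
> >>>>>>>>> DoFn that owns a plain Kafka producer and publishes each element; a rough,
> >>>>>>>>> untested sketch, not a proper Beam sink:
> >>>>>>>>>
> >>>>>>>>> import java.util.Properties;
> >>>>>>>>> import com.google.cloud.dataflow.sdk.transforms.DoFn;
> >>>>>>>>> import org.apache.kafka.clients.producer.KafkaProducer;
> >>>>>>>>> import org.apache.kafka.clients.producer.ProducerRecord;
> >>>>>>>>> import org.apache.kafka.common.serialization.StringSerializer;
> >>>>>>>>>
> >>>>>>>>> // Stop-gap "sink": publishes every element of the bundle to a Kafka topic.
> >>>>>>>>> // No retry/exactly-once semantics, just enough for a proof of concept.
> >>>>>>>>> class KafkaPublishFn extends DoFn<String, Void> {
> >>>>>>>>>   private final String bootstrapServers;
> >>>>>>>>>   private final String topic;
> >>>>>>>>>   private transient KafkaProducer<String, String> producer;
> >>>>>>>>>
> >>>>>>>>>   KafkaPublishFn(String bootstrapServers, String topic) {
> >>>>>>>>>     this.bootstrapServers = bootstrapServers;
> >>>>>>>>>     this.topic = topic;
> >>>>>>>>>   }
> >>>>>>>>>
> >>>>>>>>>   @Override
> >>>>>>>>>   public void startBundle(Context c) {
> >>>>>>>>>     Properties props = new Properties();
> >>>>>>>>>     props.put("bootstrap.servers", bootstrapServers);
> >>>>>>>>>     props.put("key.serializer", StringSerializer.class.getName());
> >>>>>>>>>     props.put("value.serializer", StringSerializer.class.getName());
> >>>>>>>>>     producer = new KafkaProducer<>(props);
> >>>>>>>>>   }
> >>>>>>>>>
> >>>>>>>>>   @Override
> >>>>>>>>>   public void processElement(ProcessContext c) {
> >>>>>>>>>     producer.send(new ProducerRecord<String, String>(topic, c.element()));
> >>>>>>>>>   }
> >>>>>>>>>
> >>>>>>>>>   @Override
> >>>>>>>>>   public void finishBundle(Context c) {
> >>>>>>>>>     producer.close();
> >>>>>>>>>   }
> >>>>>>>>> }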
> >>>>>>>>>
> >>>>>>>>> Any advice or thoughts on what I’m trying to do?
> >>>>>>>>>
> >>>>>>>>> I’m running the latest incubator-beam (as of last night from
> >>>>>>>>> Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on 
> >>>>>>>>> Google
> >>>>>>>>> Compute Engine (Debian Jessie).
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Bill McCarthy
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Jean-Baptiste Onofré
> >>>>>>>>> [email protected] <mailto:[email protected]>
> >>>>>>>>> http://blog.nanthrax.net <http://blog.nanthrax.net/>
> >>>>>>>>> Talend - http://www.talend.com <http://www.talend.com/>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> 
> 
