I’m unable to send the logs due to confidentiality constraints.

However, when I look at the taskmanager log, I see some JSON parsing issues. So 
I suspect that the tweets I’m sending through (borrowed from 
https://gist.github.com/hrp/900964, with an added hashtag) do not conform to the 
schema you’re expecting.
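
In case it helps you spot the mismatch, below is a minimal standalone sketch 
(using jackson-databind) of how I’d expect the hashtags to be pulled out of one 
of those gist tweets. The field names here (text and entities.hashtags[].text, 
i.e. the standard Twitter JSON layout) are my assumption about what your mapping 
expects, not something taken from your code:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class HashtagCheck {
  public static void main(String[] args) throws Exception {
    // A gist-style tweet trimmed to the two fields I assume matter here:
    // the tweet text and the entities.hashtags array.
    String json = "{\"text\":\"hello #beam\","
        + "\"entities\":{\"hashtags\":[{\"text\":\"beam\",\"indices\":[6,11]}]}}";
    JsonNode tweet = new ObjectMapper().readTree(json);
    // Print each hashtag; path() returns a missing node (rather than throwing)
    // if the structure is not what we expect.
    for (JsonNode tag : tweet.path("entities").path("hashtags")) {
      System.out.println(tag.path("text").asText());
    }
  }
}

If your pipeline binds the messages to a stricter POJO than this, a tweet with 
extra or missing fields would explain the JsonMappingExceptions I mentioned.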

You sent the kafka topic information for your tweet stream out of band, and I’m 
connected to that now. The output is now being printed to the taskmanager log 
file, which is what’s expected. So I think this issue is explained.

Thanks,

Bill


> On Mar 23, 2016, at 6:33 PM, Raghu Angadi <[email protected]> wrote:
> 
> It should work. Do you have a log from one of the workers by any chance? I will 
> send you another kafka endpoint you could try running against. 
> 
> Raghu.
> 
> On Wed, Mar 23, 2016 at 3:12 PM, William McCarthy <[email protected] 
> <mailto:[email protected]>> wrote:
> Raghu,
> 
> Sorry, I may have spoken too soon when I said that it “worked”.
> 
> The code did not throw any exceptions, and seemed to start up happily. I then 
> tried injecting a number of “tweets” (with hashtags) into my test_in kafka 
> topic. I’ve been waiting for more than 10 minutes now, and I see nothing on my 
> test_out kafka topic. I expected the top hashtags to show up there within a 
> 10-minute window.
> 
> Am I misunderstanding something?
> 
> To help debug the issue: Previously, I noticed that if I injected a random 
> string into the test_in topic, the Beam job fell over with a Jackson 
> JsonMappingException, so it would appear that the job is getting my messages. 
> But when I restarted the Beam job and injected 3 correctly formed tweet 
> messages, something is silently being dropped between there and the output 
> kafka topic.
> 
> Bill
> 
>> On Mar 23, 2016, at 5:53 PM, Raghu Angadi <[email protected] 
>> <mailto:[email protected]>> wrote:
>> 
>> Great news! Thanks for trying the multiple fixes, and thanks to Max and 
>> Aljoscha for the fixes.
>> 
>> Raghu
>> 
>> On Wed, Mar 23, 2016 at 2:51 PM, William McCarthy <[email protected] 
>> <mailto:[email protected]>> wrote:
>> Thanks Max,
>> 
>> This command now works, thanks:
>> 
>> $ flink run -c 
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>> target/beam-1.0-SNAPSHOT.jar 
>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner 
>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out 
>> --streaming=true
>> 
>> Note that I’m still unable to use --runner=FlinkPipelineRunner (i.e. without 
>> the fully qualified package name). That behaviour is unexpected, given the 
>> test that you put in a few days ago, right? i.e.
>> 
>> $ flink run -c 
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner 
>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>> 
>> ------------------------------------------------------------
>>  The program finished with the following exception:
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The main method 
>> caused an error.
>>         at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>         at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>         at 
>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>         at 
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>         at 
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 
>> 'FlinkPipelineRunner', supported pipeline runners 
>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, 
>> DirectPipelineRunner]
>>         at 
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>>         at 
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>>         at 
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>>         at 
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>         ... 6 more
>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>         at java.lang.Class.forName0(Native Method)
>>         at java.lang.Class.forName(Class.java:264)
>>         at 
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>>         ... 14 more
>> 
>> 
>> > On Mar 23, 2016, at 2:42 PM, Maximilian Michels <[email protected] 
>> > <mailto:[email protected]>> wrote:
>> >
>> > Hi William,
>> >
>> > It has been merged. Feel free to try again.
>> >
>> > Cheers,
>> > Max
>> >
>> > On Wed, Mar 23, 2016 at 5:09 PM, Maximilian Michels <[email protected] 
>> > <mailto:[email protected]>> wrote:
>> >> The pull request is here: 
>> >> https://github.com/apache/incubator-beam/pull/69 
>> >>
>> >> I'll merge it after the tests have passed and it has been reviewed.
>> >>
>> >> On Wed, Mar 23, 2016 at 3:12 PM, Maximilian Michels <[email protected] 
>> >> <mailto:[email protected]>> wrote:
>> >>> Hi William,
>> >>>
>> >>> I started working on a fix and better testing of the
>> >>> UnboundedSourceWrapper. I'll get back to you shortly with a pull
>> >>> request that we should be able to merge soon.
>> >>>
>> >>> - Max
>> >>>
>> >>> On Wed, Mar 23, 2016 at 1:33 PM, Aljoscha Krettek <[email protected] 
>> >>> <mailto:[email protected]>> wrote:
>> >>>> Hi,
>> >>>> as far as I can see from a quick glance the problem is that 
>> >>>> UnboundedSourceWrapper stores an instance of Reader that it gets from 
>> >>>> the Source. The Reader is not Serializable while the UnboundedSource 
>> >>>> is. I think the Reader should be initialized when actually running the 
>> >>>> source, in the run() method.
>> >>>>
>> >>>> Cheers,
>> >>>> Aljoscha
>> >>>>> On 23 Mar 2016, at 13:07, William McCarthy <[email protected] 
>> >>>>> <mailto:[email protected]>> wrote:
>> >>>>>
>> >>>>> The first stack trace was from the latest runner, yes. I’ve pulled the 
>> >>>>> very latest, just now, and still get the same thing.
>> >>>>>
>> >>>>> When I write ‘pulled the very latest’, here’s what I mean (all of the 
>> >>>>> following commands, except the last, finished with success):
>> >>>>>
>> >>>>> $ cd incubator-beam
>> >>>>> $ git pull
>> >>>>> …some output, then success
>> >>>>> $ git branch
>> >>>>> * master
>> >>>>> $ mvn -DskipTests clean install
>> >>>>> …some output, then success
>> >>>>> $ cd <my project>
>> >>>>> $ mvn -DskipTests clean install
>> >>>>> …some output, then success
>> >>>>> $ <command as output below>
>> >>>>>
>> >>>>> Bill
>> >>>>>
>> >>>>>> On Mar 22, 2016, at 5:29 AM, Maximilian Michels <[email protected] 
>> >>>>>> <mailto:[email protected]>> wrote:
>> >>>>>>
>> >>>>>> Hi William,
>> >>>>>>
>> >>>>>> Is the first stack trace from the latest master? Using only the simple
>> >>>>>> class name of the Runner should actually work now. I've also added a
>> >>>>>> test to explicitly test that.
>> >>>>>>
>> >>>>>> The latter error, as Aljoscha pointed out, is due to batch
>> >>>>>> execution. Perhaps we could make it explicit during startup which mode
>> >>>>>> we're executing in. I've filed a JIRA issue for that:
>> >>>>>> https://issues.apache.org/jira/browse/BEAM-139
>> >>>>>>
>> >>>>>> - Max
>> >>>>>>
>> >>>>>> On Tue, Mar 22, 2016 at 10:00 AM, Aljoscha Krettek 
>> >>>>>> <[email protected] <mailto:[email protected]>> wrote:
>> >>>>>>> Hi,
>> >>>>>>> for Flink, the runner can internally translate to either a batch job 
>> >>>>>>> or a streaming job. Unbounded sources are not supported when running 
>> >>>>>>> in batch mode, so you have to somehow specify that you want 
>> >>>>>>> streaming mode. StreamingOptions has a setStreaming method; maybe you 
>> >>>>>>> can specify “--streaming true” on the command line to set that flag.
>> >>>>>>>
>> >>>>>>> Cheers,
>> >>>>>>> Aljoscha
>> >>>>>>>> On 21 Mar 2016, at 23:39, William McCarthy 
>> >>>>>>>> <[email protected] <mailto:[email protected]>> 
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>> I’ve attempted to run the TopHashtagsExample again, using both 
>> >>>>>>>> ‘FlinkPipelineRunner’ and its fully qualified name. Both bomb out, 
>> >>>>>>>> though the latter gets further. I hope this helps; here is the 
>> >>>>>>>> output of both:
>> >>>>>>>>
>> >>>>>>>> $ flink run -c 
>> >>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>> >>>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner 
>> >>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in 
>> >>>>>>>> --outputTopic=test_out
>> >>>>>>>>
>> >>>>>>>> ------------------------------------------------------------
>> >>>>>>>> The program finished with the following exception:
>> >>>>>>>>
>> >>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The 
>> >>>>>>>> main method caused an error.
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >>>>>>>>    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> >>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' 
>> >>>>>>>> specified 'FlinkPipelineRunner', supported pipeline runners 
>> >>>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, 
>> >>>>>>>> DirectPipelineRunner]
>> >>>>>>>>    at 
>> >>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>> >>>>>>>>    at 
>> >>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>> >>>>>>>>    at 
>> >>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>> >>>>>>>>    at 
>> >>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>> >>>>>>>>    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>>>>>>>    at 
>> >>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >>>>>>>>    at 
>> >>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>>>>>>>    at java.lang.reflect.Method.invoke(Method.java:498)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >>>>>>>>    ... 6 more
>> >>>>>>>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
>> >>>>>>>>    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> >>>>>>>>    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> >>>>>>>>    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> >>>>>>>>    at java.lang.Class.forName0(Native Method)
>> >>>>>>>>    at java.lang.Class.forName(Class.java:264)
>> >>>>>>>>    at 
>> >>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>> >>>>>>>>    ... 14 more
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> $ flink run -c 
>> >>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>> >>>>>>>> target/beam-1.0-SNAPSHOT.jar 
>> >>>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner 
>> >>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in 
>> >>>>>>>> --outputTopic=test_out
>> >>>>>>>>
>> >>>>>>>> ------------------------------------------------------------
>> >>>>>>>> The program finished with the following exception:
>> >>>>>>>>
>> >>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The 
>> >>>>>>>> main method caused an error.
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >>>>>>>>    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> >>>>>>>> Caused by: java.lang.UnsupportedOperationException: The transform 
>> >>>>>>>> Read(UnboundedKafkaSource) is currently not supported.
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitTransform(FlinkBatchPipelineTranslator.java:111)
>> >>>>>>>>    at 
>> >>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>> >>>>>>>>    at 
>> >>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>> >>>>>>>>    at 
>> >>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>> >>>>>>>>    at 
>> >>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>> >>>>>>>>    at 
>> >>>>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50)
>> >>>>>>>>    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>> >>>>>>>>    at 
>> >>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>> >>>>>>>>    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>>>>>>>    at 
>> >>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >>>>>>>>    at 
>> >>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>>>>>>>    at java.lang.reflect.Method.invoke(Method.java:498)
>> >>>>>>>>    at 
>> >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >>>>>>>>    ... 6 more
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>> On Mar 21, 2016, at 12:23 PM, Raghu Angadi <[email protected] 
>> >>>>>>>>> <mailto:[email protected]>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Thanks Max.
>> >>>>>>>>>
>> >>>>>>>>> Bill McCarthy,
>> >>>>>>>>> I know you are unblocked and KafkaWriter is good enough. Please 
>> >>>>>>>>> try the KafkaIO source from my branch with the Flink runner if you get a 
>> >>>>>>>>> chance.
>> >>>>>>>>>
>> >>>>>>>>> thanks,
>> >>>>>>>>> Raghu.
>> >>>>>>>>>
>> >>>>>>>>> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré 
>> >>>>>>>>> <[email protected] <mailto:[email protected]>> wrote:
>> >>>>>>>>> Thanks for the update Max !
>> >>>>>>>>>
>> >>>>>>>>> Regards
>> >>>>>>>>> JB
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
>> >>>>>>>>> FYI: The Runner registration has been fixed. The Flink runner
>> >>>>>>>>> explicitly registers as of [1]. Also, the SDK tries to look up the
>> >>>>>>>>> PipelineRunner class in case it has not been registered [2].
>> >>>>>>>>>
>> >>>>>>>>> [1] https://github.com/apache/incubator-beam/pull/40
>> >>>>>>>>> [2] https://github.com/apache/incubator-beam/pull/61
>> >>>>>>>>>
>> >>>>>>>>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels 
>> >>>>>>>>> <[email protected] <mailto:[email protected]>> wrote:
>> >>>>>>>>> Great to see such a lively discussion here.
>> >>>>>>>>>
>> >>>>>>>>> I think we'll support sinks through the Write interface (like in
>> >>>>>>>>> batched execution) and also have a dedicated wrapper for the Flink
>> >>>>>>>>> sinks. This is a very pressing but easy-to-solve issue with the Flink
>> >>>>>>>>> runner. Expect it to be in next week.
>> >>>>>>>>>
>> >>>>>>>>> Also, the proper registration of the runner is about to be 
>> >>>>>>>>> merged.
>> >>>>>>>>> We just need an ok from the contributor to merge the changes.
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>> Max
>> >>>>>>>>>
>> >>>>>>>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin 
>> >>>>>>>>> <[email protected] <mailto:[email protected]>> wrote:
>> >>>>>>>>> Thanks Bill!
>> >>>>>>>>>
>> >>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad 
>> >>>>>>>>> it's not
>> >>>>>>>>> blocking you!
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy
>> >>>>>>>>> <[email protected] <mailto:[email protected]>> 
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> I tried that, but still no dice. Just to be clear, it’s not a 
>> >>>>>>>>> blocker for
>> >>>>>>>>> me, given that I have my example running, but for your information 
>> >>>>>>>>> the
>> >>>>>>>>> exception is below.
>> >>>>>>>>>
>> >>>>>>>>> I’ll watch the commit log on the beam incubator and look forward to
>> >>>>>>>>> deleting my copy of Raghu’s contributions when they’re merged to 
>> >>>>>>>>> master.
>> >>>>>>>>>
>> >>>>>>>>> Thanks again for everyone’s help,
>> >>>>>>>>>
>> >>>>>>>>> Bill
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Command followed by exception:
>> >>>>>>>>>
>> >>>>>>>>> $ flink run -c
>> >>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>> >>>>>>>>> target/beam-1.0-SNAPSHOT.jar
>> >>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>> >>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in 
>> >>>>>>>>> --outputTopic=test_out
>> >>>>>>>>>
>> >>>>>>>>> ------------------------------------------------------------
>> >>>>>>>>> The program finished with the following exception:
>> >>>>>>>>>
>> >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The 
>> >>>>>>>>> main
>> >>>>>>>>> method caused an error.
>> >>>>>>>>> at
>> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>> >>>>>>>>> at
>> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >>>>>>>>> at 
>> >>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >>>>>>>>> at
>> >>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >>>>>>>>> at
>> >>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> >>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' 
>> >>>>>>>>> specified
>> >>>>>>>>> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported 
>> >>>>>>>>> pipeline
>> >>>>>>>>> runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>> >>>>>>>>> DirectPipelineRunner]
>> >>>>>>>>> at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>> >>>>>>>>> at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>> >>>>>>>>> at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>> >>>>>>>>> at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>> >>>>>>>>> at
>> >>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>> >>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>>>>>>>> at
>> >>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >>>>>>>>> at
>> >>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>> >>>>>>>>> at
>> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >>>>>>>>> ... 6 more
>> >>>>>>>>>
>> >>>>>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected] 
>> >>>>>>>>> <mailto:[email protected]>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> I don't believe the FlinkPipelineRunner is registered the same way 
>> >>>>>>>>> the
>> >>>>>>>>> Dataflow & Direct Pipeline runners are registered; using
>> >>>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy
>> >>>>>>>>> <[email protected] <mailto:[email protected]>> 
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Thanks Dan,
>> >>>>>>>>>
>> >>>>>>>>> I tried that, but getting the below. Note that the jar contains the
>> >>>>>>>>> FlinkPipelineRunner.
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>> >>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>> >>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>> >>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>> >>>>>>>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>> >>>>>>>>>
>> >>>>>>>>> % flink run -c
>> >>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>> >>>>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>> >>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in 
>> >>>>>>>>> --outputTopic=test_out
>> >>>>>>>>>
>> >>>>>>>>> ------------------------------------------------------------
>> >>>>>>>>> The program finished with the following exception:
>> >>>>>>>>>
>> >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The 
>> >>>>>>>>> main
>> >>>>>>>>> method caused an error.
>> >>>>>>>>> at
>> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>> >>>>>>>>> at
>> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >>>>>>>>> at 
>> >>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >>>>>>>>> at
>> >>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >>>>>>>>> at
>> >>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> >>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' 
>> >>>>>>>>> specified
>> >>>>>>>>> 'FlinkPipelineRunner', supported pipeline runners
>> >>>>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>> >>>>>>>>> DirectPipelineRunner]
>> >>>>>>>>> at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>> >>>>>>>>> at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>> >>>>>>>>> at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>> >>>>>>>>> at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>> >>>>>>>>> at
>> >>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>> >>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>>>>>>>> at
>> >>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >>>>>>>>> at
>> >>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>> >>>>>>>>> at
>> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >>>>>>>>> ... 6 more
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected] 
>> >>>>>>>>> <mailto:[email protected]>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Thanks for catching that, Aljoscha!
>> >>>>>>>>>
>> >>>>>>>>> Note that the Flink runner should be available via a command-line 
>> >>>>>>>>> option
>> >>>>>>>>> as well: --runner=FlinkPipelineRunner.
>> >>>>>>>>>
>> >>>>>>>>> The list of valid values for that flag is computed by walking the
>> >>>>>>>>> classpath at runtime, so as long as the Flink jar is present it'll 
>> >>>>>>>>> work.
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek 
>> >>>>>>>>> <[email protected] <mailto:[email protected]>>
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi,
>> >>>>>>>>> looks like the example is being executed with the 
>> >>>>>>>>> DirectPipelineRunner
>> >>>>>>>>> which does not seem to be able to cope with UnboundedSource. You 
>> >>>>>>>>> need to set
>> >>>>>>>>> the runner to the FlinkRunner in the example code as described 
>> >>>>>>>>> here:
>> >>>>>>>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>> >>>>>>>>>
>> >>>>>>>>> The Flink runner should be able to deal with UnboundedSource but 
>> >>>>>>>>> has the
>> >>>>>>>>> limitation that sources are always parallelism=1 (this is being 
>> >>>>>>>>> worked on,
>> >>>>>>>>> however).
>> >>>>>>>>>
>> >>>>>>>>> Cheers,
>> >>>>>>>>> Aljoscha
>> >>>>>>>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected] 
>> >>>>>>>>> <mailto:[email protected]>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Looks like the Flink runner may not yet support arbitrary code 
>> >>>>>>>>> written
>> >>>>>>>>> with the UnboundedSource API. That is, it looks like the Flink 
>> >>>>>>>>> runner
>> >>>>>>>>> expects the sources to get translated away.
>> >>>>>>>>>
>> >>>>>>>>> Max?
>> >>>>>>>>>
>> >>>>>>>>> Dan
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy
>> >>>>>>>>> <[email protected] <mailto:[email protected]>> 
>> >>>>>>>>> wrote:
>> >>>>>>>>> Thanks Raghu,
>> >>>>>>>>>
>> >>>>>>>>> When I try to run it on flink using the incubator-beam code, i.e.
>> >>>>>>>>>
>> >>>>>>>>> flink run -c
>> >>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>> >>>>>>>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
>> >>>>>>>>> --topics=test_in --outputTopic=test_out
>> >>>>>>>>>
>> >>>>>>>>> I get this:
>> >>>>>>>>>
>> >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The 
>> >>>>>>>>> main
>> >>>>>>>>> method caused an error.
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >>>>>>>>>     at 
>> >>>>>>>>> org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> >>>>>>>>> Caused by: java.lang.IllegalStateException: no evaluator registered
>> >>>>>>>>> for Read(UnboundedKafkaSource)
>> >>>>>>>>>     at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>> >>>>>>>>>     at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>> >>>>>>>>>     at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>> >>>>>>>>>     at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>> >>>>>>>>>     at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>> >>>>>>>>>     at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>> >>>>>>>>>     at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>> >>>>>>>>>     at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>> >>>>>>>>>     at
>> >>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>> >>>>>>>>>     at 
>> >>>>>>>>> com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>> >>>>>>>>>     at
>> >>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>> >>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>>>>>>>>     at
>> >>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >>>>>>>>>     at
>> >>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >>>>>>>>>     ... 6 more
>> >>>>>>>>>
>> >>>>>>>>> Any ideas?
>> >>>>>>>>>
>> >>>>>>>>> Bill
>> >>>>>>>>>
>> >>>>>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected] 
>> >>>>>>>>> <mailto:[email protected]>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Thanks for trying it.
>> >>>>>>>>>
>> >>>>>>>>> I fixed the CheckStyle error  (not sure why my build is not 
>> >>>>>>>>> failing).
>> >>>>>>>>> Let me know if you see any issues running with Beam. I haven't 
>> >>>>>>>>> tried it. I
>> >>>>>>>>> should. In fact, Daniel Halperin says my patch should be against 
>> >>>>>>>>> Beam.
>> >>>>>>>>>
>> >>>>>>>>> Raghu.
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy
>> >>>>>>>>> <[email protected] <mailto:[email protected]>> 
>> >>>>>>>>> wrote:
>> >>>>>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu 
>> >>>>>>>>> for
>> >>>>>>>>> pointing me to working code.
>> >>>>>>>>>
>> >>>>>>>>> I’m in the middle of a hack day at the moment, so the speed of your
>> >>>>>>>>> responses has been very welcome.
>> >>>>>>>>>
>> >>>>>>>>> In the first instance, I’ll try using your changes, Raghu. I’ve
>> >>>>>>>>> cloned your repo, switched to the kafka branch and built both 
>> >>>>>>>>> contrib/kafka
>> >>>>>>>>> and contrib/examples/kafka. The contrib/kafka initially failed 
>> >>>>>>>>> with a
>> >>>>>>>>> CheckStyle error
>> >>>>>>>>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>> >>>>>>>>> 'private' modifier out of order with the JLS suggestions)… I’ve 
>> >>>>>>>>> fixed that
>> >>>>>>>>> in my local clone and now it’s building fine. I hope to be able to 
>> >>>>>>>>> run your
>> >>>>>>>>> contrib unchanged on top of the incubator-beam codebase, which 
>> >>>>>>>>> will be what
>> >>>>>>>>> I attempt to do now.
>> >>>>>>>>>
>> >>>>>>>>> Thanks again to all, for your swift help.
>> >>>>>>>>>
>> >>>>>>>>> Bill
>> >>>>>>>>>
>> >>>>>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected] 
>> >>>>>>>>> <mailto:[email protected]>>
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi Bill,
>> >>>>>>>>>
>> >>>>>>>>> We have a fairly well tested patch for KafkaIO (pr #121). It will be
>> >>>>>>>>> merged soon. The example there keeps track of top hashtags in 10 
>> >>>>>>>>> minute
>> >>>>>>>>> sliding window and writes the results to another Kafka topic. 
>> >>>>>>>>> Please try it
>> >>>>>>>>> if you can. It is well tested on Google Cloud Dataflow. I have not 
>> >>>>>>>>> run it
>> >>>>>>>>> using Flink runner.
>> >>>>>>>>>
>> >>>>>>>>> Raghu.
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas
>> >>>>>>>>> <[email protected] <mailto:[email protected]>> 
>> >>>>>>>>> wrote:
>> >>>>>>>>> Hello Bill,
>> >>>>>>>>>
>> >>>>>>>>> This is a known limitation of the Flink Runner.
>> >>>>>>>>> There is a JIRA issue for that
>> >>>>>>>>> https://issues.apache.org/jira/browse/BEAM-127 
>> >>>>>>>>>
>> >>>>>>>>> A wrapper for Flink sinks will come soon and as Beam evolves,
>> >>>>>>>>> a more Beam-y solution will come as well.
>> >>>>>>>>>
>> >>>>>>>>> Kostas
>> >>>>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy
>> >>>>>>>>> <[email protected] <mailto:[email protected]>> 
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi,
>> >>>>>>>>>
>> >>>>>>>>> I’m trying to write a proof-of-concept which takes messages from
>> >>>>>>>>> Kafka, transforms them using Beam on Flink, then pushes the 
>> >>>>>>>>> results onto a
>> >>>>>>>>> different Kafka topic.
>> >>>>>>>>>
>> >>>>>>>>> I’ve used the KafkaWindowedWordCountExample as a starting point,
>> >>>>>>>>> and that’s doing the first part of what I want to do, but it 
>> >>>>>>>>> outputs to text
>> >>>>>>>>> files as opposed to Kafka. FlinkKafkaProducer08 looks promising, 
>> >>>>>>>>> but I can’t
>> >>>>>>>>> figure out how to plug it into the pipeline. I was thinking that 
>> >>>>>>>>> it would be
>> >>>>>>>>> wrapped with an UnboundedFlinkSink, or some such, but that doesn’t 
>> >>>>>>>>> seem to
>> >>>>>>>>> exist.
>> >>>>>>>>>
>> >>>>>>>>> Any advice or thoughts on what I’m trying to do?
>> >>>>>>>>>
>> >>>>>>>>> I’m running the latest incubator-beam (as of last night from
>> >>>>>>>>> Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on 
>> >>>>>>>>> Google
>> >>>>>>>>> Compute Engine (Debian Jessie).
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>>
>> >>>>>>>>> Bill McCarthy
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> --
>> >>>>>>>>> Jean-Baptiste Onofré
>> >>>>>>>>> [email protected] <mailto:[email protected]>
>> >>>>>>>>> http://blog.nanthrax.net
>> >>>>>>>>> Talend - http://www.talend.com
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>>
>> 
>> 
> 
> 
