Thanks Max. I'm happy to move forward with the fully qualified class name for 
the runner. 

Bill

> On Mar 24, 2016, at 5:19 AM, Maximilian Michels <[email protected]> wrote:
> 
> Cool. Great to hear it is up and running :)
> 
> @William: I suspect the automatic detection doesn't work because the
> registered runners which are put in a META-INF file are not copied
> over to your Jar. The fully-qualified class name always works because
> the Beam code performs a lookup of the class in your Jar. For the
> simple name, it needs to know the available runners in advance. I'll
> check if there is an easy way to make this work with your custom jar.
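> 
> For reference, here is a rough, untested sketch of the kind of ServiceLoader-based
> discovery involved. The registrar interface below is a made-up stand-in rather than
> the SDK's real class; the point is only to show why the META-INF/services entry has
> to survive packaging for a simple runner name to resolve:
> 
> import java.util.HashMap;
> import java.util.Map;
> import java.util.ServiceLoader;
> 
> // Hypothetical stand-in for the SDK's runner-registrar interface.
> interface RunnerRegistrar {
>   Iterable<Class<?>> getPipelineRunners();
> }
> 
> class RunnerDiscovery {
>   static Map<String, Class<?>> discoverBySimpleName() {
>     Map<String, Class<?>> runners = new HashMap<>();
>     // ServiceLoader reads META-INF/services/<interface-name> from the classpath. If that
>     // file is dropped when the custom jar is built, nothing gets registered, and only a
>     // fully-qualified class name (looked up directly) can work.
>     for (RunnerRegistrar registrar : ServiceLoader.load(RunnerRegistrar.class)) {
>       for (Class<?> runner : registrar.getPipelineRunners()) {
>         runners.put(runner.getSimpleName(), runner);
>       }
>     }
>     return runners;
>   }
> }
> 
> (If the jar is built with the Maven shade plugin, the ServicesResourceTransformer is
> usually what merges those META-INF/services files; whether that applies to this
> particular build is a guess.)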
> 
> 
> 
>> On Thu, Mar 24, 2016 at 1:53 AM, Raghu Angadi <[email protected]> wrote:
>> Good to hear that.
>> 
>> Oh, that JSON is 5 years old, but it should not cause parsing errors. It
>> might be some string escaping issue with the generator. Note that my app
>> tolerates missing fields (wrong schema) but does not catch JSON parsing errors.
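>> 
>> For what it's worth, a minimal Jackson sketch of that distinction (the Tweet POJO and
>> its fields here are hypothetical, not my app's actual schema):
>> 
>> import com.fasterxml.jackson.databind.DeserializationFeature;
>> import com.fasterxml.jackson.databind.ObjectMapper;
>> 
>> public class TweetParser {
>>   // Hypothetical POJO: fields missing from the JSON are simply left at their defaults.
>>   public static class Tweet {
>>     public long id;
>>     public String text;
>>   }
>> 
>>   private static final ObjectMapper MAPPER = new ObjectMapper()
>>       // tolerate fields in the JSON that the POJO doesn't know about
>>       .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
>> 
>>   public static Tweet parse(String json) throws java.io.IOException {
>>     // Malformed JSON (e.g. a bad escape from the generator) still throws here
>>     // unless the caller catches it explicitly.
>>     return MAPPER.readValue(json, Tweet.class);
>>   }
>> }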
>> 
>> Raghu.
>> 
>> 
>> On Wed, Mar 23, 2016 at 5:45 PM, William McCarthy
>> <[email protected]> wrote:
>>> 
>>> I’m unable to send the logs due to confidentiality constraints.
>>> 
>>> However, when I look at the taskmanager log, I see some JSON parsing
>>> issues. So I suspect that the tweets I’m sending through (borrowed from
>>> here, with an added hashtag: https://gist.github.com/hrp/900964 ) do not
>>> conform to the schema you’re expecting.
>>> 
>>> You sent the kafka topic information for your tweet stream out of
>>> band, and I’m connected to that now. The output is now getting printed
>>> to the taskmanager log file, which is what’s expected. So I think that
>>> this issue is now explained.
>>> 
>>> Thanks,
>>> 
>>> Bill
>>> 
>>> 
>>> On Mar 23, 2016, at 6:33 PM, Raghu Angadi <[email protected]> wrote:
>>> 
>>> It should work. Do you have log from one of the workers by any chance? I
>>> will send you another kafka endpoint you could try running against.
>>> 
>>> Raghu.
>>> 
>>> On Wed, Mar 23, 2016 at 3:12 PM, William McCarthy
>>> <[email protected]> wrote:
>>>> 
>>>> Raghu,
>>>> 
>>>> Sorry, I may have spoken too soon when I said that it “worked”.
>>>> 
>>>> The code did not throw any exceptions and seemed to start up happily. I
>>>> then tried injecting a number of “tweets” (with hashtags) into my test_in
>>>> kafka topic. I’ve been waiting for more than 10 minutes now, and I see
>>>> nothing on my test_out kafka topic. I expected the top hashtags to show up
>>>> there within a 10-minute window.
>>>> 
>>>> Am I misunderstanding something?
>>>> 
>>>> To help debug the issue: previously, I noticed that if I injected a
>>>> random string into the test_in topic, the Beam job fell over with a Jackson
>>>> JsonMappingException, so it would appear that the job is getting my
>>>> messages. But when I restarted the Beam job and injected 3 correctly formed
>>>> tweet messages, something was silently dropped between there and the
>>>> output kafka topic.
>>>> 
>>>> Bill
>>>> 
>>>> On Mar 23, 2016, at 5:53 PM, Raghu Angadi <[email protected]> wrote:
>>>> 
>>>> Great news! Thanks for trying the multiple fixes, and thanks to Max and
>>>> Aljoscha for those fixes.
>>>> 
>>>> Raghu
>>>> 
>>>> On Wed, Mar 23, 2016 at 2:51 PM, William McCarthy
>>>> <[email protected]> wrote:
>>>>> 
>>>>> Thanks Max,
>>>>> 
>>>>> This command now works, thanks:
>>>>> 
>>>>> $ flink run -c
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>> target/beam-1.0-SNAPSHOT.jar
>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>> --streaming=true
>>>>> 
>>>>> Note that I’m still unable to use --runner=FlinkPipelineRunner (i.e.
>>>>> without the package prefix). That behaviour is unexpected to you, given
>>>>> the test that you put in a few days ago, right? i.e.
>>>>> 
>>>>> $ flink run -c
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>> 
>>>>> ------------------------------------------------------------
>>>>> The program finished with the following exception:
>>>>> 
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> method caused an error.
>>>>>        at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>        at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>        at
>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>        at
>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>        at
>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>        at
>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner'
>>>>> specified 'FlinkPipelineRunner', supported pipeline runners
>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>> DirectPipelineRunner]
>>>>>        at
>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>>>>>        at
>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>>>>>        at
>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>>>>>        at
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>        at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>        at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>        at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>        at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>        ... 6 more
>>>>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
>>>>>        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>        at java.lang.Class.forName0(Native Method)
>>>>>        at java.lang.Class.forName(Class.java:264)
>>>>>        at
>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>>>>>        ... 14 more
>>>>> 
>>>>> 
>>>>>> On Mar 23, 2016, at 2:42 PM, Maximilian Michels <[email protected]>
>>>>>> wrote:
>>>>>> 
>>>>>> Hi William,
>>>>>> 
>>>>>> It has been merged. Feel free to try again.
>>>>>> 
>>>>>> Cheers,
>>>>>> Max
>>>>>> 
>>>>>> On Wed, Mar 23, 2016 at 5:09 PM, Maximilian Michels <[email protected]>
>>>>>> wrote:
>>>>>>> The pull request is here:
>>>>>>> https://github.com/apache/incubator-beam/pull/69
>>>>>>> 
>>>>>>> I'll merge it after the tests have passed and it has been reviewed.
>>>>>>> 
>>>>>>> On Wed, Mar 23, 2016 at 3:12 PM, Maximilian Michels <[email protected]>
>>>>>>> wrote:
>>>>>>>> Hi William,
>>>>>>>> 
>>>>>>>> I started working on a fix and better testing of the
>>>>>>>> UnboundedSourceWrapper. I'll get back to you shortly with a pull
>>>>>>>> request that we should be able to merge soon.
>>>>>>>> 
>>>>>>>> - Max
>>>>>>>> 
>>>>>>>> On Wed, Mar 23, 2016 at 1:33 PM, Aljoscha Krettek
>>>>>>>> <[email protected]> wrote:
>>>>>>>>> Hi,
>>>>>>>>> as far as I can see from a quick glance the problem is that
>>>>>>>>> UnboundedSourceWrapper stores an instance of Reader that it gets from
>>>>>>>>> the Source. The Reader is not Serializable while the UnboundedSource
>>>>>>>>> is. I think the Reader should be initialized when actually running
>>>>>>>>> the source, in the run() method.
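>>>>>>>>> 
>>>>>>>>> Roughly, the pattern would look something like this (a made-up illustration of
>>>>>>>>> the idea, not the actual UnboundedSourceWrapper code):
>>>>>>>>> 
>>>>>>>>> import java.io.Serializable;
>>>>>>>>> import java.util.function.Consumer;
>>>>>>>>> 
>>>>>>>>> class LazyReaderSource<T> implements Serializable {
>>>>>>>>>   // The source is Serializable and safe to ship to the workers.
>>>>>>>>>   interface Source<T> extends Serializable {
>>>>>>>>>     Reader<T> createReader();
>>>>>>>>>   }
>>>>>>>>>   // The reader is not Serializable and must never be part of the shipped state.
>>>>>>>>>   interface Reader<T> {
>>>>>>>>>     boolean advance();
>>>>>>>>>     T getCurrent();
>>>>>>>>>   }
>>>>>>>>> 
>>>>>>>>>   private final Source<T> source;
>>>>>>>>>   private transient Reader<T> reader;  // created lazily, never serialized
>>>>>>>>> 
>>>>>>>>>   LazyReaderSource(Source<T> source) {
>>>>>>>>>     this.source = source;
>>>>>>>>>   }
>>>>>>>>> 
>>>>>>>>>   void run(Consumer<T> emit) {
>>>>>>>>>     reader = source.createReader();  // initialize only when the task actually runs
>>>>>>>>>     while (reader.advance()) {
>>>>>>>>>       emit.accept(reader.getCurrent());
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>> }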
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>> On 23 Mar 2016, at 13:07, William McCarthy
>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>> 
>>>>>>>>>> The first stack trace was from the latest runner, yes. I’ve pulled
>>>>>>>>>> the very latest, just now, and still get the same thing.
>>>>>>>>>> 
>>>>>>>>>> When I write ‘pulled the very latest’, here’s what I mean (all of
>>>>>>>>>> the following commands, except the last, finished with success):
>>>>>>>>>> 
>>>>>>>>>> $ cd incubator-beam
>>>>>>>>>> $ git pull
>>>>>>>>>> …some output, then success
>>>>>>>>>> $ git branch
>>>>>>>>>> * master
>>>>>>>>>> $ mvn -DskipTests clean install
>>>>>>>>>> …some output, then success
>>>>>>>>>> $ cd <my project>
>>>>>>>>>> $ mvn -DskipTests clean install
>>>>>>>>>> …some output, then success
>>>>>>>>>> $ <command as output below>
>>>>>>>>>> 
>>>>>>>>>> Bill
>>>>>>>>>> 
>>>>>>>>>>> On Mar 22, 2016, at 5:29 AM, Maximilian Michels <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi William,
>>>>>>>>>>> 
>>>>>>>>>>> Is the first stack trace from the latest master? Using only the
>>>>>>>>>>> simple
>>>>>>>>>>> class name of the Runner should actually work now. I've also
>>>>>>>>>>> added a
>>>>>>>>>>> test to explicitly test that.
>>>>>>>>>>> 
>>>>>>>>>>> The latter error, as Aljoscha pointed out, is due to batch
>>>>>>>>>>> execution. Perhaps we could make it explicit during startup which
>>>>>>>>>>> mode we're executing in. I filed a JIRA issue for that:
>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-139
>>>>>>>>>>> 
>>>>>>>>>>> - Max
>>>>>>>>>>> 
>>>>>>>>>>> On Tue, Mar 22, 2016 at 10:00 AM, Aljoscha Krettek
>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> for Flink, the runner can internally translate either to a batch
>>>>>>>>>>>> job or a streaming job. Unbounded sources are not supported when
>>>>>>>>>>>> running in batch mode, so you have to somehow specify that you want
>>>>>>>>>>>> streaming mode. StreamingOptions has a setStreaming method; maybe
>>>>>>>>>>>> you can specify “--streaming true” on the command line to set that
>>>>>>>>>>>> flag.
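>>>>>>>>>>>> 
>>>>>>>>>>>> In code that would be roughly the following (untested, just to make the flag
>>>>>>>>>>>> concrete; assumes the options classes from the SDK used in this thread):
>>>>>>>>>>>> 
>>>>>>>>>>>> // e.g. in main(), using com.google.cloud.dataflow.sdk.options.*
>>>>>>>>>>>> StreamingOptions options =
>>>>>>>>>>>>     PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
>>>>>>>>>>>> options.setStreaming(true);  // forces streaming translation instead of batch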
>>>>>>>>>>>> 
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>> On 21 Mar 2016, at 23:39, William McCarthy
>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I’ve attempted to run the TopHashtagsExample again, using both
>>>>>>>>>>>>> ‘FlinkPipelineRunner’ and its fully qualified name. Both bomb out,
>>>>>>>>>>>>> though the latter gets further. I hope this helps; here is the
>>>>>>>>>>>>> output of both:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> $ flink run -c
>>>>>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>>>>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>>>>>>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in 
>>>>>>>>>>>>> --outputTopic=test_out
>>>>>>>>>>>>> 
>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>> The program finished with the following exception:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The
>>>>>>>>>>>>> main method caused an error.
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner'
>>>>>>>>>>>>> specified 'FlinkPipelineRunner', supported pipeline runners
>>>>>>>>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>>>>>>>>>> DirectPipelineRunner]
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>>>>>>>>>> Method)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>>>>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>>>>>>   ... 6 more
>>>>>>>>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>>>>>>>>> FlinkPipelineRunner
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>>>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>>>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>>>>>>>>   at java.lang.Class.forName0(Native Method)
>>>>>>>>>>>>>   at java.lang.Class.forName(Class.java:264)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>>>>>>>>>>>>>   ... 14 more
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> $ flink run -c
>>>>>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>>>>>>>>> target/beam-1.0-SNAPSHOT.jar
>>>>>>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>>>>>>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in 
>>>>>>>>>>>>> --outputTopic=test_out
>>>>>>>>>>>>> 
>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>> The program finished with the following exception:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The
>>>>>>>>>>>>> main method caused an error.
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>>>>>>> Caused by: java.lang.UnsupportedOperationException: The
>>>>>>>>>>>>> transform Read(UnboundedKafkaSource) is currently not supported.
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitTransform(FlinkBatchPipelineTranslator.java:111)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>>>>>>>>>> Method)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>>>>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>>>>>   at
>>>>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>>>>>>   ... 6 more
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Mar 21, 2016, at 12:23 PM, Raghu Angadi
>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks Max.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Bill McCarthy,
>>>>>>>>>>>>>> I know you are unblocked and KafkaWriter is good enough.
>>>>>>>>>>>>>> Please try the KafkaIO source from my branch with the Flink runner
>>>>>>>>>>>>>> if you get a chance.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> thanks,
>>>>>>>>>>>>>> Raghu.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré
>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>> Thanks for the update Max !
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
>>>>>>>>>>>>>> FYI: The Runner registration has been fixed. The Flink runner
>>>>>>>>>>>>>> explicitly registers as of [1]. Also, the SDK tries to look up
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> PipelineRunner class in case it has not been registered [2].
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> [1] https://github.com/apache/incubator-beam/pull/40
>>>>>>>>>>>>>> [2] https://github.com/apache/incubator-beam/pull/61
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels
>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>> Great to see such a lively discussion here.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I think we'll support sinks through the Write interface (like in
>>>>>>>>>>>>>> batched execution) and also have a dedicated wrapper for the Flink
>>>>>>>>>>>>>> sinks. This is a very pressing but easy-to-solve issue of the Flink
>>>>>>>>>>>>>> runner. Expect it to land next week.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Also, the proper registration of the runner is about to be merged.
>>>>>>>>>>>>>> We just need an ok from the contributor to merge the changes.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Max
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin
>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>> Thanks Bill!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm
>>>>>>>>>>>>>> glad it's not
>>>>>>>>>>>>>> blocking you!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy
>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I tried that, but still no dice. Just to be clear, it’s not a
>>>>>>>>>>>>>> blocker for me, given that I have my example running, but for your
>>>>>>>>>>>>>> information the exception is below.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I’ll watch the commit log on the Beam incubator and look forward to
>>>>>>>>>>>>>> deleting my copy of Raghu’s contributions when they’re merged to master.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks again for everyone’s help,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Command followed by exception:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> $ flink run -c
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>>>>>>>>>> target/beam-1.0-SNAPSHOT.jar
>>>>>>>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>>>>>>>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in
>>>>>>>>>>>>>> --outputTopic=test_out
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>> The program finished with the following exception:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.program.ProgramInvocationException:
>>>>>>>>>>>>>> The main
>>>>>>>>>>>>>> method caused an error.
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown
>>>>>>>>>>>>>> 'runner' specified
>>>>>>>>>>>>>> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported
>>>>>>>>>>>>>> pipeline
>>>>>>>>>>>>>> runners [BlockingDataflowPipelineRunner,
>>>>>>>>>>>>>> DataflowPipelineRunner,
>>>>>>>>>>>>>> DirectPipelineRunner]
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>>>>>>> ... 6 more
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I don't believe the FlinkPipelineRunner is registered the same way
>>>>>>>>>>>>>> the Dataflow & Direct Pipeline runners are registered; using
>>>>>>>>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy
>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks Dan,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I tried that, but getting the below. Note that the jar
>>>>>>>>>>>>>> contains the
>>>>>>>>>>>>>> FlinkPipelineRunner.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>>>>>>>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>>>>>>>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> % flink run -c
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>>>>>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>>>>>>>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in
>>>>>>>>>>>>>> --outputTopic=test_out
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>> The program finished with the following exception:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.program.ProgramInvocationException:
>>>>>>>>>>>>>> The main
>>>>>>>>>>>>>> method caused an error.
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown
>>>>>>>>>>>>>> 'runner' specified
>>>>>>>>>>>>>> 'FlinkPipelineRunner', supported pipeline runners
>>>>>>>>>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>>>>>>>>>>> DirectPipelineRunner]
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>>>>>>> ... 6 more
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin
>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks for catching that, Aljoscha!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Note that the Flink runner should be available via a
>>>>>>>>>>>>>> command-line option
>>>>>>>>>>>>>> as well: --runner=FlinkPipelineRunner.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> The list of valid values for that flag is computed by walking
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> classpath at runtime, so as long as the Flink jar is present
>>>>>>>>>>>>>> it'll work.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek
>>>>>>>>>>>>>> <[email protected]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> looks like the example is being executed with the
>>>>>>>>>>>>>> DirectPipelineRunner
>>>>>>>>>>>>>> which does not seem to be able to cope with UnboundedSource.
>>>>>>>>>>>>>> You need to set
>>>>>>>>>>>>>> the runner to the FlinkRunner in the example code as described
>>>>>>>>>>>>>> here:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
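>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Concretely, setting the runner in code boils down to something like the
>>>>>>>>>>>>>> following (a rough sketch using the class names that appear in this thread,
>>>>>>>>>>>>>> and assuming FlinkPipelineOptions exposes the streaming flag; not tested here):
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> import com.google.cloud.dataflow.sdk.Pipeline;
>>>>>>>>>>>>>> import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
>>>>>>>>>>>>>> import org.apache.beam.runners.flink.FlinkPipelineOptions;
>>>>>>>>>>>>>> import org.apache.beam.runners.flink.FlinkPipelineRunner;
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> public class RunnerSetup {
>>>>>>>>>>>>>>   public static Pipeline createPipeline(String[] args) {
>>>>>>>>>>>>>>     FlinkPipelineOptions options =
>>>>>>>>>>>>>>         PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
>>>>>>>>>>>>>>     // Setting the runner class directly sidesteps the string-based --runner lookup.
>>>>>>>>>>>>>>     options.setRunner(FlinkPipelineRunner.class);
>>>>>>>>>>>>>>     options.setStreaming(true);  // unbounded (Kafka) sources need streaming mode
>>>>>>>>>>>>>>     return Pipeline.create(options);
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>> }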
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> The Flink runner should be able to deal with UnboundedSource
>>>>>>>>>>>>>> but has the
>>>>>>>>>>>>>> limitation that sources are always parallelism=1 (this is
>>>>>>>>>>>>>> being worked on,
>>>>>>>>>>>>>> however).
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Looks like the Flink runner may not yet support arbitrary code
>>>>>>>>>>>>>> written
>>>>>>>>>>>>>> with the UnboundedSource API. That is, it looks like the Flink
>>>>>>>>>>>>>> runner
>>>>>>>>>>>>>> expects the sources to get translated away.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Max?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Dan
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy
>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>> Thanks Raghu,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> When I try to run it on flink using the incubator-beam code,
>>>>>>>>>>>>>> i.e.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> flink run -c
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>>>>>>>>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
>>>>>>>>>>>>>> --topics=test_in --outputTopic=test_out
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I get this:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.program.ProgramInvocationException:
>>>>>>>>>>>>>> The main
>>>>>>>>>>>>>> method caused an error.
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>>>>>>>> Caused by: java.lang.IllegalStateException: no evaluator
>>>>>>>>>>>>>> registered
>>>>>>>>>>>>>> for Read(UnboundedKafkaSource)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>>>>>>>>>>    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>>>>>>>>>>> Method)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>>>>>>    at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>>>>>>>    ... 6 more
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Any ideas?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks for trying it.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I fixed the CheckStyle error (not sure why my build is not failing).
>>>>>>>>>>>>>> Let me know if you see any issues running with Beam. I haven't tried
>>>>>>>>>>>>>> it. I should. In fact, Daniel Halperin says my patch should be against
>>>>>>>>>>>>>> Beam.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Raghu.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy
>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and
>>>>>>>>>>>>>> Raghu for
>>>>>>>>>>>>>> pointing me to working code.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I’m in the middle of a hack day at the moment, so the speed of
>>>>>>>>>>>>>> your
>>>>>>>>>>>>>> responses has been very welcome.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> In the first instance, I’ll try using your changes, Raghu.
>>>>>>>>>>>>>> I’ve
>>>>>>>>>>>>>> cloned your repo, switched to the kafka branch and built both
>>>>>>>>>>>>>> contrib/kafka
>>>>>>>>>>>>>> and contrib/examples/kafka. The contrib/kafka initially failed
>>>>>>>>>>>>>> with a
>>>>>>>>>>>>>> CheckStyle error
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>>>>>>>>>>>>>> 'private' modifier out of order with the JLS suggestions)…
>>>>>>>>>>>>>> I’ve fixed that
>>>>>>>>>>>>>> in my local clone and now it’s building fine. I hope to be
>>>>>>>>>>>>>> able to run your
>>>>>>>>>>>>>> contrib unchanged on top of the incubator-beam codebase, which
>>>>>>>>>>>>>> will be what
>>>>>>>>>>>>>> I attempt to do now.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks again to all, for your swift help.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi
>>>>>>>>>>>>>> <[email protected]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi Bill,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> We have a fairly well tested patch for KafkaIO (pr #121). It will be
>>>>>>>>>>>>>> merged soon. The example there keeps track of the top hashtags in a
>>>>>>>>>>>>>> 10-minute sliding window and writes the results to another Kafka
>>>>>>>>>>>>>> topic. Please try it if you can. It is well tested on Google Cloud
>>>>>>>>>>>>>> Dataflow. I have not run it using the Flink runner.
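>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> For readers following along, the windowing/counting part of such a pipeline
>>>>>>>>>>>>>> looks roughly like the sketch below. It uses Create.of as a stand-in for the
>>>>>>>>>>>>>> Kafka source and plain element counts rather than the actual hashtag
>>>>>>>>>>>>>> extraction and Top transform, so it is an illustration, not the PR's code:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> import com.google.cloud.dataflow.sdk.Pipeline;
>>>>>>>>>>>>>> import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
>>>>>>>>>>>>>> import com.google.cloud.dataflow.sdk.transforms.Count;
>>>>>>>>>>>>>> import com.google.cloud.dataflow.sdk.transforms.Create;
>>>>>>>>>>>>>> import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
>>>>>>>>>>>>>> import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
>>>>>>>>>>>>>> import com.google.cloud.dataflow.sdk.values.KV;
>>>>>>>>>>>>>> import com.google.cloud.dataflow.sdk.values.PCollection;
>>>>>>>>>>>>>> import org.joda.time.Duration;
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> public class WindowingSketch {
>>>>>>>>>>>>>>   public static void main(String[] args) {
>>>>>>>>>>>>>>     Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>>>>>>>>>>>>>>     PCollection<String> hashtags =
>>>>>>>>>>>>>>         p.apply(Create.of("#beam", "#flink", "#kafka"));  // stand-in for the Kafka read
>>>>>>>>>>>>>>     PCollection<KV<String, Long>> counts = hashtags
>>>>>>>>>>>>>>         .apply(Window.<String>into(
>>>>>>>>>>>>>>             SlidingWindows.of(Duration.standardMinutes(10))
>>>>>>>>>>>>>>                 .every(Duration.standardMinutes(1))))
>>>>>>>>>>>>>>         .apply(Count.<String>perElement());
>>>>>>>>>>>>>>     p.run();
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>> }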
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Raghu.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas
>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>> Hello Bill,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> This is a known limitation of the Flink Runner.
>>>>>>>>>>>>>> There is a JIRA issue for that
>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-127
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> A wrapper for Flink sinks will come soon and as Beam evolves,
>>>>>>>>>>>>>> a more Beam-y solution will come as well.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy
>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I’m trying to write a proof-of-concept which takes messages
>>>>>>>>>>>>>> from
>>>>>>>>>>>>>> Kafka, transforms them using Beam on Flink, then pushes the
>>>>>>>>>>>>>> results onto a
>>>>>>>>>>>>>> different Kafka topic.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I’ve used the KafkaWindowedWordCountExample as a starting
>>>>>>>>>>>>>> point,
>>>>>>>>>>>>>> and that’s doing the first part of what I want to do, but it
>>>>>>>>>>>>>> outputs to text
>>>>>>>>>>>>>> files as opposed to Kafka. FlinkKafkaProducer08 looks
>>>>>>>>>>>>>> promising, but I can’t
>>>>>>>>>>>>>> figure out how to plug it into the pipeline. I was thinking
>>>>>>>>>>>>>> that it would be
>>>>>>>>>>>>>> wrapped with an UnboundedFlinkSink, or some such, but that
>>>>>>>>>>>>>> doesn’t seem to
>>>>>>>>>>>>>> exist.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Any advice or thoughts on what I’m trying to do?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I’m running the latest incubator-beam (as of last night from
>>>>>>>>>>>>>> Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on
>>>>>>>>>>>>>> Google
>>>>>>>>>>>>>> Compute Engine (Debian Jessie).
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Bill McCarthy
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>>> [email protected]
>>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>>> Talend - http://www.talend.com
> 
