Hi William,

I started working on a fix and better testing of the
UnboundedSourceWrapper. I'll get back to you shortly with a pull
request that we should be able to merge soon.

- Max

On Wed, Mar 23, 2016 at 1:33 PM, Aljoscha Krettek <[email protected]> wrote:
> Hi,
> as far as I can see from a quick glance the problem is that 
> UnboundedSourceWrapper stores an instance of Reader that it gets from the 
> Source. The Reader is not Serializable while the UnboundedSource is. I think 
> the Reader should be initialized when actually running the source, in the 
> run() method.
>
> Cheers,
> Aljoscha
>> On 23 Mar 2016, at 13:07, William McCarthy <[email protected]> wrote:
>>
>> The first stack trace was from the latest runner, yes. I’ve pulled the very 
>> latest, just now, and still get the same thing.
>>
>> When I write ‘pulled the very latest’, here’s what I mean (all of the 
>> following commands, except the latest, finished with success):
>>
>> $ cd incubator-beam
>> $ git pull
>> …some output, then success
>> $ git branch
>> * master
>> $ mvn -DskipTests clean install
>> …some output, then success
>> $ cd <my project>
>> $ mvn -DskipTests clean install
>> …some output, then success
>> $ <command as output below>
>>
>> Bill
>>
>>> On Mar 22, 2016, at 5:29 AM, Maximilian Michels <[email protected]> wrote:
>>>
>>> Hi William,
>>>
>>> Is the first stack trace from the latest master? Using only the simple
>>> class name of the Runner should actually work now. I've also added a
>>> test to explicitly test that.
>>>
>>> The latter error, as Aljoscha pointed out, this due to batch
>>> execution. Perhaps we could make it explicit during startup which mode
>>> we're executing in. File a JIRA issue for that:
>>> https://issues.apache.org/jira/browse/BEAM-139
>>>
>>> - Max
>>>
>>> On Tue, Mar 22, 2016 at 10:00 AM, Aljoscha Krettek <[email protected]> 
>>> wrote:
>>>> Hi,
>>>> for Flink the runner can internally either translate to a batch job or a 
>>>> streaming job. Unbounded sources are not supported when running in batch 
>>>> mode so you have to somehow specify that you want to have streaming mode. 
>>>> StreamingOptions has method setStreaming, maybe you can specify 
>>>> “—streaming true” on the command line to set that flag.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>> On 21 Mar 2016, at 23:39, William McCarthy <[email protected]> 
>>>>> wrote:
>>>>>
>>>>> I’ve attempted to run the TopHashtagsExample again, using both 
>>>>> ‘FlinkPipelineRunner’ and it's fully qualified name. Both bomb out, 
>>>>> though the latter gets further. I hope this helps, here is the output of 
>>>>> both:
>>>>>
>>>>> $ flink run -c 
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner 
>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>
>>>>> ------------------------------------------------------------
>>>>> The program finished with the following exception:
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main 
>>>>> method caused an error.
>>>>>     at 
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>     at 
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>     at 
>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>     at 
>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 
>>>>> 'FlinkPipelineRunner', supported pipeline runners 
>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, 
>>>>> DirectPipelineRunner]
>>>>>     at 
>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>>>>>     at 
>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>>>>>     at 
>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>>>>>     at 
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at 
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at 
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at 
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>     ... 6 more
>>>>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>     at java.lang.Class.forName0(Native Method)
>>>>>     at java.lang.Class.forName(Class.java:264)
>>>>>     at 
>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>>>>>     ... 14 more
>>>>>
>>>>>
>>>>> $ flink run -c 
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>>>>> target/beam-1.0-SNAPSHOT.jar 
>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner 
>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>
>>>>> ------------------------------------------------------------
>>>>> The program finished with the following exception:
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main 
>>>>> method caused an error.
>>>>>     at 
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>     at 
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>     at 
>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>     at 
>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> Caused by: java.lang.UnsupportedOperationException: The transform 
>>>>> Read(UnboundedKafkaSource) is currently not supported.
>>>>>     at 
>>>>> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitTransform(FlinkBatchPipelineTranslator.java:111)
>>>>>     at 
>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>     at 
>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>     at 
>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>     at 
>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>     at 
>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>     at 
>>>>> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>>>>>     at 
>>>>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130)
>>>>>     at 
>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109)
>>>>>     at 
>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50)
>>>>>     at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>     at 
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at 
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at 
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at 
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>     ... 6 more
>>>>>
>>>>>
>>>>>> On Mar 21, 2016, at 12:23 PM, Raghu Angadi <[email protected]> wrote:
>>>>>>
>>>>>> Thanks Max.
>>>>>>
>>>>>> Bill McCarthy,
>>>>>> I know you are unblocked and KafkaWriter is good enough. Please try 
>>>>>> KafkaIO source from my branch with Flink runner if you get a chance.
>>>>>>
>>>>>> thanks,
>>>>>> Raghu.
>>>>>>
>>>>>> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré 
>>>>>> <[email protected]> wrote:
>>>>>> Thanks for the update Max !
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>>
>>>>>> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
>>>>>> FYI: The Runner registration has been fixed. The Flink runner
>>>>>> explicitly registers as of [1]. Also, the SDK tries to look up the
>>>>>> PipelineRunner class in case it has not been registered [2].
>>>>>>
>>>>>> [1] https://github.com/apache/incubator-beam/pull/40
>>>>>> [2] https://github.com/apache/incubator-beam/pull/61
>>>>>>
>>>>>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels <[email protected]> 
>>>>>> wrote:
>>>>>> Great to see such a lively discussion here.
>>>>>>
>>>>>> I think we'll support sinks through the Write interface (like in
>>>>>> batched execution) and also have a dedicated wrapper for the Flink
>>>>>> sinks. This is a very pressing but easy to solve issue of the Flink
>>>>>> runner. Expect it to be in next week.
>>>>>>
>>>>>> Also, the proper registration of the runner is about to to be merged.
>>>>>> We just need an ok from the contributor to merge the changes.
>>>>>>
>>>>>> Best,
>>>>>> Max
>>>>>>
>>>>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <[email protected]> 
>>>>>> wrote:
>>>>>> Thanks Bill!
>>>>>>
>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad it's 
>>>>>> not
>>>>>> blocking you!
>>>>>>
>>>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy
>>>>>> <[email protected]> wrote:
>>>>>>
>>>>>> I tried that, but still no dice: Just to be clear, it’s not a blocker for
>>>>>> me, given that I have my example running, but for your information the
>>>>>> exception is below.
>>>>>>
>>>>>> I’ll watch the commit log on the beam incubator and look forward to
>>>>>> deleting my copy of Raghu’s contributions when they’re merger to master.
>>>>>>
>>>>>> Thanks again for everyone’s help,
>>>>>>
>>>>>> Bill
>>>>>>
>>>>>>
>>>>>> Command followed by exception:
>>>>>>
>>>>>> $ flink run -c
>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>> target/beam-1.0-SNAPSHOT.jar
>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>
>>>>>> ------------------------------------------------------------
>>>>>> The program finished with the following exception:
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>> method caused an error.
>>>>>> at
>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>> at
>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>> at
>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>> at
>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified
>>>>>> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline
>>>>>> runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>>> DirectPipelineRunner]
>>>>>> at
>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>> at
>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>> at
>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>> at
>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>> at
>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at
>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>> ... 6 more
>>>>>>
>>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected]> wrote:
>>>>>>
>>>>>> I don't believe the FlinkPipelineRunner is registered the same way the
>>>>>> Dataflow & Direct Pipeline runners are registered; using
>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work
>>>>>>
>>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy
>>>>>> <[email protected]> wrote:
>>>>>>
>>>>>> Thanks Dan,
>>>>>>
>>>>>> I tried that, but getting the below. Note that the jar contains the
>>>>>> FlinkPipelineRunner.
>>>>>>
>>>>>>
>>>>>>
>>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>>>>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>>>>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>>>>>>
>>>>>> % flink run -c
>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>
>>>>>> ------------------------------------------------------------
>>>>>> The program finished with the following exception:
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>> method caused an error.
>>>>>> at
>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>> at
>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>> at
>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>> at
>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified
>>>>>> 'FlinkPipelineRunner', supported pipeline runners
>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>>> DirectPipelineRunner]
>>>>>> at
>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>> at
>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>> at
>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>> at
>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>> at
>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at
>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>> ... 6 more
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]> wrote:
>>>>>>
>>>>>> Thanks for catching that, Aljoscha!
>>>>>>
>>>>>> Note that the Flink runner should be available via a command-line option
>>>>>> as well: --runner=FlinkPipelineRunner.
>>>>>>
>>>>>> The list of valid values for that flag is computed by walking the
>>>>>> classpath at runtime, so as long as the Flink jar is present it'll work.
>>>>>>
>>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>> Hi,
>>>>>> looks like the example is being executed with the DirectPipelineRunner
>>>>>> which does not seem to be able to cope with UnboundedSource. You need to 
>>>>>> set
>>>>>> the runner to the FlinkRunner in the example code as described here:
>>>>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>>>>>>
>>>>>> The Flink runner should be able to deal with UnboundedSource but has the
>>>>>> limitation that sources are always parallelism=1 (this is being worked 
>>>>>> on,
>>>>>> however).
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> wrote:
>>>>>>
>>>>>> Looks like the Flink runner may not yet support arbitrary code written
>>>>>> with the UnboundedSource API. That is, it looks like the Flink runner
>>>>>> expects the sources to get translated away.
>>>>>>
>>>>>> Max?
>>>>>>
>>>>>> Dan
>>>>>>
>>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy
>>>>>> <[email protected]> wrote:
>>>>>> Thanks Raghu,
>>>>>>
>>>>>> When I try to run it on flink using the incubator-beam code, i.e.
>>>>>>
>>>>>> flink run -c
>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
>>>>>> --topics=test_in --outputTopic=test_out
>>>>>>
>>>>>> I get this:
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>> method caused an error.
>>>>>>      at
>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>      at
>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>      at
>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>      at
>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>      at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>      at
>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>      at
>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>> Caused by: java.lang.IllegalStateException: no evaluator registered
>>>>>> for Read(UnboundedKafkaSource)
>>>>>>      at
>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>>>>>      at
>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>>      at
>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>      at
>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>      at
>>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>>      at
>>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>>      at
>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>>>>>      at
>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>>>>>      at
>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>>>>>      at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>>      at
>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>      at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>      at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>      at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>      at
>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>      ... 6 more
>>>>>>
>>>>>> Any ideas?
>>>>>>
>>>>>> Bill
>>>>>>
>>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:
>>>>>>
>>>>>> Thanks for trying it.
>>>>>>
>>>>>> I fixed the CheckStyle error  (not sure why my build is not failing).
>>>>>> Let me know if you see any issues running with Beam. I haven't tried it. 
>>>>>> I
>>>>>> should. In fact Daniel Halperin says my patch should be against Beam..
>>>>>>
>>>>>> Raghu.
>>>>>>
>>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy
>>>>>> <[email protected]> wrote:
>>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
>>>>>> pointing me to working code.
>>>>>>
>>>>>> I’m in the middle of a hack day at the moment, so the speed of your
>>>>>> responses has been very welcome.
>>>>>>
>>>>>> In the first instance, I’ll try using your changes, Raghu. I’ve
>>>>>> cloned your repo, switched to the kafka branch and built both 
>>>>>> contrib/kafka
>>>>>> and contrib/examples/kafka. The contrib/kafka initially failed with a
>>>>>> CheckStyle error
>>>>>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>>>>>> 'private' modifier out of order with the JLS suggestions)… I’ve fixed 
>>>>>> that
>>>>>> in my local clone and now it’s building fine. I hope to be able to run 
>>>>>> your
>>>>>> contrib unchanged on top of the incubator-beam codebase, which will be 
>>>>>> what
>>>>>> I attempt to do now.
>>>>>>
>>>>>> Thanks again to all, for your swift help.
>>>>>>
>>>>>> Bill
>>>>>>
>>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Bill,
>>>>>>
>>>>>> We have fairly well tested patch for KafkaIO (pr #121). It will be
>>>>>> merged soon. The example there keeps track of top hashtags in 10 minute
>>>>>> sliding window and writes the results to another Kafka topic. Please try 
>>>>>> it
>>>>>> if you can. It is well tested on Google Cloud Dataflow. I have not run it
>>>>>> using Flink runner.
>>>>>>
>>>>>> Raghu.
>>>>>>
>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas
>>>>>> <[email protected]> wrote:
>>>>>> Hello Bill,
>>>>>>
>>>>>> This is a known limitation of the Flink Runner.
>>>>>> There is a JIRA issue for that
>>>>>> https://issues.apache.org/jira/browse/BEAM-127
>>>>>>
>>>>>> A wrapper for Flink sinks will come soon and as Beam evolves,
>>>>>> a more Beam-y solution will come as well.
>>>>>>
>>>>>> Kostas
>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy
>>>>>> <[email protected]> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I’m trying to write a proof-of-concept which takes messages from
>>>>>> Kafka, transforms them using Beam on Flink, then pushes the results onto 
>>>>>> a
>>>>>> different Kafka topic.
>>>>>>
>>>>>> I’ve used the KafkaWindowedWordCountExample as a starting point,
>>>>>> and that’s doing the first part of what I want to do, but it outputs to 
>>>>>> text
>>>>>> files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I 
>>>>>> can’t
>>>>>> figure out how to plug it into the pipeline. I was thinking that it 
>>>>>> would be
>>>>>> wrapped with an UnboundedFlinkSink, or some such, but that doesn’t seem 
>>>>>> to
>>>>>> exist.
>>>>>>
>>>>>> Any advice or thoughts on what I’m trying to do?
>>>>>>
>>>>>> I’m running the latest incubator-beam (as of last night from
>>>>>> Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google
>>>>>> Compute Engine (Debian Jessie).
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Bill McCarthy
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Jean-Baptiste Onofré
>>>>>> [email protected]
>>>>>> http://blog.nanthrax.net
>>>>>> Talend - http://www.talend.com
>>>>>>
>>>>>
>>>>
>>
>

Reply via email to