Hi William,

I started working on a fix and better testing of the UnboundedSourceWrapper. I'll get back to you shortly with a pull request that we should be able to merge soon.

- Max

On Wed, Mar 23, 2016 at 1:33 PM, Aljoscha Krettek wrote:
> Hi,
> as far as I can see from a quick glance, the problem is that
> UnboundedSourceWrapper stores an instance of Reader that it gets from the
> Source. The Reader is not Serializable, while the UnboundedSource is. I think
> the Reader should be initialized when actually running the source, in the
> run() method.
>
> Cheers,
> Aljoscha
>> On 23 Mar 2016, at 13:07, William McCarthy wrote:
>>
>> The first stack trace was from the latest runner, yes. I’ve pulled the very
>> latest, just now, and still get the same thing.
>>
>> When I write ‘pulled the very latest’, here’s what I mean (all of the
>> following commands, except the last, finished with success):
>>
>> $ cd incubator-beam
>> $ git pull
>> …some output, then success
>> $ git branch
>> * master
>> $ mvn -DskipTests clean install
>> …some output, then success
>> $ cd <my project>
>> $ mvn -DskipTests clean install
>> …some output, then success
>> $ <command as output below>
>>
>> Bill
>>
>>> On Mar 22, 2016, at 5:29 AM, Maximilian Michels wrote:
>>>
>>> Hi William,
>>>
>>> Is the first stack trace from the latest master? Using only the simple
>>> class name of the Runner should actually work now. I've also added a
>>> test to explicitly test that.
>>>
>>> The latter error, as Aljoscha pointed out, is due to batch
>>> execution. Perhaps we could make it explicit during startup which mode
>>> we're executing in. I filed a JIRA issue for that:
>>> https://issues.apache.org/jira/browse/BEAM-139
>>>
>>> - Max
>>>
>>> On Tue, Mar 22, 2016 at 10:00 AM, Aljoscha Krettek wrote:
>>>> Hi,
>>>> for Flink, the runner can internally translate either to a batch job or to a
>>>> streaming job. Unbounded sources are not supported when running in batch
>>>> mode, so you have to somehow specify that you want streaming mode.
>>>> StreamingOptions has a setStreaming method; maybe you can specify
>>>> “--streaming=true” on the command line to set that flag.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>> On 21 Mar 2016, at 23:39, William McCarthy wrote:
>>>>>
>>>>> I’ve attempted to run the TopHashtagsExample again, using both
>>>>> ‘FlinkPipelineRunner’ and its fully qualified name. Both bomb out,
>>>>> though the latter gets further. I hope this helps; here is the output of
>>>>> both:
>>>>>
>>>>> $ flink run -c
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>
>>>>> ------------------------------------------------------------
>>>>> The program finished with the following exception:
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> method caused an error.
>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified
>>>>> 'FlinkPipelineRunner', supported pipeline runners
>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>> DirectPipelineRunner]
>>>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>>>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>>>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>>>>> at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>> ... 6 more
>>>>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>> at java.lang.Class.forName0(Native Method)
>>>>> at java.lang.Class.forName(Class.java:264)
>>>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>>>>> ... 14 more
>>>>>
>>>>> $ flink run -c
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>> target/beam-1.0-SNAPSHOT.jar
>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>
>>>>> ------------------------------------------------------------
>>>>> The program finished with the following exception:
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> method caused an error.
>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> Caused by: java.lang.UnsupportedOperationException: The transform
>>>>> Read(UnboundedKafkaSource) is currently not supported.
>>>>> at org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitTransform(FlinkBatchPipelineTranslator.java:111)
>>>>> at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>> at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>> at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>> at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>> at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>> at org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>>>>> at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130)
>>>>> at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109)
>>>>> at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50)
>>>>> at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>> at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>> ... 6 more
>>>>>
>>>>>> On Mar 21, 2016, at 12:23 PM, Raghu Angadi wrote:
>>>>>>
>>>>>> Thanks Max.
>>>>>>
>>>>>> Bill McCarthy,
>>>>>> I know you are unblocked and KafkaWriter is good enough. Please try the
>>>>>> KafkaIO source from my branch with the Flink runner if you get a chance.
>>>>>>
>>>>>> thanks,
>>>>>> Raghu.
>>>>>>
>>>>>> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré wrote:
>>>>>> Thanks for the update, Max!
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
>>>>>> FYI: The Runner registration has been fixed. The Flink runner
>>>>>> explicitly registers as of [1]. Also, the SDK tries to look up the
>>>>>> PipelineRunner class in case it has not been registered [2].
>>>>>>
>>>>>> [1] https://github.com/apache/incubator-beam/pull/40
>>>>>> [2] https://github.com/apache/incubator-beam/pull/61
>>>>>>
>>>>>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels wrote:
>>>>>> Great to see such a lively discussion here.
>>>>>>
>>>>>> I think we'll support sinks through the Write interface (like in
>>>>>> batched execution) and also have a dedicated wrapper for the Flink
>>>>>> sinks. This is a very pressing but easy-to-solve issue of the Flink
>>>>>> runner. Expect it to be in next week.
>>>>>>
>>>>>> Also, the proper registration of the runner is about to be merged.
>>>>>> We just need an OK from the contributor to merge the changes.
>>>>>>
>>>>>> Best,
>>>>>> Max
>>>>>>
>>>>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin wrote:
>>>>>> Thanks Bill!
>>>>>>
>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad it's
>>>>>> not blocking you!
>>>>>>
>>>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy wrote:
>>>>>>
>>>>>> I tried that, but still no dice. Just to be clear, it’s not a blocker for
>>>>>> me, given that I have my example running, but for your information the
>>>>>> exception is below.
>>>>>>
>>>>>> I’ll watch the commit log on the Beam incubator and look forward to
>>>>>> deleting my copy of Raghu’s contributions when they’re merged to master.
>>>>>>
>>>>>> Thanks again for everyone’s help,
>>>>>>
>>>>>> Bill
>>>>>>
>>>>>> Command followed by exception:
>>>>>>
>>>>>> $ flink run -c
>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>> target/beam-1.0-SNAPSHOT.jar
>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>
>>>>>> ------------------------------------------------------------
>>>>>> The program finished with the following exception:
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>> method caused an error.
>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified
>>>>>> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline
>>>>>> runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>>> DirectPipelineRunner]
>>>>>> at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>> at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>> ... 6 more
>>>>>>
>>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh wrote:
>>>>>>
>>>>>> I don't believe the FlinkPipelineRunner is registered the same way the
>>>>>> Dataflow & Direct Pipeline runners are registered; using
>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work.
>>>>>>
>>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy wrote:
>>>>>>
>>>>>> Thanks Dan,
>>>>>>
>>>>>> I tried that, but I'm getting the error below. Note that the jar contains the
>>>>>> FlinkPipelineRunner.
>>>>>>
>>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>>>>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>>>>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>>>>>>
>>>>>> % flink run -c
>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>
>>>>>> ------------------------------------------------------------
>>>>>> The program finished with the following exception:
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>> method caused an error.
>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified
>>>>>> 'FlinkPipelineRunner', supported pipeline runners
>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>>> DirectPipelineRunner]
>>>>>> at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>> at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>> ... 6 more
>>>>>>
>>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin wrote:
>>>>>>
>>>>>> Thanks for catching that, Aljoscha!
>>>>>>
>>>>>> Note that the Flink runner should be available via a command-line option
>>>>>> as well: --runner=FlinkPipelineRunner.
>>>>>>
>>>>>> The list of valid values for that flag is computed by walking the
>>>>>> classpath at runtime, so as long as the Flink jar is present it'll work.
>>>>>>
>>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek wrote:
>>>>>>
>>>>>> Hi,
>>>>>> looks like the example is being executed with the DirectPipelineRunner,
>>>>>> which does not seem to be able to cope with UnboundedSource. You need to
>>>>>> set the runner to the FlinkRunner in the example code as described here:
>>>>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>>>>>>
>>>>>> The Flink runner should be able to deal with UnboundedSource but has the
>>>>>> limitation that sources always run with parallelism=1 (this is being worked
>>>>>> on, however).
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>> On 18 Mar 2016, at 20:56, Dan Halperin wrote:
>>>>>>
>>>>>> Looks like the Flink runner may not yet support arbitrary code written
>>>>>> with the UnboundedSource API. That is, it looks like the Flink runner
>>>>>> expects the sources to get translated away.
>>>>>>
>>>>>> Max?
>>>>>>
>>>>>> Dan
>>>>>>
>>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy wrote:
>>>>>> Thanks Raghu,
>>>>>>
>>>>>> When I try to run it on Flink using the incubator-beam code, i.e.
>>>>>>
>>>>>> flink run -c
>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
>>>>>> --topics=test_in --outputTopic=test_out
>>>>>>
>>>>>> I get this:
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>> method caused an error.
>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>> Caused by: java.lang.IllegalStateException: no evaluator registered
>>>>>> for Read(UnboundedKafkaSource)
>>>>>> at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>>>>> at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>> at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>> at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>> at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>> at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>> at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>>>>> at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>>>>> at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>>>>> at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>> at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>> ... 6 more
>>>>>>
>>>>>> Any ideas?
>>>>>>
>>>>>> Bill
>>>>>>
>>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi wrote:
>>>>>>
>>>>>> Thanks for trying it.
>>>>>>
>>>>>> I fixed the CheckStyle error (not sure why my build is not failing).
>>>>>> Let me know if you see any issues running with Beam. I haven't tried it.
>>>>>> I should. In fact, Daniel Halperin says my patch should be against Beam.
>>>>>>
>>>>>> Raghu.
>>>>>>
>>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy wrote:
>>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
>>>>>> pointing me to working code.
>>>>>>
>>>>>> I’m in the middle of a hack day at the moment, so the speed of your
>>>>>> responses has been very welcome.
>>>>>>
>>>>>> In the first instance, I’ll try using your changes, Raghu. I’ve
>>>>>> cloned your repo, switched to the kafka branch and built both
>>>>>> contrib/kafka and contrib/examples/kafka.
>>>>>> The contrib/kafka initially failed with a CheckStyle error
>>>>>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>>>>>> 'private' modifier out of order with the JLS suggestions). I’ve fixed that
>>>>>> in my local clone and now it’s building fine. I hope to be able to run your
>>>>>> contrib unchanged on top of the incubator-beam codebase, which is what
>>>>>> I’ll attempt to do now.
>>>>>>
>>>>>> Thanks again to all for your swift help.
>>>>>>
>>>>>> Bill
>>>>>>
>>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi wrote:
>>>>>>
>>>>>> Hi Bill,
>>>>>>
>>>>>> We have a fairly well-tested patch for KafkaIO (PR #121). It will be
>>>>>> merged soon. The example there keeps track of the top hashtags in a 10-minute
>>>>>> sliding window and writes the results to another Kafka topic. Please try
>>>>>> it if you can. It is well tested on Google Cloud Dataflow. I have not run it
>>>>>> using the Flink runner.
>>>>>>
>>>>>> Raghu.
>>>>>>
>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas wrote:
>>>>>> Hello Bill,
>>>>>>
>>>>>> This is a known limitation of the Flink Runner.
>>>>>> There is a JIRA issue for that:
>>>>>> https://issues.apache.org/jira/browse/BEAM-127
>>>>>>
>>>>>> A wrapper for Flink sinks will come soon and, as Beam evolves,
>>>>>> a more Beam-y solution will come as well.
>>>>>>
>>>>>> Kostas
>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I’m trying to write a proof-of-concept which takes messages from
>>>>>> Kafka, transforms them using Beam on Flink, then pushes the results onto
>>>>>> a different Kafka topic.
>>>>>>
>>>>>> I’ve used the KafkaWindowedWordCountExample as a starting point,
>>>>>> and that’s doing the first part of what I want to do, but it outputs to
>>>>>> text files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I
>>>>>> can’t figure out how to plug it into the pipeline. I was thinking that it
>>>>>> would be wrapped with an UnboundedFlinkSink, or some such, but that doesn’t
>>>>>> seem to exist.
>>>>>>
>>>>>> Any advice or thoughts on what I’m trying to do?
>>>>>>
>>>>>> I’m running the latest incubator-beam (as of last night from
>>>>>> GitHub), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google
>>>>>> Compute Engine (Debian Jessie).
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Bill McCarthy
>>>>>>
>>>>>> --
>>>>>> Jean-Baptiste Onofré
>>>>>> http://blog.nanthrax.net
>>>>>> Talend - http://www.talend.com
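Aljoscha's 23 Mar diagnosis is that UnboundedSourceWrapper keeps the source's Reader as a field, although only the UnboundedSource itself is Serializable. The minimal sketch below illustrates the general pattern he suggests (serialize only the source, mark the reader field transient, and create the reader in run()) using plain JDK serialization. ToySource, ToyReader, ListSource, EagerWrapper and LazyWrapper are hypothetical stand-ins, not Beam or Flink classes, and this is not the code from the pull request Max mentions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a source reader: deliberately NOT Serializable.
interface ToyReader<T> {
  boolean advance();
  T getCurrent();
}

// Hypothetical stand-in for an unbounded source: Serializable, creates readers on demand.
interface ToySource<T> extends Serializable {
  ToyReader<T> createReader();
}

// Demo source that replays a fixed list of strings.
final class ListSource implements ToySource<String> {
  private final ArrayList<String> items;
  ListSource(List<String> items) { this.items = new ArrayList<>(items); }

  @Override
  public ToyReader<String> createReader() {
    final Iterator<String> it = items.iterator();
    return new ToyReader<String>() {
      private String current;
      @Override
      public boolean advance() {
        if (!it.hasNext()) {
          return false;
        }
        current = it.next();
        return true;
      }
      @Override
      public String getCurrent() { return current; }
    };
  }
}

// Problematic pattern: the reader is created eagerly and kept in a serialized field.
final class EagerWrapper<T> implements Serializable {
  private final ToySource<T> source;
  private final ToyReader<T> reader; // not Serializable, so serializing the wrapper fails
  EagerWrapper(ToySource<T> source) {
    this.source = source;
    this.reader = source.createReader();
  }
}

// Suggested pattern: only the source is serialized; the reader is created in run().
final class LazyWrapper<T> implements Serializable {
  private final ToySource<T> source;
  private transient ToyReader<T> reader; // skipped by Java serialization
  LazyWrapper(ToySource<T> source) { this.source = source; }

  List<T> run() {
    reader = source.createReader(); // created after deserialization, i.e. on the "worker"
    List<T> out = new ArrayList<>();
    while (reader.advance()) {
      out.add(reader.getCurrent());
    }
    return out;
  }
}

final class ReaderInitDemo {
  // Serialize and deserialize, roughly what shipping a job to a cluster involves.
  static Object roundTrip(Object o) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(o);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    ListSource source = new ListSource(Arrays.asList("a", "b"));
    try {
      roundTrip(new EagerWrapper<>(source));
    } catch (NotSerializableException e) {
      System.out.println("eager wrapper fails: " + e);
    }
    LazyWrapper<?> copy = (LazyWrapper<?>) roundTrip(new LazyWrapper<>(source));
    System.out.println("lazy wrapper emits: " + copy.run());
  }
}
```

Running ReaderInitDemo reports a NotSerializableException for EagerWrapper, while the deserialized LazyWrapper still emits both elements. How this maps onto the Flink source lifecycle is left to the actual patch.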
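Max's 21 Mar update describes two fixes for the "Unknown 'runner' specified" errors: the Flink runner registers itself [1], and the SDK falls back to loading the runner class by name when it is not registered [2]. The sketch below models that resolution order with the JDK only. RunnerResolver, ToyRunner and the nested Toy*Runner classes are hypothetical and do not mirror PipelineOptionsFactory's real code. The sketch also shows why a bare simple name fails with ClassNotFoundException when the runner is unregistered: Class.forName expects a fully qualified (binary) class name.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the SDK's PipelineRunner type.
interface ToyRunner {}

final class RunnerResolver {
  // Runners registered up front, keyed by simple class name.
  private final Map<String, Class<? extends ToyRunner>> registered = new TreeMap<>();

  void register(Class<? extends ToyRunner> runnerClass) {
    registered.put(runnerClass.getSimpleName(), runnerClass);
  }

  Class<? extends ToyRunner> resolve(String flagValue) {
    Class<? extends ToyRunner> known = registered.get(flagValue);
    if (known != null) {
      return known;
    }
    try {
      // Fallback: interpret the flag value as a fully qualified class name.
      return Class.forName(flagValue).asSubclass(ToyRunner.class);
    } catch (ClassNotFoundException | ClassCastException e) {
      throw new IllegalArgumentException("Unknown 'runner' specified '" + flagValue
          + "', supported pipeline runners " + registered.keySet(), e);
    }
  }
}

final class RunnerLookupDemo {
  // Nested classes, so the simple name differs from the binary name Class.forName needs.
  static final class ToyDirectRunner implements ToyRunner {}
  static final class ToyFlinkRunner implements ToyRunner {}

  public static void main(String[] args) {
    RunnerResolver resolver = new RunnerResolver();
    resolver.register(ToyDirectRunner.class);

    try {
      resolver.resolve("ToyFlinkRunner"); // not registered and not a loadable class name
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage() + " (cause: " + e.getCause().getClass().getSimpleName() + ")");
    }
    // The fully qualified (binary) name works through the fallback.
    System.out.println(resolver.resolve(ToyFlinkRunner.class.getName()).getSimpleName());

    resolver.register(ToyFlinkRunner.class); // once registered, the simple name works too
    System.out.println(resolver.resolve("ToyFlinkRunner").getSimpleName());
  }
}
```

In the SDK the registered set is discovered from the classpath (per Dan's 18 Mar note) rather than through an explicit call; register() here only stands in for that step.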
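Aljoscha's 22 Mar reply explains the UnsupportedOperationException in Bill's second trace: the Flink runner translates the pipeline either to a batch job or to a streaming job, and the batch translator (FlinkBatchPipelineTranslator in the trace) rejects unbounded reads unless streaming mode is requested. The toy model below captures only that decision, plus the startup message Max proposes in BEAM-139. Mode, SourceInfo and translate are hypothetical names, not the runner's API.

```java
import java.util.Arrays;
import java.util.List;

final class ModeSelectionDemo {
  enum Mode { BATCH, STREAMING }

  // Hypothetical description of a read transform in a pipeline.
  static final class SourceInfo {
    final String name;
    final boolean unbounded;

    SourceInfo(String name, boolean unbounded) {
      this.name = name;
      this.unbounded = unbounded;
    }
  }

  // Chooses the translation mode from the streaming flag and rejects unbounded
  // reads in batch mode, mirroring the error reported in the thread.
  static Mode translate(boolean streaming, List<SourceInfo> sources) {
    Mode mode = streaming ? Mode.STREAMING : Mode.BATCH;
    // Make the chosen mode explicit at startup (the idea behind BEAM-139).
    System.out.println("Translating pipeline in " + mode + " mode");
    if (mode == Mode.BATCH) {
      for (SourceInfo s : sources) {
        if (s.unbounded) {
          throw new UnsupportedOperationException(
              "The transform Read(" + s.name + ") is currently not supported.");
        }
      }
    }
    return mode;
  }

  public static void main(String[] args) {
    List<SourceInfo> sources = Arrays.asList(new SourceInfo("UnboundedKafkaSource", true));
    try {
      translate(false, sources); // default: batch
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
    System.out.println(translate(true, sources)); // with the streaming flag set
  }
}
```

In a real pipeline the equivalent switch is the streaming flag Aljoscha mentions (StreamingOptions.setStreaming(true) in code, or the corresponding command-line option) rather than a boolean argument.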
