Thanks Dan and Aljoscha. I noticed the DirectPipelineRunner in the stack trace and thought it was odd. I had a look in the code to see how I could change it, but came up empty, so thanks for the README link.
…however, then Dan’s mail came in, which mentioned “sources”, so I changed my example to just use the KafkaWriter, without touching the FlinkKafkaConsumer08 I had previously been using. This proved fruitful: my example now runs with the FlinkKafkaConsumer08 as a source and the new KafkaWriter as a sink, which is what I need. Thanks to you all for your help. In the interest of closing the loop, I’ll switch the runner in the HashTag example, per the README, and see what I get.

Bill

> On Mar 18, 2016, at 4:21 PM, Aljoscha Krettek <[email protected]> wrote:
>
> Hi,
> it looks like the example is being executed with the DirectPipelineRunner, which
> does not seem to be able to cope with UnboundedSource. You need to set the
> runner to the FlinkRunner in the example code, as described here:
> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>
> The Flink runner should be able to deal with UnboundedSource, but with the
> limitation that sources always run with parallelism=1 (this is being worked
> on, however).
>
> Cheers,
> Aljoscha
>
>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> wrote:
>>
>> It looks like the Flink runner may not yet support arbitrary code written
>> with the UnboundedSource API. That is, it looks like the Flink runner
>> expects the sources to get translated away.
>>
>> Max?
>>
>> Dan
>>
>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy
>> <[email protected]> wrote:
>> Thanks Raghu,
>>
>> When I try to run it on Flink using the incubator-beam code, i.e.
>>
>>   flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>     target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
>>     --topics=test_in --outputTopic=test_out
>>
>> I get this:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main method
>> caused an error.
>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> Caused by: java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>> at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>> at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>> at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>> at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>> at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>> at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>> at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>> at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>> at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>> at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>> at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> ... 6 more
>>
>> Any ideas?
>>
>> Bill
>>
>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:
>>>
>>> Thanks for trying it.
>>>
>>> I fixed the CheckStyle error (not sure why my build is not failing). Let
>>> me know if you see any issues running with Beam. I haven't tried it; I
>>> should. In fact, Daniel Halperin says my patch should be against Beam.
>>>
>>> Raghu
>>>
>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy
>>> <[email protected]> wrote:
>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
>>> pointing me to working code.
>>>
>>> I’m in the middle of a hack day at the moment, so the speed of your
>>> responses has been very welcome.
>>>
>>> In the first instance, I’ll try using your changes, Raghu. I’ve cloned your
>>> repo, switched to the kafka branch, and built both contrib/kafka and
>>> contrib/examples/kafka. The contrib/kafka build initially failed with a
>>> CheckStyle error:
>>>
>>>   /Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>>>   'private' modifier out of order with the JLS suggestions
>>>
>>> I’ve fixed that in my local clone and now it’s building fine. I hope to be
>>> able to run your contrib unchanged on top of the incubator-beam codebase,
>>> which is what I’ll attempt now.
>>>
>>> Thanks again to all for your swift help.
>>>
>>> Bill
>>>
>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]> wrote:
>>>>
>>>> Hi Bill,
>>>>
>>>> We have a fairly well-tested patch for KafkaIO (PR #121). It will be
>>>> merged soon. The example there keeps track of top hashtags in a 10-minute
>>>> sliding window and writes the results to another Kafka topic. Please try
>>>> it if you can.
>>>> It is well tested on Google Cloud Dataflow; I have not run it using the
>>>> Flink runner.
>>>>
>>>> Raghu
>>>>
>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas
>>>> <[email protected]> wrote:
>>>> Hello Bill,
>>>>
>>>> This is a known limitation of the Flink runner.
>>>> There is a JIRA issue for it:
>>>> https://issues.apache.org/jira/browse/BEAM-127
>>>>
>>>> A wrapper for Flink sinks will come soon, and as Beam evolves,
>>>> a more Beam-y solution will come as well.
>>>>
>>>> Kostas
>>>>
>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy <[email protected]>
>>>>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I’m trying to write a proof-of-concept which takes messages from Kafka,
>>>>> transforms them using Beam on Flink, then pushes the results onto a
>>>>> different Kafka topic.
>>>>>
>>>>> I’ve used the KafkaWindowedWordCountExample as a starting point, and it
>>>>> does the first part of what I want, but it outputs to text files rather
>>>>> than to Kafka. FlinkKafkaProducer08 looks promising, but I can’t figure
>>>>> out how to plug it into the pipeline. I was expecting it to be wrapped
>>>>> with an UnboundedFlinkSink, or some such, but that doesn’t seem to exist.
>>>>>
>>>>> Any advice or thoughts on what I’m trying to do?
>>>>>
>>>>> I’m running the latest incubator-beam (as of last night from GitHub),
>>>>> Flink 1.0.0 in cluster mode, and Kafka 0.9.0.1, all on Google Compute
>>>>> Engine (Debian Jessie).
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Bill McCarthy
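For anyone landing on this thread later, the runner switch described in the README linked above amounts to a few lines of pipeline setup. This is a sketch only, not the actual example code: the class names `FlinkPipelineOptions` and `FlinkPipelineRunner` (in `org.apache.beam.runners.flink`) and the streaming flag are assumptions based on the incubator-beam Flink runner of that era, and later Beam releases renamed them; the linked README has the exact names.

```java
// Sketch: replacing the default DirectPipelineRunner (which has no evaluator
// for Read(UnboundedKafkaSource)) with the Flink runner. Class names below are
// assumptions from the incubator-beam era and may differ in your checkout.
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineRunner;

public class RunnerSwitchSketch {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkPipelineRunner.class); // instead of DirectPipelineRunner
    options.setStreaming(true);                   // unbounded sources need streaming mode

    Pipeline p = Pipeline.create(options);
    // ... build the TopHashtagsExample pipeline here ...
    p.run();
  }
}
```

With this in place, the `flink run` invocation from the thread should reach the Flink runner's translation instead of failing in the direct runner's evaluator lookup.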
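As a side note on the CheckStyle failure mentioned in the thread: Java itself accepts class-member modifiers in any order, which is why the code compiled, but CheckStyle's ModifierOrder rule enforces the order the JLS recommends (access modifier first, then `static`, then `final`, and so on). A minimal illustration, with hypothetical field names:

```java
public class ModifierOrder {
  // Compiles fine, but CheckStyle flags it: 'private' modifier out of order
  // with the JLS suggestions.
  static private final int FLAGGED = 1;

  // Same meaning, written in the JLS-suggested order; CheckStyle accepts this.
  private static final int CLEAN = 2;

  public static int sum() {
    return FLAGGED + CLEAN; // both fields behave identically at runtime
  }

  public static void main(String[] args) {
    System.out.println(sum()); // prints 3
  }
}
```

The fix Bill applied in his clone is exactly this kind of reordering, with no change in behavior.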
