Thanks Bill!

Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad it's not
blocking you!

On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy <[email protected]
> wrote:

> I tried that, but still no dice: Just to be clear, it’s not a blocker for
> me, given that I have my example running, but for your information the
> exception is below.
>
> I’ll watch the commit log on the beam incubator and look forward to
> deleting my copy of Raghu’s contributions when they’re merger to master.
>
> Thanks again for everyone’s help,
>
> Bill
>
>
> Command followed by exception:
>
> $ flink run -c
> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
> target/beam-1.0-SNAPSHOT.jar 
> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>
> ------------------------------------------------------------
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified
> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported
> pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
> DirectPipelineRunner]
> at
> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
> at
> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
> at
> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
> at
> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
> at
> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> ... 6 more
>
> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected]> wrote:
>
> I don't believe the FlinkPipelineRunner is registered the same way the
> Dataflow & Direct Pipeline runners are registered; using 
> org.apache.beam.runners.flink.FlinkPipelineRunner
> should work
>
> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy <
> [email protected]> wrote:
>
>> Thanks Dan,
>>
>> I tried that, but getting the below. Note that the jar contains the
>> FlinkPipelineRunner.
>>
>>
>>
>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>>
>> % flink run -c
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>
>> ------------------------------------------------------------
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> at
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified
>> 'FlinkPipelineRunner', supported pipeline
>> runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>> DirectPipelineRunner]
>> at
>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>> at
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>> at
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>> at
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>> at
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> ... 6 more
>>
>>
>>
>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]> wrote:
>>
>> Thanks for catching that, Aljoscha!
>>
>> Note that the Flink runner should be available via a command-line option
>> as well: --runner=FlinkPipelineRunner.
>>
>> The list of valid values for that flag is computed by walking the
>> classpath at runtime, so as long as the Flink jar is present it'll work.
>>
>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <[email protected]>
>> wrote:
>>
>>> Hi,
>>> looks like the example is being executed with the DirectPipelineRunner
>>> which does not seem to be able to cope with UnboundedSource. You need to
>>> set the runner to the FlinkRunner in the example code as described here:
>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>>>
>>> The Flink runner should be able to deal with UnboundedSource but has the
>>> limitation that sources are always parallelism=1 (this is being worked on,
>>> however).
>>>
>>> Cheers,
>>> Aljoscha
>>> > On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> wrote:
>>> >
>>> > Looks like the Flink runner may not yet support arbitrary code written
>>> with the UnboundedSource API. That is, it looks like the Flink runner
>>> expects the sources to get translated away.
>>> >
>>> > Max?
>>> >
>>> > Dan
>>> >
>>> > On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy <
>>> [email protected]> wrote:
>>> > Thanks Raghu,
>>> >
>>> > When I try to run it on flink using the incubator-beam code, i.e.
>>> >
>>> > flink run -c
>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
>>> --topics=test_in --outputTopic=test_out
>>> >
>>> > I get this:
>>> >
>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error.
>>> >       at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>> >       at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>> >       at
>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>> >       at
>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>> >       at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>> >       at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>> >       at
>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>> > Caused by: java.lang.IllegalStateException: no evaluator registered
>>> for Read(UnboundedKafkaSource)
>>> >       at
>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>> >       at
>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>> >       at
>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>> >       at
>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>> >       at
>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>> >       at
>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>> >       at
>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>> >       at
>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>> >       at
>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>> >       at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>> >       at
>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>> >       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> >       at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> >       at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> >       at java.lang.reflect.Method.invoke(Method.java:498)
>>> >       at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>> >       ... 6 more
>>> >
>>> > Any ideas?
>>> >
>>> > Bill
>>> >
>>> >> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:
>>> >>
>>> >> Thanks for trying it.
>>> >>
>>> >> I fixed the CheckStyle error  (not sure why my build is not failing).
>>> Let me know if you see any issues running with Beam. I haven't tried it. I
>>> should. In fact Daniel Halperin says my patch should be against Beam..
>>> >>
>>> >> Raghu.
>>> >>
>>> >> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy <
>>> [email protected]> wrote:
>>> >> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
>>> pointing me to working code.
>>> >>
>>> >> I’m in the middle of a hack day at the moment, so the speed of your
>>> responses has been very welcome.
>>> >>
>>> >> In the first instance, I’ll try using your changes, Raghu. I’ve
>>> cloned your repo, switched to the kafka branch and built both contrib/kafka
>>> and contrib/examples/kafka. The contrib/kafka initially failed with a
>>> CheckStyle error
>>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>>> 'private' modifier out of order with the JLS suggestions)… I’ve fixed that
>>> in my local clone and now it’s building fine. I hope to be able to run your
>>> contrib unchanged on top of the incubator-beam codebase, which will be what
>>> I attempt to do now.
>>> >>
>>> >> Thanks again to all, for your swift help.
>>> >>
>>> >> Bill
>>> >>
>>> >>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]>
>>> wrote:
>>> >>>
>>> >>> Hi Bill,
>>> >>>
>>> >>> We have fairly well tested patch for KafkaIO (pr #121). It will be
>>> merged soon. The example there keeps track of top hashtags in 10 minute
>>> sliding window and writes the results to another Kafka topic. Please try it
>>> if you can. It is well tested on Google Cloud Dataflow. I have not run it
>>> using Flink runner.
>>> >>>
>>> >>> Raghu.
>>> >>>
>>> >>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas <
>>> [email protected]> wrote:
>>> >>> Hello Bill,
>>> >>>
>>> >>> This is a known limitation of the Flink Runner.
>>> >>> There is a JIRA issue for that
>>> https://issues.apache.org/jira/browse/BEAM-127
>>> >>>
>>> >>> A wrapper for Flink sinks will come soon and as Beam evolves,
>>> >>> a more Beam-y solution will come as well.
>>> >>>
>>> >>> Kostas
>>> >>>> On Mar 18, 2016, at 5:23 PM, William McCarthy <
>>> [email protected]> wrote:
>>> >>>>
>>> >>>> Hi,
>>> >>>>
>>> >>>> I’m trying to write a proof-of-concept which takes messages from
>>> Kafka, transforms them using Beam on Flink, then pushes the results onto a
>>> different Kafka topic.
>>> >>>>
>>> >>>> I’ve used the KafkaWindowedWordCountExample as a starting point,
>>> and that’s doing the first part of what I want to do, but it outputs to
>>> text files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I
>>> can’t figure out how to plug it into the pipeline. I was thinking that it
>>> would be wrapped with an UnboundedFlinkSink, or some such, but that doesn’t
>>> seem to exist.
>>> >>>>
>>> >>>> Any advice or thoughts on what I’m trying to do?
>>> >>>>
>>> >>>> I’m running the latest incubator-beam (as of last night from
>>> Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google
>>> Compute Engine (Debian Jessie).
>>> >>>>
>>> >>>> Thanks,
>>> >>>>
>>> >>>> Bill McCarthy
>>> >>>
>>> >>>
>>> >>
>>> >>
>>> >
>>> >
>>>
>>>
>>
>>
>
>

Reply via email to