Hi,
you could, as JB suggested, replace it with something like this:

PipelineOptions options
= PipelineOptionsFactory.fromArgs(args).withValidation().create();

and then pass "--runner=FlinkPipelineRunner --streaming=true" on the command
line, so the runner is configurable without code changes.
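
To make that concrete, here is a minimal sketch of what the main method
could look like (the pipeline body is elided):

public static void main(String[] args) {
  // Picks up --runner, --streaming, etc. from the command line instead
  // of hard-coding a runner-specific options class.
  PipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().create();
  Pipeline p = Pipeline.create(options);
  // ... build the same transforms as before ...
  p.run();
}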

I think the problem with Kafka in the batch runner can be solved by putting
the Kafka jars into the lib folder of the Flink cluster nodes and
restarting the cluster.

--
Aljoscha

On Sat, 28 May 2016 at 01:08 Kaniska Gmail <[email protected]> wrote:

> Did you try FlinkKafkaConsumer and FlinkKafkaProducer?
>
> Flink+Kafka+Beam is working very nicely for me, streaming millions of
> events.
>
> Also make sure you are packaging the latest Beam source code.
>
> Sent from my iPhone
>
> On May 27, 2016, at 2:48 PM, amir bahmanyari <[email protected]> wrote:
>
> Hi JB
> In my code, I replaced:
> FlinkPipelineOptions options =
> PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> with:
> PipelineOptions options = PipelineOptionsFactory.create();
>
> The compiler complained that setStreaming(true) was not supported, so I
> commented out options.setStreaming(true);
> It compiled fine.
> Then I ran it.
> It threw at p.run():
>
> ...Running thread threw: java.lang.UnsupportedOperationException: The
> transform Read(UnboundedKafkaSource) is currently not supported.
>         at
> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitPrimitiveTransform(FlinkBatchPipelineTranslator.java:100)
>         at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
>         at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>         at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>         at
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>         at
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
>         at
> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>         at
> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.translate(FlinkBatchPipelineTranslator.java:56)
>         at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:132)
>         at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:108)
>         at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>         at
> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:220)
>
>
> So I added .withMaxNumRecords(100) to the KafkaIO call.
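>
> For reference, the call with that change looks roughly like this (the
> broker and topic names are placeholders, and the method names follow the
> Beam snapshot used in this thread):
>
> p.apply(KafkaIO.read()
>     .withBootstrapServers("kafkahost:9092")  // placeholder broker list
>     .withTopics(Arrays.asList("lrtuples"))   // placeholder topic
>     .withMaxNumRecords(100));  // wraps the unbounded source in a bounded read
>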
> Restarted the Flink cluster. Reran the app.
> Got this exception at p.run() now. Have a wonderful weekend.
>
>
> ...Running thread threw: java.lang.RuntimeException: Pipeline execution
> failed
>         at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:117)
>         at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>         at
> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:220)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>         at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>         at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The
> program execution failed: Failed to submit job
> 6d197ab3c635428c287be6a2dd8f6d6e (readfromkafka2-abahman-0527214049)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>         at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>         at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154)
>         at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:114)
>         ... 14 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed
> to submit job 6d197ab3c635428c287be6a2dd8f6d6e
> (readfromkafka2-abahman-0527214049)
>         at
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>         at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.JobException: Creating the input
> splits caused an error: Could not create input splits from Source.
>         at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
>         at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696)
>         at
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023)
>         ... 23 more
> Caused by: java.io.IOException: Could not create input splits from Source.
>         at
> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:113)
>         at
> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:44)
>         at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>         ... 25 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value
> org.apache.kafka.common.serialization.ByteArrayDeserializer for
> configuration value.deserializer: Class
> org.apache.kafka.common.serialization.ByteArrayDeserializer could not be
> found.
>         at
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)
>         at
> org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:145)
>         at
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)
>         at
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)
>         at
> org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:336)
>         at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
>         at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494)
>         at
> org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
>         at
> org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
>         at
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
>         at
> org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:168)
>         at
> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:105)
>         ... 27 more
>
>
>
> ------------------------------
> *From:* Jean-Baptiste Onofré <[email protected]>
> *To:* [email protected]
> *Sent:* Friday, May 27, 2016 1:33 AM
> *Subject:* Re: Beam Flink vs Beam Spark Benchmarking
>
> Hi Ismaël,
>
> as we discussed, the pipeline code should clearly not use a
> runner-specific pipeline options object, in order to stay runner-agnostic.
>
> Something like:
>
> SparkPipelineOptions options =
> PipelineOptionsFactory.as(SparkPipelineOptions.class)
>
> should not be used.
> It's better to use something like:
>
>   PipelineOptions options =
> PipelineOptionsFactory.fromArgs(args).withValidation().create();
>
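> If the pipeline needs its own flags, a small options interface keeps it
> runner agnostic. For example, a sketch (the interface and option names
> here are made up for illustration):
>
>   public interface LinearRoadOptions extends PipelineOptions {
>     @Description("Kafka topic to read from")
>     @Default.String("lrtuples")
>     String getTopic();
>     void setTopic(String value);
>   }
>
>   LinearRoadOptions options = PipelineOptionsFactory.fromArgs(args)
>       .withValidation().as(LinearRoadOptions.class);
>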
>
> However, I think we could improve the factory a bit.
>
> Regards
> JB
>
> On 05/27/2016 10:22 AM, Ismaël Mejía wrote:
> >
> > I spent last week running tests on multiple runners, and in theory you
> > should not need to change much. However, you must take care not to mix
> > runner-specific dependencies when you create your project (e.g. you
> > don't want runner-specific classes like FlinkPipelineOptions or
> > SparkPipelineOptions in your code).
> >
> > Good practices for benchmarking are a trickier subject; e.g. you must
> > make sure that both runners are using at least similar parallelism
> > levels. Of course there are many dimensions to benchmarking, and in
> > this space the real questions to start with are: what do you want to
> > measure (throughput, resource utilisation, etc.)? Is your pipeline
> > batch only, or streaming too? Then try to create a reproducible
> > scenario where you expect similar behaviour across runners.
> >
> > But one thing is clear: you have to expect some differences, since
> > each runner's internal model is different, as is its maturity level
> > (at least at this point).
> >
> > Ismaël
> >
> >
> > On Fri, May 27, 2016 at 1:19 AM, amir bahmanyari <[email protected]
> > <mailto:[email protected]>> wrote:
> >
> >    Hi Colleagues,
> >    I have implemented the Java version of MIT's Linear Road
> >    algorithm as a Beam app.
> >    I sanity tested it in a Flink Cluster (FlinkRunner). Works fine.
> >    Receives tuples from Kafka, executes the LR algorithm, and produces
> >    the correct results.
> >    I would like to repeat the same in a Spark cluster.
> >    I am assuming that, other than changing the type of the Runner
> >    (Flink vs Spark) at runtime, I should not make any code changes.
> >    Is that the right assumption, given what Beam promises
> >    regarding unification of the underlying streaming engines?
> >
> >    The real question is: what should I take into consideration if I
> >    want to benchmark Flink vs Spark by executing the same Beam LR app
> >    in both engines?
> >    How would you approach the benchmarking process? What would you be
> >    looking for to compare? etc.
> >    Thanks so much for your valuable time.
> >    Amir-
> >
> >
> >
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
