Hi,

you could, as JB suggested, replace it with something like this:

PipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().create();

and then pass "--runner=FlinkPipelineRunner --streaming=true" on the command
line to make the runner configurable via command-line parameters.
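A minimal runner-agnostic main() could then look something like this (just a
sketch; the class name and the transforms are placeholders for your own
pipeline):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ReadFromKafka2 {
  public static void main(String[] args) {
    // No runner-specific options type here; the runner comes from the
    // command line, e.g. --runner=FlinkPipelineRunner --streaming=true
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();

    Pipeline p = Pipeline.create(options);
    // ... apply the same transforms as before ...
    p.run();
  }
}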
I think the problem with Kafka in the batch runner can be solved by putting
the Kafka jars into the lib folder of the Flink cluster nodes and restarting
the cluster.

--
Aljoscha

On Sat, 28 May 2016 at 01:08 Kaniska Gmail <[email protected]> wrote:

> Did you try FlinkKafkaConsumer and FlinkKafkaProducer?
>
> Flink+Kafka+Beam has been working very nicely for me, streaming millions
> of events.
>
> Also make sure you are packaging the latest Beam source code.
>
> Sent from my iPhone
>
> On May 27, 2016, at 2:48 PM, amir bahmanyari <[email protected]> wrote:
>
> Hi JB,
> In my code, I replaced:
>
> FlinkPipelineOptions options =
>     PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>
> with:
>
> PipelineOptions options = PipelineOptionsFactory.create();
>
> The compiler complained that setStreaming(true) is not supported, so I
> commented out options.setStreaming(true). It compiled fine. Then I ran it,
> and it threw the following at p.run():
>
> ...Running thread threw: java.lang.UnsupportedOperationException: The
> transform Read(UnboundedKafkaSource) is currently not supported.
>     at org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitPrimitiveTransform(FlinkBatchPipelineTranslator.java:100)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
>     at org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>     at org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.translate(FlinkBatchPipelineTranslator.java:56)
>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:132)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:108)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>     at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:220)
>
> So I added .withMaxNumRecords(100) to the KafkaIO call (see the sketch
> below), restarted the Flink cluster, and reran the app. It now fails at
> p.run() with the exception following the sketch. Have a wonderful weekend.
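> The bounded read now looks roughly like this (a sketch only; the broker
> address and topic name are placeholders, and the method names are those of
> the KafkaIO snapshot I am building against):
>
> p.apply(KafkaIO.read()
>     .withBootstrapServers("kafkahost:9092")   // placeholder address
>     .withTopics(Arrays.asList("mytopic"))     // placeholder topic
>     .withMaxNumRecords(100));                 // bounds the unbounded source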
> ...Running thread threw: java.lang.RuntimeException: Pipeline execution
> failed
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:117)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>     at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:220)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The
> program execution failed: Failed to submit job
> 6d197ab3c635428c287be6a2dd8f6d6e (readfromkafka2-abahman-0527214049)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:114)
>     ... 14 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed
> to submit job 6d197ab3c635428c287be6a2dd8f6d6e
> (readfromkafka2-abahman-0527214049)
>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.JobException: Creating the input
> splits caused an error: Could not create input splits from Source.
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696)
>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023)
>     ... 23 more
> Caused by: java.io.IOException: Could not create input splits from Source.
>     at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:113)
>     at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:44)
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>     ... 25 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value
> org.apache.kafka.common.serialization.ByteArrayDeserializer for
> configuration value.deserializer: Class
> org.apache.kafka.common.serialization.ByteArrayDeserializer could not be
> found.
>     at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)
>     at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:145)
>     at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)
>     at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)
>     at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:336)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
>     at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:168)
>     at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:105)
>     ... 27 more
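> (The last "Caused by" is the root cause: the Kafka client classes are not
> visible on the cluster's classpath, which is what the lib-folder advice at
> the top of this thread addresses. A quick way to verify, run with the same
> classpath as the task managers -- a minimal sketch, nothing Beam-specific:)
>
> // Succeeds only if the Kafka client jars are on this JVM's classpath.
> public class KafkaClasspathCheck {
>   public static void main(String[] args) throws Exception {
>     Class.forName("org.apache.kafka.common.serialization.ByteArrayDeserializer");
>     System.out.println("Kafka client classes are visible.");
>   }
> }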
>
> ------------------------------
> From: Jean-Baptiste Onofré <[email protected]>
> To: [email protected]
> Sent: Friday, May 27, 2016 1:33 AM
> Subject: Re: Beam Flink vs Beam Spark Benchmarking
>
> Hi Ismaël,
>
> As discussed together, the pipeline code clearly should not use a
> runner-specific pipeline options object, in order to stay runner-agnostic.
>
> Something like:
>
> SparkPipelineOptions options =
>     PipelineOptionsFactory.as(SparkPipelineOptions.class);
>
> should not be used. It is better to use something like:
>
> PipelineOptions options =
>     PipelineOptionsFactory.fromArgs(args).withValidation().create();
>
> However, I think we may improve the factory a bit.
>
> Regards,
> JB
>
> On 05/27/2016 10:22 AM, Ismaël Mejía wrote:
> >
> > I spent last week running tests on multiple runners, and theoretically
> > you should not have to change much. However, you must take care not to
> > mix runner-specific dependencies while you create your project (e.g.
> > you don't want specific classes like FlinkPipelineOptions or
> > SparkPipelineOptions in your code).
> >
> > Specific good practices for benchmarking are a trickier subject: for
> > example, you must make sure that both runners are using at least
> > similar parallelism levels. There are many dimensions to benchmarking,
> > and in particular in this space the real question to start with is:
> > what do you want to benchmark (throughput, resource utilisation,
> > etc.)? Is your pipeline batch only, or streaming too? Then try to
> > create a reproducible scenario in which you expect similar behaviour
> > across runners.
> >
> > One thing is clear: you have to expect some differences, since the
> > internal model of each runner is different, as is their maturity level
> > (at least at this point).
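> > For instance, to get a comparable record count out of both runners you
> > could wrap a counting aggregator around your pipeline's main step -- a
> > sketch against the old-style DoFn/aggregator API of the incubating SDK
> > (the class name is made up; verify the aggregator API against the
> > snapshot you build):
> >
> > import org.apache.beam.sdk.transforms.Aggregator;
> > import org.apache.beam.sdk.transforms.DoFn;
> > import org.apache.beam.sdk.transforms.Sum;
> >
> > // Pass-through step that counts elements, so both runners report the
> > // same metric for a benchmark run.
> > class CountingFn<T> extends DoFn<T, T> {
> >   private final Aggregator<Long, Long> records =
> >       createAggregator("records", new Sum.SumLongFn());
> >
> >   @Override
> >   public void processElement(ProcessContext c) {
> >     records.addValue(1L);
> >     c.output(c.element());
> >   }
> > }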
> >
> > Ismaël
> >
> > On Fri, May 27, 2016 at 1:19 AM, amir bahmanyari <[email protected]
> > <mailto:[email protected]>> wrote:
> >
> >     Hi Colleagues,
> >     I have implemented the Java version of MIT's Linear Road algorithm
> >     as a Beam app. I sanity-tested it in a Flink cluster (FlinkRunner):
> >     it works fine, receives tuples from Kafka, executes the LR
> >     algorithm, and produces the correct results. I would like to repeat
> >     the same in a Spark cluster.
> >     I am assuming that, other than changing the type of the runner
> >     (Flink vs Spark) at runtime, I should not have to make any code
> >     changes. Is that the right assumption, given what Beam promises
> >     about unifying the underlying streaming engines?
> >
> >     The real question is: what should I take into consideration if I
> >     want to benchmark Flink vs Spark by executing the same Beam LR app
> >     on both engines? How would you approach the benchmarking process?
> >     What would you be looking for to compare? etc.
> >     Thanks so much for your valuable time.
> >     Amir-
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
