On Tue, Mar 28, 2017 at 11:12 AM, Jan Holmberg <jan.holmb...@perigeum.fi> wrote:
> Here's a bit more detail. This is the piece of logic that is consuming 4
> secs according to the Spark log. The few anonymous functions -
> transformations - are trivial.
>
> -jan
>
> org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1444)

This stack is just the "client" side of sending out the 'foreachPartition'
work. Are you sure it's not the scheduling of the tasks that's taking a long
time? What if you replace the Kudu writeRows call with something like:

myDataFrame.foreachPartition(rows => {
  // Consume every row without writing anything, to isolate scheduling cost.
  var i = 0
  for (row <- rows) {
    i += 1
  }
})

so that it's not writing to Kudu anymore but still consuming the same rows?

> org.apache.kudu.spark.kudu.KuduContext.writeRows(KuduContext.scala:180)
> org.apache.kudu.spark.kudu.KuduContext.upsertRows(KuduContext.scala:154)
> com.kafka.consumer.SCC$.writeKudu(KafkaKuduConsumer.scala:227)
> com.kafka.consumer.SCC$$anonfun$getSSContext$2.apply(KafkaKuduConsumer.scala:294)
> com.kafka.consumer.SCC$$anonfun$getSSContext$2.apply(KafkaKuduConsumer.scala:275)
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
> scala.util.Try$.apply(Try.scala:161)
> org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>
> On 28 Mar 2017, at 20.38, Jan Holmberg <jan.holmb...@perigeum.fi> wrote:
>
> Thanks for the swift response! My measurements are very informal :-) i.e.
> I'm looking at Spark's 'Streaming' log page, which reports "processing
> time" directly. That value is consistently either 4 sec or 0.5 sec between
> different program runs, for all batches.
>
> The prog is pretty simple, so I do not expect that the prog itself is
> causing the delay. I'm suspecting two causes:
>
> 1) Kudu connection. I have not found a recent streaming example, so I
> might have some misunderstanding in my code. At the moment, I'm creating a
> new KuduContext for each batch
> (val kuduContext = new KuduContext("m1:7051")). A pointer to a good
> streaming example would definitely help (see the sketch below).
>
> 2) Kafka connection. We've seen earlier that Spark executor locations have
> an impact on performance.
>
> -jan
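On 1): here's a minimal, untested sketch of creating the KuduContext once and
reusing it across batches. The master address "m1:7051" is from your snippet
above; the Reading case class, the "my_table" table name, and the omitted
Kafka wiring are placeholders, not anything from your actual program:

import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder row type -- substitute the real message schema.
case class Reading(id: Long, value: String)

object StreamToKudu {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("StreamToKudu"), Seconds(5))

    // Created once on the driver, not once per batch, so the underlying
    // Kudu client and its connections are reused for every batch.
    val kuduContext = new KuduContext("m1:7051")

    // Stand-in for the DStream your Kafka consumer produces;
    // the Kafka wiring itself is omitted here.
    val stream: DStream[Reading] = ???

    stream.foreachRDD { rdd =>
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._
      kuduContext.upsertRows(rdd.toDF(), "my_table") // placeholder table name
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

new KuduContext(...) constructs a new Kudu client under the hood, so building
one per batch pays connection setup on every batch -- a plausible source of a
constant per-batch overhead like the one you're describing.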
> On 28 Mar 2017, at 19.16, Todd Lipcon <t...@cloudera.com> wrote:
>
> On Tue, Mar 28, 2017 at 8:24 AM, Jan Holmberg <jan.holmb...@perigeum.fi>
> wrote:
>
>> I'm wondering why a simple Spark prog. reading streaming data from Kafka
>> and writing the result to Kudu has unpredictable write times. In most
>> cases, when running the prog, write times are consistently 4 sec
>> regardless of the number of messages (anything from 50 to 2000 messages
>> per batch). But occasionally when starting the prog, it runs
>> substantially faster, with write times below 0.5 sec, with exactly the
>> same code base, settings, etc.
>
> How are you measuring "write times" here? Are you sure the time is being
> spent in the Kudu code and not in other parts of the streaming app?
>
> Writing 2000 rows to Kudu should be on the order of a few milliseconds --
> even 0.5 seconds sounds extremely high.
>
> Are you by chance instantiating a new KuduClient each time you write a
> batch, rather than reusing an existing one?
>
>> Our environment is a plain AWS cluster with 3 slaves, where each slave
>> has a Kafka and a Kudu tablet server instance, running CDH 5.10 & Kudu
>> 1.2 & Spark 1.6.
>>
>> Any hints what to look at?
>>
>> cheers,
>> -jan
>
> --
> Todd Lipcon
> Software Engineer, Cloudera

--
Todd Lipcon
Software Engineer, Cloudera
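P.S. On the "how are you measuring" point above: one rough way to bracket the
write step itself is to time the upsert call per batch, using the same
placeholder names (kuduContext, stream, "my_table") as the sketch earlier in
the thread:

stream.foreachRDD { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  val df = rdd.toDF()

  val start = System.nanoTime()
  kuduContext.upsertRows(df, "my_table") // placeholder table name
  val elapsedMs = (System.nanoTime() - start) / 1000000
  // This span still includes Spark's task scheduling for the write job,
  // so a big number here does not by itself prove Kudu is slow --
  // compare it with the no-op variant that only counts the rows.
  println(s"Kudu upsert took $elapsedMs ms")
}

If that per-call time tracks the 4-second batches, the row-counting no-op
suggested above would then separate scheduling cost from the Kudu write
itself.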