On Tue, Mar 28, 2017 at 11:12 AM, Jan Holmberg <jan.holmb...@perigeum.fi>
wrote:

> Here are a few more details. This is the piece of logic that is consuming 4
> secs according to the Spark log. The few anonymous functions -
> transformations - are trivial.
> -jan
>
> org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1444)
>

This stack is just the "client" side of sending out the 'foreachPartition'
work.

Are you sure it's not the scheduling of the tasks that's taking a long time?

What if you replace the Kudu writeRows call with something like:

myDataFrame.foreachPartition(rows => {
  // Count the rows so the iterator is fully consumed, without writing to Kudu.
  var i = 0
  for (row <- rows) {
    i += 1
  }
  i
})


so that it's not writing to Kudu anymore but still consuming the same rows?


> org.apache.kudu.spark.kudu.KuduContext.writeRows(KuduContext.scala:180)
> org.apache.kudu.spark.kudu.KuduContext.upsertRows(KuduContext.scala:154)
> com.kafka.consumer.SCC$.writeKudu(KafkaKuduConsumer.scala:227)
> com.kafka.consumer.SCC$$anonfun$getSSContext$2.apply(KafkaKuduConsumer.scala:294)
> com.kafka.consumer.SCC$$anonfun$getSSContext$2.apply(KafkaKuduConsumer.scala:275)
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
> scala.util.Try$.apply(Try.scala:161)
> org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>
>
> On 28 Mar 2017, at 20.38, Jan Holmberg <jan.holmb...@perigeum.fi> wrote:
>
> Thanks for the swift response! My measurements are very informal :-)  i.e. I'm
> looking at Spark's 'Streaming' page, which reports "processing time"
> directly. That value is consistently 4 sec or 0.5 sec between different
> program runs, for all batches.
>
> The program is pretty simple, so I do not expect that the program itself is
> causing the delay. I suspect two causes:
>
> 1) Kudu connection. I have not found a recent streaming example, so I might
> have some misunderstanding in my code. At the moment, I'm creating a new
> KuduContext for each batch (val kuduContext = new KuduContext("m1:7051")).
> A pointer to a good streaming example would definitely help.
>
> 2) Kafka connection. We've seen earlier that Spark executor locations have
> an impact on performance.
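
Regarding point 1: below is a minimal sketch of what reusing a single
KuduContext across batches could look like with the Spark 1.6 direct Kafka
stream API. The broker address, topic, table name "my_kudu_table", and the
Message case class are illustrative placeholders, not taken from the original
job; only the KuduContext("m1:7051") and upsertRows calls come from this
thread. The key point is that the KuduContext is created once on the driver
and reused for every micro-batch, rather than rebuilt per batch.

import kafka.serializer.StringDecoder
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaToKuduSketch {
  // Hypothetical record shape; replace with the real message schema.
  case class Message(key: String, value: String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-to-kudu")
    val ssc = new StreamingContext(conf, Seconds(5))
    val sqlContext = new SQLContext(ssc.sparkContext)
    import sqlContext.implicits._

    // Create the KuduContext once, outside foreachRDD, so every batch
    // reuses the same underlying Kudu client.
    val kuduContext = new KuduContext("m1:7051")

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("my_topic"))

    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val df = rdd.map { case (k, v) => Message(k, v) }.toDF()
        // Each micro-batch reuses the shared KuduContext for the upsert.
        kuduContext.upsertRows(df, "my_kudu_table")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
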
>
> -jan
>
>
>
> On 28 Mar 2017, at 19.16, Todd Lipcon <t...@cloudera.com> wrote:
>
> On Tue, Mar 28, 2017 at 8:24 AM, Jan Holmberg <jan.holmb...@perigeum.fi>
> wrote:
>
>> I'm wondering why a simple Spark program reading streaming data from Kafka
>> and writing the result to Kudu has unpredictable write times. In most
>> cases, when running the program, write times are consistently 4 sec
>> regardless of the number of messages (anything from 50 to 2000 messages per
>> batch). But occasionally, when starting the program, it runs substantially
>> faster, with write times below 0.5 sec, with exactly the same code base,
>> settings etc.
>>
>
> How are you measuring "write times" here? Are you sure the time is being
> spent in the Kudu code and not in other parts of the streaming app?
>
> Writing 2000 rows to Kudu should be on the order of a few milliseconds --
> even 0.5 seconds sounds extremely high.
>
> Are you by chance instantiating a new KuduClient each time you write a
> batch, rather than reusing an existing one?
>
>
>>
>> Our environment is a plain AWS cluster with 3 slaves, where each slave has
>> a Kafka and a Kudu tablet server instance, with CDH 5.10 & Kudu 1.2 & Spark 1.6.
>>
>> Any hints what to look at?
>>
>> cheers,
>> -jan
>
>
>
>
> --
> Todd Lipcon
> Software Engineer, Cloudera
>
>
>
>


-- 
Todd Lipcon
Software Engineer, Cloudera
