Hello all,

I have a few questions about performance tuning of my pipeline. I am trying
to achieve the following:

1. Consume from Kafka in 2 sec batches, filter it and remove 95% of data,
which comes down to around 4K messages/sec
2. Maintain keys (strings) by frequency over a moving window of say 1 day.
No. of keys in a day is roughly 1M.

// STAGE-1, Kafka-Spark-Consumer, consuming 224 partitions.
val filteredStream = ReceiverLauncher
  .launch(ssc, props, 28, ..)
  .map(record => new String(record.payload))
  .map(CreateRecordObj(_))
  .filter(_.isValidObject)

// Force Spark to compute each 2-second batch as soon as it is available.
// This helps reduce executor memory pressure, which otherwise keeps
// storing raw data for 900 sec due to Spark's lazy evaluation.
val recordAccumulator = ssc.sparkContext.accumulator(0L, "records")
filteredStream.count().foreachRDD { rdd =>
  recordAccumulator += rdd.first()
}

// STAGE-2
filteredStream
  // Put a window as I don't need result every 2 seconds but at an interval
  // of say every 15 minutes.
  .window(Seconds(900), Seconds(900))
  // Maintain frequency count of keys over a moving window of 1 day.
  .countByValueAndWindow(Seconds(24 * 60 * 60), Seconds(900))
  .foreachRDD { rdd =>
     // Sort descending by frequency count.
     val sortedRdd = rdd.sortBy(_._2, ascending = false)
     sortedRdd.persist(..)
     globalQueue.enqueue(sortedRdd)
  }
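(Aside: if only the most frequent keys are needed at each 15-minute tick, a full shuffle sort of the ~1M-key RDD can be avoided with `RDD.top`, which does a per-partition selection and merges on the driver. A sketch, where N = 1000 and the `globalQueue` usage are assumptions based on the snippet above:)

```scala
filteredStream
  .window(Seconds(900), Seconds(900))
  .countByValueAndWindow(Seconds(24 * 60 * 60), Seconds(900))
  .foreachRDD { rdd =>
    // top() selects within each partition and merges the candidates on the
    // driver: no shuffle, and only N records per partition cross the network.
    val topN = rdd.top(1000)(Ordering.by(_._2))
    globalQueue.enqueue(topN)
  }
```

This only helps when the downstream consumer needs a bounded top-N rather than the full sorted list.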

Questions:
1. Is the above approach overkill? Is there a better way to achieve the
desired result? I know there are fancy algorithms like Count-Min Sketch, but
I suspect it won't be able to keep up with updates of around 6K keys every
2 seconds. Is that true?
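(For what it's worth, a Count-Min Sketch update is just `depth` hash-and-increment operations, so ~6K key updates per 2-second batch is far below what even a single core can sustain; the harder part is merging per-partition sketches. A minimal pure-Scala sketch for intuition, where the width and depth are illustrative, not tuned for 1M keys:)

```scala
import scala.util.hashing.MurmurHash3

// Minimal Count-Min Sketch: a depth x width table of counters, one hash
// function (here: one murmur seed) per row.
class CountMinSketch(width: Int, depth: Int) {
  private val table = Array.ofDim[Long](depth, width)

  // Mask to Int.MaxValue to guarantee a non-negative bucket index.
  private def bucket(key: String, row: Int): Int =
    (MurmurHash3.stringHash(key, row) & Int.MaxValue) % width

  // Update: one increment per row -- O(depth), independent of key count.
  def add(key: String, count: Long = 1L): Unit =
    for (row <- 0 until depth) table(row)(bucket(key, row)) += count

  // Point query: the minimum over the rows upper-bounds the true frequency.
  def estimate(key: String): Long =
    (0 until depth).map(row => table(row)(bucket(key, row))).min
}
```

The trade-off versus `countByValueAndWindow` is that the sketch returns approximate (never under-) counts in fixed memory, rather than exact per-key counts.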

2. Clearly there are 2 output operations being performed overall. Is the 1st
output operation blocked by the 2nd one? Would the 2nd one have any effect
on the 1st if its processing time is, say, 600 seconds?

3. I know that the original 2 sec batches have 224 * 4 * 2 (Kafka partitions
x blocks per second x batch duration) partitions. How should one determine
the numPartitions parameter for countByValueAndWindow()?
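(For reference, countByValueAndWindow accepts numPartitions as a third argument; it defaults to the context's default parallelism. A common starting point is 2-3 tasks per executor core; the value below is purely an assumption to show the call shape:)

```scala
// numPartitions controls the shuffle width of the windowed count;
// 112 here is an illustrative placeholder, not a recommendation.
.countByValueAndWindow(Seconds(24 * 60 * 60), Seconds(900), numPartitions = 112)
```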

4. Is there a way to force Spark to be non-lazy? For instance, the Kafka
receivers here consume at around 200MB/sec, and with Spark being lazy, data
piles up for 900 sec before any processing happens. To avoid exactly this, I
have put the otherwise-unnecessary foreachRDD() output operation in stage 1.
It forces the filter step to run every batch and reduces the data rate to
3MB/sec, which hugely reduces memory pressure on the executors.
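(As far as I know, Spark has no global eager mode; registering some output operation per batch is the standard idiom. A lighter variant of the accumulator trick, assuming nothing else is needed from stage 1, is a bare count, combined with persisting the filtered stream so the window reuses it:)

```scala
import org.apache.spark.storage.StorageLevel

// Cache the post-filter data so the 900 sec window reuses it instead of
// recomputing from the raw 200MB/sec input.
filteredStream.persist(StorageLevel.MEMORY_ONLY_SER)

// Any cheap action registered as an output op forces each 2 sec micro-batch
// (and hence the 95% filter) to run as soon as the batch arrives.
filteredStream.foreachRDD(rdd => rdd.count())
```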

--
Thanks
Jatin
