Hi

With Spark Streaming (all versions), when my processing delay (around 2-4
seconds) exceeds the batch duration (1 second) at a decent
scale/throughput (consuming around 100MB/s on a 1+2 node standalone
cluster, 15GB and 4 cores each), the job starts to throw block not found
exceptions when the StorageLevel is set to MEMORY_ONLY (ensureFreeSpace
<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L444>
drops blocks blindly). When I use MEMORY_AND_DISK* as the StorageLevel,
the performance goes down drastically and the receivers end up doing a
lot of disk IO.

So sticking with MEMORY_ONLY as the StorageLevel, the workaround to get
rid of the block not found exceptions was to tell the receiver not to
generate more blocks while there are blocks which are yet to be computed.

To achieve this, I used Spark 1.3.1 with the low level Kafka consumer
<https://github.com/dibbhatt/kafka-spark-consumer>, and inside my job's
onBatchCompleted I pushed the scheduling delay to ZooKeeper, like:

[inline image: code that writes each batch's scheduling delay to a ZooKeeper node]
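Roughly, it's a minimal sketch along these lines (not the exact code from
the image; I'm assuming a StreamingListener and the Curator client here,
and the ZK path /streaming/schedulingDelay is just an illustrative choice,
the real job can reuse whatever ZK setup the consumer already has):

    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.ExponentialBackoffRetry
    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    class DelayPublishingListener(zkConnect: String) extends StreamingListener {
      // illustrative ZK node holding the latest scheduling delay in ms
      private val zkPath = "/streaming/schedulingDelay"
      private val client = CuratorFrameworkFactory.newClient(
        zkConnect, new ExponentialBackoffRetry(1000, 3))
      client.start()

      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        // schedulingDelay is an Option[Long] in milliseconds
        val delayMs = batchCompleted.batchInfo.schedulingDelay.getOrElse(0L)
        if (client.checkExists().forPath(zkPath) == null) {
          client.create().creatingParentsIfNeeded().forPath(zkPath)
        }
        client.setData().forPath(zkPath, delayMs.toString.getBytes("UTF-8"))
      }
    }

    // registered on the StreamingContext:
    // ssc.addStreamingListener(new DelayPublishingListener("zkhost:2181"))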


And on the receiver end, if there is a scheduling delay, it will simply
sleep for that much time without sending any blocks to the streaming
receiver, like:
[inline image: the receiver-side check that reads the delay from ZooKeeper and sleeps]
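Again roughly, the receiver-side check is a sketch like this (same assumed
ZK path and Curator client as above, called before fetching/storing the
next block of Kafka messages):

    import org.apache.curator.framework.CuratorFramework

    def throttleIfBehind(client: CuratorFramework): Unit = {
      val zkPath = "/streaming/schedulingDelay"   // same illustrative path as above
      if (client.checkExists().forPath(zkPath) != null) {
        val delayMs = new String(client.getData().forPath(zkPath), "UTF-8").toLong
        if (delayMs > 0) {
          // back off instead of generating blocks the cluster can't keep up with
          Thread.sleep(delayMs)
        }
      }
    }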


I could also add a condition there to stop generating blocks only when the
scheduling delay exceeds 2-3 times the batch duration, instead of making it
sleep for whatever the current scheduling delay happens to be.
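That condition would be something simple like the following (the 2x factor
and the 1 second batch duration are just this job's numbers, nothing fixed):

    def shouldBackOff(delayMs: Long, batchDurationMs: Long = 1000L): Boolean =
      // only throttle once the backlog clearly exceeds the batch duration
      delayMs > 2 * batchDurationMs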

With this, the only problem I'm having is that some batches have empty data,
since the receiver went to sleep for those batches. Everything else works
nicely at scale and the block not found exceptions are completely gone.

Please let me know your thoughts on this. Can we generalize this for Kafka
receivers with Spark Streaming? Is it possible to apply this (stopping the
receiver from generating blocks) to all sorts of receivers?


Thanks
Best Regards
