When consuming Kafka from Spark Streaming with KafkaUtils.createDirectStream, the examples show how to get the Kafka offset ranges for each RDD. However, I have two further questions:
1. I would like to periodically save offsets at the message level so that, if needed, I can reprocess items starting from a given offset. Is there any way to retrieve the offset of each message in the RDD while I am processing it?
2. With the offset ranges I have the start and end offset for each RDD, but what if the system encounters an error while processing a record and the job ends? If I want to resume from the record that failed, how do I save the last successful offset so that I can start from it on the next run?
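To make the second question concrete, this is roughly the bookkeeping I have in mind, written as a plain-Python sketch with no Spark involved. It assumes that the records of one partition arrive in offset order with no gaps, so the i-th record has offset from_offset + i; I am not certain that always holds (for example on compacted topics). OffsetRange here is my own stand-in dataclass, not the Spark class, and process_partition, handle, and checkpoint are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, Tuple


@dataclass(frozen=True)
class OffsetRange:
    """Stand-in for one topic-partition slice of a batch (not the Spark class)."""
    topic: str
    partition: int
    from_offset: int   # inclusive
    until_offset: int  # exclusive


def process_partition(
    rng: OffsetRange,
    records: Iterable[str],
    handle: Callable[[int, str], None],
    checkpoint: Dict[Tuple[str, int], int],
) -> int:
    """Process records in order and remember where to resume.

    Assumption: the i-th record of the range has offset from_offset + i.
    The checkpoint always ends up holding the offset of the next record
    that still needs processing, even if `handle` raises.
    """
    next_offset = rng.from_offset
    try:
        for i, record in enumerate(records):
            offset = rng.from_offset + i
            handle(offset, record)
            next_offset = offset + 1  # this record succeeded
    finally:
        # Hypothetical store: a dict here, a database or ZooKeeper in practice.
        checkpoint[(rng.topic, rng.partition)] = next_offset
    return next_offset


def failing_handler(offset: int, record: str) -> None:
    """Example handler that fails on one specific record."""
    if record == "bad":
        raise ValueError(f"cannot process record at offset {offset}")


demo_store: Dict[Tuple[str, int], int] = {}
demo_range = OffsetRange("events", 0, 100, 105)
try:
    process_partition(demo_range, ["a", "b", "bad", "d", "e"], failing_handler, demo_store)
except ValueError:
    # On restart I would want to begin again from demo_store[("events", 0)].
    pass
```

What I do not know is how to get the real per-message offset inside the RDD instead of deriving it from the position, and where such a checkpoint would best be stored so that the next run can pick it up.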
Appreciate your help.