Hi Raju,

Could you please explain the behavior you expect from the DStream? The DStream will have events only from the 'fromOffsets' that you provided to createDirectStream (which I think is the expected behavior).
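For reference, this is roughly the pattern I mean (a minimal sketch against the Spark 1.x Kafka 0.8 direct API; the broker list, topic name, and offset values are placeholders, and ssc is assumed to be your StreamingContext):

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Placeholder broker list; replace with your cluster.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    // On a fresh start (no checkpoint), the stream begins exactly at these
    // offsets. When restored from a checkpoint, the checkpointed offsets win
    // and this map is never consulted, because the creating function is not
    // re-run.
    val fromOffsets: Map[TopicAndPartition, Long] =
      Map(TopicAndPartition("sample_sample3_json", 0) -> 0L)

    // Keep the offset alongside each message so offsets can be tracked per event.
    val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message())

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
      StringDecoder, (Long, String)](ssc, kafkaParams, fromOffsets, messageHandler)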
As for the small files: you will have to deal with them if you intend to write each batch out immediately. Alternatively, what we sometimes do is:
1. Buffer a few batches in the application for some 30-40 seconds, until we have substantial data, and then write it to disk (a rough sketch is at the bottom of this mail).
2. Push the small batches back to Kafka, and let a different job handle the save to disk.

On Sat, Jan 23, 2016 at 7:01 PM, Raju Bairishetti <r...@apache.org> wrote:

> Thanks for the quick reply.
> I am creating the Kafka DStream by passing an offsets map. I have pasted a
> code snippet in my earlier mail. Let me know if I am missing something.
>
> I want to use the Spark checkpoint for handling only driver/executor
> failures.
>
> On Jan 22, 2016 10:08 PM, "Cody Koeninger" <c...@koeninger.org> wrote:
>
>> Offsets are stored in the checkpoint. If you want to manage offsets
>> yourself, don't restart from the checkpoint; specify the starting offsets
>> when you create the stream.
>>
>> Have you read / watched the materials linked from
>> https://github.com/koeninger/kafka-exactly-once
>>
>> Regarding the small files problem, either don't use HDFS, or use
>> something like filecrush for merging.
>>
>> On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti <r...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> I am very new to Spark & Spark Streaming. I am planning to use Spark
>>> Streaming for real-time processing.
>>>
>>> I have created a streaming context and am checkpointing to an HDFS
>>> directory for recovery purposes in case of executor & driver failures.
>>>
>>> I am creating a DStream with an offset map to get the data from Kafka.
>>> I am simply ignoring the offsets to understand the behavior. Whenever I
>>> restart the application, the driver is restored from the checkpoint as
>>> expected, but the DStream does not start from the initial offsets. The
>>> DStream is created with the last consumed offsets instead of starting
>>> from offset 0 for each topic partition, even though I am not storing the
>>> offsets anywhere.
>>>
>>> def main(args: Array[String]): Unit = {
>>>   val sparkStreamingContext =
>>>     StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
>>>       () => creatingFunc())
>>>   ...
>>> }
>>>
>>> def creatingFunc(): StreamingContext = {
>>>   ...
>>>   val offsets: Map[TopicAndPartition, Long] =
>>>     Map(TopicAndPartition("sample_sample3_json", 0) -> 0)
>>>
>>>   KafkaUtils.createDirectStream[String, String, StringDecoder,
>>>     StringDecoder, String](sparkStreamingContext, kafkaParams, offsets,
>>>     messageHandler)
>>>   ...
>>> }
>>>
>>> I want control over offset management at the event level instead of the
>>> RDD level, to make sure of at-least-once delivery to the end system.
>>>
>>> As per my understanding, every RDD (or RDD partition) will be stored in
>>> HDFS as a file if I choose to use HDFS as output. If I use 1 sec as the
>>> batch interval, I will end up with a huge number of small files in HDFS,
>>> and having many small files in HDFS leads to lots of other issues.
>>> Is there any way to write multiple RDDs into a single file? I don't have
>>> much idea about *coalesce* usage. In the worst case, I can merge all the
>>> small files in HDFS at regular intervals.
>>>
>>> Thanks...
>>>
>>> ------
>>> Thanks
>>> Raju Bairishetti
>>> www.lazada.com
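P.S. Raju, regarding option 1 above and your *coalesce* question, here is a rough sketch: buffer several micro-batches with window() and shrink each window's output to a single file with coalesce(). The 30s window length and the output path are placeholders, and 'stream' is assumed to be the DStream returned by createDirectStream:

    import org.apache.spark.streaming.Seconds

    // Group 30s worth of 1s micro-batches into one windowed RDD, emitted
    // once every 30s (window length == slide interval, so no overlap).
    val windowed = stream.window(Seconds(30), Seconds(30))

    windowed.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // coalesce(1) merges all partitions into one without a shuffle, so
        // each window writes a single part file instead of one per partition.
        rdd.coalesce(1)
           .saveAsTextFile(s"hdfs:///tmp/stream-output/window-${System.currentTimeMillis()}")
      }
    }

Note that coalesce(1) funnels the whole window through one task, so it trades write parallelism for fewer files; with large windows a higher partition count (or a periodic merge job, as Cody suggested) may be the better compromise.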