To work around an out-of-space issue in a direct Kafka streaming application, we create topics with a low retention policy (retention.ms=300000), which works fine from the Kafka side. However, this results in an OffsetOutOfRangeException in the Spark job, thrown at the iterator.hasNext() call marked in the code below. Is there any Spark configuration we can set to avoid receiving expired messages?
import java.util.Iterator;

import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

JavaInputDStream<MyTuple> dStream = KafkaUtils.createDirectStream(
        ssc, String.class, byte[].class, StringDecoder.class, DefaultDecoder.class,
        MyTuple.class, kafkaParams, fromOffsets, messageHandler);

dStream.foreachRDD(new VoidFunction<JavaRDD<MyTuple>>() {
    @Override
    public void call(JavaRDD<MyTuple> rdd) {
        final OffsetRange[] offSetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        rdd.foreachPartition(new VoidFunction<Iterator<MyTuple>>() {
            @Override
            public void call(Iterator<MyTuple> iterator) throws Exception {
                // init some data
                while (iterator.hasNext()) { // <-- OffsetOutOfRangeException is thrown here
                    // handle next tuple
                }
            }
        });
        // ...
    }
});
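One workaround I am considering, in case no such configuration exists: as far as I can tell, auto.offset.reset in kafkaParams is only consulted by the 0.8 direct stream when createDirectStream is called without explicit fromOffsets, so instead I would validate the stored fromOffsets against the earliest offsets the broker still retains and clamp them before creating the stream. Below is a rough sketch using the Kafka 0.8 SimpleConsumer offset API; the broker host/port, the client id, and the fetchEarliestOffset helper are placeholder names of mine, and for brevity it assumes the consumer is already pointed at each partition's leader (in practice you would do a leader lookup first, as in the standard SimpleConsumer example).

import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

// Hypothetical helper: asks the broker for the earliest offset it still
// retains for one partition, so stored offsets can be clamped to it.
static long fetchEarliestOffset(SimpleConsumer consumer, String topic, int partition) {
    TopicAndPartition tp = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
    // EarliestTime() = the smallest offset not yet deleted by retention.ms
    requestInfo.put(tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "offsetLookup");
    OffsetResponse response = consumer.getOffsetsBefore(request);
    return response.offsets(topic, partition)[0];
}

// Somewhere before createDirectStream(...): clamp each stored offset so the
// stream never starts behind what retention has already deleted.
SimpleConsumer consumer = new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "offsetLookup");
try {
    for (Map.Entry<TopicAndPartition, Long> entry : fromOffsets.entrySet()) {
        TopicAndPartition tp = entry.getKey();
        long earliest = fetchEarliestOffset(consumer, tp.topic(), tp.partition());
        if (entry.getValue() < earliest) {
            fromOffsets.put(tp, earliest); // skip the expired range instead of crashing
        }
    }
} finally {
    consumer.close();
}

Clamping this way trades silently skipping the already-expired range for not crashing the job, which seems acceptable given those messages are gone from the broker anyway; but if there is a built-in Spark setting that achieves the same thing, I would prefer that.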