I need some clarification about Kafka consumers in Spark (or otherwise). I have the following Kafka consumer: it reads from a topic, and I have a mechanism which blocks the consumer from time to time.
The producer is a separate thread which continuously sends data. I want to ensure that the consumer does not drop (or fail to read) the data sent during the period when the consumer was "blocked". *In case the "blocked" part is confusing: we have a modified Spark scheduler where we take a lock on the scheduler.*

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public static JavaDStream<String> getKafkaDStream(String inputTopics, String broker, int kafkaPort, JavaStreamingContext ssc) {
    HashSet<String> inputTopicsSet = new HashSet<String>(Arrays.asList(inputTopics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", broker + ":" + kafkaPort);

    // Direct (receiver-less) stream: offsets are tracked by Spark rather than ZooKeeper.
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            ssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            inputTopicsSet
    );

    // Keep only the message values.
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

    return lines;
}
```

Thanks,
Nipun
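
P.S. To clarify what I mean by "not dropping data": I would expect the offset ranges of consecutive batches to stay contiguous even across a blocked period, so that messages produced while the consumer was blocked show up in later batches. Below is a minimal, untested sketch of how the consumed offsets could be logged per batch. It assumes the `HasOffsetRanges` / `OffsetRange` classes from the same `spark-streaming-kafka` package, and it has to be applied to the `messages` stream returned directly by `createDirectStream` (before the `map`), since only those RDDs carry offset ranges.

```java
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

// Log which offsets each micro-batch actually consumed. If no data is lost,
// the untilOffset of one batch should equal the fromOffset of the next batch
// for every partition, even across a "blocked" period.
messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> rdd) {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        for (OffsetRange o : offsetRanges) {
            System.out.println("topic=" + o.topic()
                    + " partition=" + o.partition()
                    + " fromOffset=" + o.fromOffset()
                    + " untilOffset=" + o.untilOffset());
        }
        return null;
    }
});
```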