Hello, I'm trying to organize the processing of messages from Kafka. There is a typical case where the number of messages in Kafka's queue exceeds what the Spark app is able to process. But I need a strict time limit, so that a result is produced for at least part of the data.
Code example:

```java
SparkConf sparkConf = new SparkConf()
        .setAppName("Spark")
        .setMaster("local");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Milliseconds.apply(5000));
jssc.checkpoint("/tmp/spark_checkpoint");

Set<String> topicMap = new HashSet<>(Arrays.asList(topicList.split(",")));
Map<String, String> kafkaParams = new HashMap<String, String>() {
    {
        put("metadata.broker.list", bootstrapServers);
        put("auto.offset.reset", "smallest");
    }
};

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams, topicMap);

messages.countByWindow(Seconds.apply(10), Milliseconds.apply(5000))
        .map(x -> { System.out.println(x); return x; })
        .dstream().saveAsTextFiles("/tmp/spark", "spark-streaming");
```

I need to see the result of the window operation every 10 seconds (this is only the simplest example). But in practice, with my test data of ~10M messages, I get the first result only after a minute, and after that I see only zeros. Is there a way to limit the processing time so that a response is guaranteed within a specified time, like Apache Flink's triggers? Thanks.
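The only related knobs I have found so far are the ingest rate limits for the direct stream. A sketch of what I mean (these are real Spark settings, but I'm not sure they give the hard time guarantee I'm after, since they only cap batch size rather than bound processing time):

```java
// Sketch: cap how many records each batch pulls per Kafka partition,
// so a 5s batch cannot grow without bound when the topic is backlogged.
SparkConf sparkConf = new SparkConf()
        .setAppName("Spark")
        .setMaster("local")
        // at most 1000 records per partition per second of batch interval
        .set("spark.streaming.kafka.maxRatePerPartition", "1000")
        // let Spark adapt the ingest rate to the observed processing speed
        .set("spark.streaming.backpressure.enabled", "true");
```

This keeps each batch small enough to finish within the interval, but as far as I can tell it does not force a partial result out after a deadline the way Flink's triggers do.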