Hello, thanks. I tried .set("spark.streaming.backpressure.enabled", "true"), but the result is negative. Therefore I have prepared a small test: https://github.com/rssdev10/spark-kafka-streaming
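For reference, backpressure was enabled roughly like this (only the relevant lines of the SparkConf are shown; the full configuration is in the repository):

    SparkConf sparkConf = new SparkConf()
            .setAppName("Spark")
            .setMaster("local")
            // let Spark adapt the ingestion rate to the observed batch time
            .set("spark.streaming.backpressure.enabled", "true");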
How to run:

    git clone https://github.com/rssdev10/spark-kafka-streaming.git
    cd spark-kafka-streaming

    # download kafka and zookeeper
    ./gradlew setup

    # run zookeeper and kafka, and start message generation
    ./gradlew test_data_prepare

And in another console just run:

    ./gradlew test_spark

It is easy to interrupt the data generation with CTRL-C and to continue it with the same command ./gradlew test_data_prepare. To stop everything, run ./gradlew stop_all.

The Spark test must print output like this every 10 seconds:

    Processing time: 33477
    Expected time: 10000
    Processed messages: 2897866
    Message example: {"message": 1, "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
    Recovered json: {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}

Here *message* is the number of the first message in the window; time values are in milliseconds.

Brief results:

1. Spark always reads all messages from Kafka after the first connection, regardless of the dstream or window duration. It looks like a bug.
2. When the processing speed of the Spark app is close to the data generation speed, everything is ok.
3. I added a delayFactor in https://github.com/rssdev10/spark-kafka-streaming/blob/master/src/main/java/SparkStreamingConsumer.java to emulate slow processing, and the streaming process degrades. When delayFactor=0 it looks like a stable process. (A simplified sketch follows below.)
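In essence, the delayFactor just adds an artificial per-record delay inside the processing stage. A simplified sketch of the idea (not the exact code from the repository; messages is the Kafka direct stream from the code quoted below):

    // Simplified illustration: slow every record down by delayFactor
    // milliseconds; 0 disables the delay.
    final long delayFactor = 1;   // illustrative value

    JavaDStream<String> slowed = messages.map(record -> {
        if (delayFactor > 0) {
            Thread.sleep(delayFactor);
        }
        return record._2();   // keep only the message body
    });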
Cheers

2016-07-05 17:51 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> Test by producing messages into kafka at a rate comparable to what you
> expect in production.
>
> Test with backpressure turned on, it doesn't require you to specify a
> fixed limit on the number of messages and will do its best to maintain
> batch timing. Or you could empirically determine a reasonable fixed
> limit.
>
> Setting up a kafka topic with way more static messages in it than your
> system can handle in one batch, and then starting a stream from the
> beginning of it without turning on backpressure or limiting the number
> of messages... isn't a reasonable way to test steady state
> performance. Flink can't magically give you a correct answer under
> those circumstances either.
>
> On Tue, Jul 5, 2016 at 10:41 AM, rss rss <rssde...@gmail.com> wrote:
> > Hi, thanks.
> >
> > I know about the possibility to limit the number of messages. But the
> > problem is I don't know the number of messages which the system is able
> > to process. It depends on the data. The example is very simple. I need a
> > strict response after a specified time. Something like soft real time.
> > In the case of Flink I am able to set up a strict processing time like this:
> >
> > KeyedStream<Event, Integer> keyed =
> >         eventStream.keyBy(event -> event.userId.getBytes()[0] % partNum);
> > WindowedStream<Event, Integer, TimeWindow> uniqUsersWin =
> >         keyed.timeWindow(Time.seconds(10));
> > DataStream<Aggregator> uniqUsers = uniqUsersWin
> >         .trigger(ProcessingTimeTrigger.create())
> >         .fold(new Aggregator(), new FoldFunction<Event, Aggregator>() {
> >             @Override
> >             public Aggregator fold(Aggregator accumulator, Event value)
> >                     throws Exception {
> >                 accumulator.add(value.userId);
> >                 return accumulator;
> >             }
> >         });
> >
> > uniqUsers.print();
> >
> > And I can see results every 10 seconds independently of the input data
> > stream. Is something similar possible in Spark?
> >
> > Regarding the zeros in my example: the reason is that I have prepared the
> > message queue in Kafka before the tests. If I add some messages afterwards,
> > I am able to see the new messages. But in any case I need the first response
> > after 10 seconds, not minutes or hours after.
> >
> > Thanks.
> >
> > 2016-07-05 17:12 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >>
> >> If you're talking about limiting the number of messages per batch to
> >> try and keep from exceeding batch time, see
> >>
> >> http://spark.apache.org/docs/latest/configuration.html
> >>
> >> look for backpressure and maxRatePerPartition
> >>
> >> But if you're only seeing zeros after your job runs for a minute, it
> >> sounds like something else is wrong.
> >>
> >> On Tue, Jul 5, 2016 at 10:02 AM, rss rss <rssde...@gmail.com> wrote:
> >> > Hello,
> >> >
> >> > I'm trying to organize processing of messages from Kafka. And there is
> >> > a typical case when the number of messages in kafka's queue is more than
> >> > the Spark app is able to process. But I need a strong time limit to
> >> > prepare a result for at least a part of the data.
> >> >
> >> > Code example:
> >> >
> >> > SparkConf sparkConf = new SparkConf()
> >> >         .setAppName("Spark")
> >> >         .setMaster("local");
> >> >
> >> > JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> >> >         Milliseconds.apply(5000));
> >> >
> >> > jssc.checkpoint("/tmp/spark_checkpoint");
> >> >
> >> > Set<String> topicMap = new HashSet<>(Arrays.asList(topicList.split(",")));
> >> > Map<String, String> kafkaParams = new HashMap<String, String>() {
> >> >     {
> >> >         put("metadata.broker.list", bootstrapServers);
> >> >         put("auto.offset.reset", "smallest");
> >> >     }
> >> > };
> >> >
> >> > JavaPairInputDStream<String, String> messages =
> >> >         KafkaUtils.createDirectStream(jssc,
> >> >                 String.class,
> >> >                 String.class,
> >> >                 StringDecoder.class,
> >> >                 StringDecoder.class,
> >> >                 kafkaParams,
> >> >                 topicMap);
> >> >
> >> > messages.countByWindow(Seconds.apply(10), Milliseconds.apply(5000))
> >> >         .map(x -> { System.out.println(x); return x; })
> >> >         .dstream().saveAsTextFiles("/tmp/spark", "spark-streaming");
> >> >
> >> > I need to see the result of the window operation every 10 seconds (this is
> >> > only the simplest example). But really, with my test data of ~10M messages,
> >> > I get the first result a minute later and after that I see only zeros.
> >> > Is there a way to limit the processing time to guarantee a response within
> >> > a specified time, like Apache Flink's triggers?
> >> >
> >> > Thanks.
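As a concrete illustration of the fixed limit mentioned above: for the direct stream, a per-partition cap can be set next to the backpressure flag. This is only a sketch; the 10000 is a placeholder that would have to be determined empirically for the actual data and hardware:

    SparkConf sparkConf = new SparkConf()
            .setAppName("Spark")
            .setMaster("local")
            .set("spark.streaming.backpressure.enabled", "true")
            // placeholder: max records read per Kafka partition per second
            .set("spark.streaming.kafka.maxRatePerPartition", "10000");

With the 5000 ms batch interval from the code above, each batch would then contain at most 5 * 10000 records per partition, so a large backlog in the topic cannot make a single batch arbitrarily long.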