Hi, thanks. I know about the possibility to limit the number of messages. But the problem is that I don't know how many messages the system is able to process; it depends on the data. The example is very simple. I need a strict response after a specified time, something like soft real time. In Flink I am able to set up a strict processing time like this:
    KeyedStream<Event, Integer> keyed =
            eventStream.keyBy(event -> event.userId.getBytes()[0] % partNum);
    WindowedStream<Event, Integer, TimeWindow> uniqUsersWin =
            keyed.timeWindow(Time.seconds(10));
    DataStream<Aggregator> uniqUsers = uniqUsersWin
            .trigger(ProcessingTimeTrigger.create())
            .fold(new Aggregator(), new FoldFunction<Event, Aggregator>() {
                @Override
                public Aggregator fold(Aggregator accumulator, Event value) throws Exception {
                    accumulator.add(value.userId);
                    return accumulator;
                }
            });
    uniqUsers.print();

And I can see results every 10 seconds regardless of the input data stream. Is something like this possible in Spark?

Regarding the zeros in my example: the reason is that I had prepared a message queue in Kafka for the tests. If I add some messages afterwards, I am able to see the new messages. But in any case I need the first response after 10 seconds, not minutes or hours later. (To check my understanding of the backpressure settings, I put a sketch below the quoted messages.)

Thanks.

2016-07-05 17:12 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> If you're talking about limiting the number of messages per batch to
> try and keep from exceeding batch time, see
>
> http://spark.apache.org/docs/latest/configuration.html
>
> look for backpressure and maxRatePerPartition
>
> But if you're only seeing zeros after your job runs for a minute, it
> sounds like something else is wrong.
>
> On Tue, Jul 5, 2016 at 10:02 AM, rss rss <rssde...@gmail.com> wrote:
> > Hello,
> >
> > I'm trying to organize processing of messages from Kafka, and there is
> > a typical case when the number of messages in Kafka's queue is more than
> > the Spark app's ability to process. But I need a strong time limit to
> > prepare a result for at least a part of the data.
> >
> > Code example:
> >
> >     SparkConf sparkConf = new SparkConf()
> >             .setAppName("Spark")
> >             .setMaster("local");
> >
> >     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> >             Milliseconds.apply(5000));
> >
> >     jssc.checkpoint("/tmp/spark_checkpoint");
> >
> >     Set<String> topicMap = new
> >             HashSet<>(Arrays.asList(topicList.split(",")));
> >     Map<String, String> kafkaParams = new HashMap<String, String>() {
> >         {
> >             put("metadata.broker.list", bootstrapServers);
> >             put("auto.offset.reset", "smallest");
> >         }
> >     };
> >
> >     JavaPairInputDStream<String, String> messages =
> >             KafkaUtils.createDirectStream(jssc,
> >                     String.class,
> >                     String.class,
> >                     StringDecoder.class,
> >                     StringDecoder.class,
> >                     kafkaParams,
> >                     topicMap);
> >
> >     messages.countByWindow(Seconds.apply(10), Milliseconds.apply(5000))
> >             .map(x -> { System.out.println(x); return x; })
> >             .dstream().saveAsTextFiles("/tmp/spark", "spark-streaming");
> >
> > I need to see a result of the window operation every 10 seconds (this is
> > only the simplest example). But really, with my test data of ~10M
> > messages, I get the first result a minute later, and after that I see
> > only zeros. Is there a way to limit the processing time to guarantee a
> > response in a specified time, like Apache Flink's triggers?
> >
> > Thanks.
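P.S. To make sure I understand the suggestion about backpressure and maxRatePerPartition, this is how I would apply it to my job. This is only my guess from the configuration docs; the rate value is arbitrary and would have to be tuned to my data, and as far as I can tell it bounds the batch size rather than guaranteeing a response time:

    import org.apache.spark.SparkConf;

    SparkConf sparkConf = new SparkConf()
            .setAppName("Spark")
            // more than one local core, so processing is not serialized
            .setMaster("local[2]")
            // let the ingestion rate adapt to the observed processing speed
            .set("spark.streaming.backpressure.enabled", "true")
            // upper bound on records read per Kafka partition per second, so a
            // single 5-second batch is capped at 5 * 10000 records per partition;
            // 10000 is an arbitrary value for illustration
            .set("spark.streaming.kafka.maxRatePerPartition", "10000");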