Hello,

  thanks, I tried .set("spark.streaming.backpressure.enabled", "true"), but
the result is negative. Therefore I have prepared a small test:
https://github.com/rssdev10/spark-kafka-streaming
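
For reference, this is roughly how I enabled it (a minimal sketch; the two
property names are standard Spark configuration keys, but the
maxRatePerPartition value here is only an example, not what the test uses):

    SparkConf sparkConf = new SparkConf()
            .setAppName("Spark")
            .setMaster("local")
            // dynamic rate control based on previous batch timings
            .set("spark.streaming.backpressure.enabled", "true")
            // optional hard cap, messages per partition per second (example value)
            .set("spark.streaming.kafka.maxRatePerPartition", "10000");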

  How to run:

  git clone https://github.com/rssdev10/spark-kafka-streaming.git
  cd spark-kafka-streaming

  # downloads kafka and zookeeper
  ./gradlew setup

  # run zookeeper and kafka, and start message generation
  ./gradlew test_data_prepare

And in another console just run:
  ./gradlew test_spark

Data generation can be interrupted with CTRL-C and resumed with the same
command: ./gradlew test_data_prepare

To stop everything, run:
  ./gradlew stop_all

The Spark test should print a report like the following every 10 seconds:
***************************************************************************
Processing time: 33477
Expected time: 10000
Processed messages: 2897866
Message example: {"message": 1, "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
Recovered json: {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}


"message" is the number of the first message in the window. Time values are
in milliseconds.
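
For clarity, the time values in the report are measured roughly like this (an
illustrative sketch, not the exact code from the repository; the variable
names are assumptions):

    long start = System.currentTimeMillis();
    // ... process one 10-second window of messages ...
    long processingTime = System.currentTimeMillis() - start;
    System.out.println("Processing time: " + processingTime);   // actual time spent
    System.out.println("Expected time: " + windowDurationMs);   // 10000 ms window
    System.out.println("Processed messages: " + processedCount);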

Brief results:

   1. Spark always reads all messages from Kafka after the first connection,
   independently of the DStream batch interval or window size. It looks like
   a bug.
   2. When the processing speed of the Spark app is close to the data
   generation speed, everything is OK.
   3. I added a delayFactor in
   https://github.com/rssdev10/spark-kafka-streaming/blob/master/src/main/java/SparkStreamingConsumer.java
   to emulate slow processing (see the sketch below). With it the streaming
   process degrades; with delayFactor=0 the process looks stable.
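
The slow-processing emulation mentioned in point 3 is roughly the following
(an illustrative sketch; the exact placement and sleep duration in
SparkStreamingConsumer.java may differ):

    messages.foreachRDD(rdd -> {
        long count = rdd.count();
        if (delayFactor > 0) {
            // artificial slowdown to emulate a consumer that cannot keep up
            Thread.sleep(delayFactor);
        }
        System.out.println("Processed messages: " + count);
    });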


Cheers


2016-07-05 17:51 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> Test by producing messages into kafka at a rate comparable to what you
> expect in production.
>
> Test with backpressure turned on, it doesn't require you to specify a
> fixed limit on number of messages and will do its best to maintain
> batch timing.  Or you could empirically determine a reasonable fixed
> limit.
>
> Setting up a kafka topic with way more static messages in it than your
> system can handle in one batch, and then starting a stream from the
> beginning of it without turning on backpressure or limiting the number
> of messages... isn't a reasonable way to test steady state
> performance.  Flink can't magically give you a correct answer under
> those circumstances either.
>
> On Tue, Jul 5, 2016 at 10:41 AM, rss rss <rssde...@gmail.com> wrote:
> > Hi, thanks.
> >
> >    I know about the possibility to limit the number of messages. But the
> > problem is that I don't know how many messages the system is able to
> > process; it depends on the data. The example is very simple: I need a
> > strict response after a specified time, something like soft real time.
> > In Flink I am able to set up a strict processing time like this:
> >
> > KeyedStream<Event, Integer> keyed =
> >         eventStream.keyBy(event -> event.userId.getBytes()[0] % partNum);
> > WindowedStream<Event, Integer, TimeWindow> uniqUsersWin =
> >         keyed.timeWindow( Time.seconds(10) );
> > DataStream<Aggregator> uniqUsers =
> >         uniqUsersWin.trigger(ProcessingTimeTrigger.create())
> >         .fold(new Aggregator(), new FoldFunction<Event, Aggregator>() {
> >             @Override
> >             public Aggregator fold(Aggregator accumulator, Event value)
> >                     throws Exception {
> >                 accumulator.add(value.userId);
> >                 return accumulator;
> >             }
> >         });
> >
> > uniqUsers.print();
> >
> > And I can see results every 10 seconds independently of the input data
> > stream. Is something like this possible in Spark?
> >
> > Regarding the zeros in my example: the reason is that I prepared the
> > message queue in Kafka before the tests. If I add some messages
> > afterwards, I am able to see the new messages. But in any case I need
> > the first response after 10 seconds, not minutes or hours later.
> >
> > Thanks.
> >
> >
> >
> > 2016-07-05 17:12 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >>
> >> If you're talking about limiting the number of messages per batch to
> >> try and keep from exceeding batch time, see
> >>
> >> http://spark.apache.org/docs/latest/configuration.html
> >>
> >> look for backpressure and maxRatePerPartition
> >>
> >>
> >> But if you're only seeing zeros after your job runs for a minute, it
> >> sounds like something else is wrong.
> >>
> >>
> >> On Tue, Jul 5, 2016 at 10:02 AM, rss rss <rssde...@gmail.com> wrote:
> >> > Hello,
> >> >
> >> >   I'm trying to organize processing of messages from Kafka. And
> >> > there is a typical case when the number of messages in Kafka's queue
> >> > is more than the Spark app is able to process. But I need a strict
> >> > time limit to prepare a result for at least part of the data.
> >> >
> >> > Code example:
> >> >
> >> >         SparkConf sparkConf = new SparkConf()
> >> >                 .setAppName("Spark")
> >> >                 .setMaster("local");
> >> >
> >> >         JavaStreamingContext jssc =
> >> >                 new JavaStreamingContext(sparkConf, Milliseconds.apply(5000));
> >> >
> >> >         jssc.checkpoint("/tmp/spark_checkpoint");
> >> >
> >> >         Set<String> topicMap =
> >> >                 new HashSet<>(Arrays.asList(topicList.split(",")));
> >> >         Map<String, String> kafkaParams = new HashMap<String, String>() {
> >> >             {
> >> >                 put("metadata.broker.list", bootstrapServers);
> >> >                 put("auto.offset.reset", "smallest");
> >> >             }
> >> >         };
> >> >
> >> >         JavaPairInputDStream<String, String> messages =
> >> >                 KafkaUtils.createDirectStream(jssc,
> >> >                         String.class,
> >> >                         String.class,
> >> >                         StringDecoder.class,
> >> >                         StringDecoder.class,
> >> >                         kafkaParams,
> >> >                         topicMap);
> >> >
> >> >         messages.countByWindow(Seconds.apply(10), Milliseconds.apply(5000))
> >> >                 .map(x -> { System.out.println(x); return x; })
> >> >                 .dstream().saveAsTextFiles("/tmp/spark", "spark-streaming");
> >> >
> >> >
> >> >   I need to see the result of the window operation every 10 seconds
> >> > (this is only the simplest example). But with my test data of ~10M
> >> > messages, the first result appears about a minute later, and after
> >> > that I see only zeros. Is there a way to limit the processing time to
> >> > guarantee a response within a specified time, like Apache Flink's
> >> > triggers?
> >> >
> >> > Thanks.
> >
> >
>
