Re: Serial batching with Spark Streaming

2015-06-20 Thread Michal Čizmazia
String.format("%03d",
>>>>>> lineRddIndex.localValue()) + "_lines_";
>>>>>> try (final PrintStream out = new PrintStream(prefix +
>>>>>> UUID.randomUUID())) {
>>>>>>     rdd.collect().forEach(s -> out.println(s));
>>>>>> }
>>>>>> return null;
>>>>>> });
>>>>>>
>>>>>> final JavaDStream<String> words =
>>>>>> lines.flatMap(x -> Arrays.asList(x.split(" ")));
>>>>>> final JavaPairDStream<String, Integer> pairs =
>>>>>> words.mapToPair(s -> new Tuple2<>(s, 1));
>>>>>> final JavaPairDStream<String, Integer> wordCounts =
>>>>>> pairs.reduceByKey((i1, i2) -> i1 + i2);
>>>>>>
>>>>>> final Accumulator<Integer> sleep =
>>>>>> context.sparkContext().accumulator(0);
>>>>>> final JavaPairDStream<String, Integer> wordCounts2 =
>>>>>> JavaPairDStream.fromJavaDStream(
>>>>>> wordCounts.transform( (rdd) -> {
>>>>>> sleep.add(1);
>>>>>> Thread.sleep(sleep.localValue() < 6 ? 2 : 5000);
>>>>>> return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd),
>>>>>> rdd.classTag());
>>>>>> }));
>>>>>>
>>>>>> final Function2<List<Integer>, Optional<Integer>,
>>>>>> Optional<Integer>> updateFunction =
>>>>>> (values, state) -> {
>>>>>> Integer newSum = state.or(0);
>>>>>> for (final Integer value : values) {
>>>>>> newSum += value;
>>>>>> }
>>>>>> return Optional.of(newSum);
>>>>>> };
>>>>>>
>>>>>> final List<Tuple2<String, Integer>> tuples =
>>>>>> ImmutableList.<Tuple2<String, Integer>> of();
>>>>>> final JavaPairRDD<String, Integer> initialRDD =
>>>>>> context.sparkContext().parallelizePairs(tuples);
>>>>>>
>>>>>> final JavaPairDStream<String, Integer> wordCountsState =
>>>>>> wordCounts2.updateStateByKey(
>>>>>>  updateFunction,
>>>>>>  new
>>>>>> HashPartitioner(context.sparkContext().defaultParallelism()),
>>>>>> initialRDD);
>>>>>>
>>>>>> wordCountsState.print();
>>>>>>
>>>>>> final Accumulator<Integer> rddIndex =
>>>>>> context.sparkContext().accumulator(0);
>>>>>> wordCountsState.foreachRDD( rdd -> {
>>>>>> rddIndex.add(1);
>>>>>> final String prefix = "/tmp/" + String.format("%03d",
>>>>>> rddIndex.localValue()) + "_words_";
>>>>>> try (final PrintStream out = new PrintStream(prefix +
>>>>>> UUID.randomUUID())) {
>>>>>> rdd.collect().forEach(s -> out.println(s));
>>>>>> }
>>>>>> return null;
>>>>>> });
>>>>>>
>>>>>> context.start();
>>>>>> context.awaitTermination();
>>>>>> }
>>>>>>
>>>>>>
>>>>>> On 17 June 2015 at 17:25, Tathagata Das  wrote:
>>>>>> > The default behavior should be that batch X + 1 starts processing
>>>>>> only after
>>>>>> > batch X completes. If you are using Spark 1.4.0, could you show us a
>>>>>> > screenshot of the streaming tab, especially the list of batches?
>>>>>> And could
>>>>>> > you also tell us if you are setting any SparkConf configurations?
>>>>>> >
>>>>>> > On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia <
>>>>>> mici...@gmail.com> wrote:
>>>>>> >>
>>>>>> >> Is it possible to achieve serial batching with Spark Streaming?
>>>>>> >>
>>>>>> >> Example:
>>>>>> >>
>>>>>> >> I configure the Streaming Context for creating a batch every 3
>>>>>> seconds.
>>>>>> >>
>>>>>> >> Processing of the batch #2 takes longer than 3 seconds and creates
>>>>>> a
>>>>>> >> backlog of batches:
>>>>>> >>
>>>>>> >> batch #1 takes 2s
>>>>>> >> batch #2 takes 10s
>>>>>> >> batch #3 takes 2s
>>>>>> >> batch #4 takes 2s
>>>>>> >>
>>>>>> >> When testing locally, it seems that processing of multiple batches
>>>>>> is
>>>>>> >> finished at the same time:
>>>>>> >>
>>>>>> >> batch #1 finished at 2s
>>>>>> >> batch #2 finished at 12s
>>>>>> >> batch #3 finished at 12s (processed in parallel)
>>>>>> >> batch #4 finished at 15s
>>>>>> >>
>>>>>> >> How can I delay processing of the next batch, so that it is processed
>>>>>> >> only after processing of the previous batch has been completed?
>>>>>> >>
>>>>>> >> batch #1 finished at 2s
>>>>>> >> batch #2 finished at 12s
>>>>>> >> batch #3 finished at 14s (processed serially)
>>>>>> >> batch #4 finished at 16s
>>>>>> >>
>>>>>> >> I want to perform a transformation for every key only once in a
>>>>>> given
>>>>>> >> period of time (e.g. batch duration). I find all unique keys in a
>>>>>> >> batch and perform the transformation on each key. To ensure that
>>>>>> the
>>>>>> >> transformation is done for every key only once, only one batch can
>>>>>> be
>>>>>> >> processed at a time. At the same time, I want that single batch to
>>>>>> be
>>>>>> >> processed in parallel.
>>>>>> >>
>>>>>> >> context = new JavaStreamingContext(conf, Durations.seconds(10));
>>>>>> >> stream = context.receiverStream(...);
>>>>>> >> stream
>>>>>> >> .reduceByKey(...)
>>>>>> >> .transform(...)
>>>>>> >> .foreachRDD(output);
>>>>>> >>
>>>>>> >> Any ideas or pointers are very welcome.
>>>>>> >>
>>>>>> >> Thanks!
>>>>>> >>
>>>>>> >>
>>>>>> -
>>>>>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>> >> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>> -
>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

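As an aside, the state-update function used with updateStateByKey above can be
exercised on its own. The following is only a sketch, assuming Guava's Optional
(which the Spark 1.x Java API uses for updateStateByKey); it shows how the sum
is carried forward from one batch to the next for a single key:

    import java.util.Arrays;
    import java.util.List;
    import com.google.common.base.Optional;

    public final class UpdateFunctionDemo {
        public static void main(String[] args) {
            // First batch for a key: no previous state, three occurrences seen.
            System.out.println(update(Arrays.asList(1, 1, 1), Optional.<Integer>absent())); // Optional.of(3)
            // Later batch: two new occurrences added onto the existing state of 3.
            System.out.println(update(Arrays.asList(1, 1), Optional.of(3)));                // Optional.of(5)
        }

        // Same logic as the updateFunction in the thread: sum new values onto the prior state.
        static Optional<Integer> update(List<Integer> values, Optional<Integer> state) {
            Integer newSum = state.or(0);
            for (Integer value : values) {
                newSum += value;
            }
            return Optional.of(newSum);
        }
    }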

Re: Serial batching with Spark Streaming

2015-06-20 Thread Tathagata Das
elp me understand why the time for processing of all
>>>>> jobs for a batch is always less than 4 seconds?
>>>>>
>>>>> Please see my playground code below.
>>>>>
>>>>> The last modified time of the input (lines) RDD dump files seems to
>>>>> match the Thread.sleep delays (20s or 5s) in the transform operation
>>>>> or the batching interval (10s): 20s, 5s, 10s.
>>>>>
>>>>> However, neither the batch processing time in the Streaming tab nor
>>>>> the last modified time of the output (words) RDD dump files reflect
>>>>> the Thread.sleep delays.
>>>>>
>>>>> 07:20   3240  001_lines_...
>>>>>   07:21 117   001_words_...
>>>>> 07:41   37224 002_lines_...
>>>>>   07:43 252   002_words_...
>>>>> 08:00   37728 003_lines_...
>>>>>   08:02 504   003_words_...
>>>>> 08:20   38952 004_lines_...
>>>>>   08:22 756   004_words_...
>>>>> 08:40   38664 005_lines_...
>>>>>   08:42 999   005_words_...
>>>>> 08:45   38160 006_lines_...
>>>>>   08:47 1134  006_words_...
>>>>> 08:50   9720  007_lines_...
>>>>>   08:51 1260  007_words_...
>>>>> 08:55   9864  008_lines_...
>>>>>   08:56 1260  008_words_...
>>>>> 09:00   10656 009_lines_...
>>>>>   09:01 1395  009_words_...
>>>>> 09:05   11664 010_lines_...
>>>>>   09:06 1395  010_words_...
>>>>> 09:11   10935 011_lines_...
>>>>>   09:11 1521  011_words_...
>>>>> 09:16   11745 012_lines_...
>>>>>   09:16 1530  012_words_...
>>>>> 09:21   12069 013_lines_...
>>>>>   09:22 1656  013_words_...
>>>>> 09:27   10692 014_lines_...
>>>>>   09:27 1665  014_words_...
>>>>> 09:32   10449 015_lines_...
>>>>>   09:32 1791  015_words_...
>>>>> 09:37   11178 016_lines_...
>>>>>   09:37 1800  016_words_...
>>>>> 09:45   17496 017_lines_...
>>>>>   09:45 1926  017_words_...
>>>>> 09:55   22032 018_lines_...
>>>>>   09:56 2061  018_words_...
>>>>> 10:05   21951 019_lines_...
>>>>>   10:06 2196  019_words_...
>>>>> 10:15   21870 020_lines_...
>>>>>   10:16 2322  020_words_...
>>>>> 10:25   21303 021_lines_...
>>>>>   10:26 2340  021_words_...
>>>>>
>>>>>
>>>>> final SparkConf conf = new
>>>>> SparkConf().setMaster("local[4]").setAppName("WordCount");
>>>>> try (final JavaStreamingContext context = new
>>>>> JavaStreamingContext(conf, Durations.seconds(10))) {
>>>>>
>>>>> context.checkpoint("/tmp/checkpoint");
>>>>>
>>>>> final JavaDStream<String> lines = context.union(
>>>>> context.receiverStream(new GeneratorReceiver()),
>>>>> ImmutableList.of(
>>>>> context.receiverStream(new GeneratorReceiver()),
>>>>> context.receiverStream(new GeneratorReceiver())));
>>>>>
>>>>> lines.print();
>>>>>
>>>>> final Accumulator<Integer> lineRddIndex =
>>>>> context.sparkContext().accumulator(0);
>>>>> lines.foreachRDD( rdd -> {
>>>>> lineRddIndex.add(1);
>>>>> final String prefix = "/tmp/" + String.format("%03d",
>>>>> lineRddIndex.localValue()) + "_lines_";
>>>>> try (final PrintStream out = new PrintStream(prefix +
>>>>> UUID.randomUUID())) {
>>>>> rdd.collect().forEach(s -> out.println(s));
>>>>> }
>>>>> return null;
>>>>> });
>>>>>
>>>>> final JavaDStream<String> words =
>>>>> lines.flatMap(x -> Arrays.asList(x.split(" ")));
>>>>> final JavaPairDStream<String, Integer> pairs =
>>>>> words.mapToPair(s -> new Tuple2<>(s, 1));
>>>>> final JavaPairDStream<String, Integer> wordCounts =
>>>>> pairs.reduceByKey((i1, i2) -> i1 + i2);
>>>>>
>>>
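
The GeneratorReceiver referenced in the code is never shown in the thread. A
minimal stand-in, purely as an assumption about what such a test source might
look like (Spark 1.x receiver API, pushing a few synthetic words every 100 ms):

    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.receiver.Receiver;

    // Hypothetical stand-in for the GeneratorReceiver used in the thread.
    public class GeneratorReceiver extends Receiver<String> {

        public GeneratorReceiver() {
            super(StorageLevel.MEMORY_ONLY());
        }

        @Override
        public void onStart() {
            // Push synthetic lines on a background thread until the receiver is stopped.
            new Thread(() -> {
                long i = 0;
                while (!isStopped()) {
                    store("word" + (i % 10) + " word" + (i % 7));
                    i++;
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }).start();
        }

        @Override
        public void onStop() {
            // Nothing to clean up; the loop above exits once isStopped() returns true.
        }
    }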

Re: Serial batching with Spark Streaming

2015-06-19 Thread Michal Čizmazia
>>>>   07:21 117   001_words_...
>>>> 07:41   37224 002_lines_...
>>>>   07:43 252   002_words_...
>>>> 08:00   37728 003_lines_...
>>>>   08:02 504   003_words_...
>>>> 08:20   38952 004_lines_...
>>>>   08:22 756   004_words_...
>>>> 08:40   38664 005_lines_...
>>>>   08:42 999   005_words_...
>>>> 08:45   38160 006_lines_...
>>>>   08:47 1134  006_words_...
>>>> 08:50   9720  007_lines_...
>>>>   08:51 1260  007_words_...
>>>> 08:55   9864  008_lines_...
>>>>   08:56 1260  008_words_...
>>>> 09:00   10656 009_lines_...
>>>>   09:01 1395  009_words_...
>>>> 09:05   11664 010_lines_...
>>>>   09:06 1395  010_words_...
>>>> 09:11   10935 011_lines_...
>>>>   09:11 1521  011_words_...
>>>> 09:16   11745 012_lines_...
>>>>   09:16 1530  012_words_...
>>>> 09:21   12069 013_lines_...
>>>>   09:22 1656  013_words_...
>>>> 09:27   10692 014_lines_...
>>>>   09:27 1665  014_words_...
>>>> 09:32   10449 015_lines_...
>>>>   09:32 1791  015_words_...
>>>> 09:37   11178 016_lines_...
>>>>   09:37 1800  016_words_...
>>>> 09:45   17496 017_lines_...
>>>>   09:45 1926  017_words_...
>>>> 09:55   22032 018_lines_...
>>>>   09:56 2061  018_words_...
>>>> 10:05   21951 019_lines_...
>>>>   10:06 2196  019_words_...
>>>> 10:15   21870 020_lines_...
>>>>   10:16 2322  020_words_...
>>>> 10:25   21303 021_lines_...
>>>>   10:26 2340  021_words_...
>>>>
>>>>
>>>> final SparkConf conf = new
>>>> SparkConf().setMaster("local[4]").setAppName("WordCount");
>>>> try (final JavaStreamingContext context = new
>>>> JavaStreamingContext(conf, Durations.seconds(10))) {
>>>>
>>>> context.checkpoint("/tmp/checkpoint");
>>>>
>>>> final JavaDStream<String> lines = context.union(
>>>> context.receiverStream(new GeneratorReceiver()),
>>>> ImmutableList.of(
>>>> context.receiverStream(new GeneratorReceiver()),
>>>> context.receiverStream(new GeneratorReceiver())));
>>>>
>>>> lines.print();
>>>>
>>>> final Accumulator<Integer> lineRddIndex =
>>>> context.sparkContext().accumulator(0);
>>>> lines.foreachRDD( rdd -> {
>>>> lineRddIndex.add(1);
>>>> final String prefix = "/tmp/" + String.format("%03d",
>>>> lineRddIndex.localValue()) + "_lines_";
>>>> try (final PrintStream out = new PrintStream(prefix +
>>>> UUID.randomUUID())) {
>>>> rdd.collect().forEach(s -> out.println(s));
>>>> }
>>>> return null;
>>>> });
>>>>
>>>> final JavaDStream<String> words =
>>>> lines.flatMap(x -> Arrays.asList(x.split(" ")));
>>>> final JavaPairDStream<String, Integer> pairs =
>>>> words.mapToPair(s -> new Tuple2<>(s, 1));
>>>> final JavaPairDStream<String, Integer> wordCounts =
>>>> pairs.reduceByKey((i1, i2) -> i1 + i2);
>>>>
>>>> final Accumulator<Integer> sleep =
>>>> context.sparkContext().accumulator(0);
>>>> final JavaPairDStream<String, Integer> wordCounts2 =
>>>> JavaPairDStream.fromJavaDStream(
>>>> wordCounts.transform( (rdd) -> {
>>>> sleep.add(1);
>>>> Thread.sleep(sleep.localValue() < 6 ? 2 : 5000);
>>>> return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd),
>>>> rdd.classTag());
>>>> }));
>>>>
>>>> final Function2<List<Integer>, Optional<Integer>,
>>>> Optional<Integer>> updateFunction =
>>>> (values, state) -> {
>>>> Integer newSum = state.or(0);
>>>> for (final Integer value : values) {
>>>> newSum += value;
>>>> }
>>>> return Optional.of(newSum);
>>>> };
>>>>
>>>> final List<Tuple2<String, Integer>> tuples =
>>>> ImmutableList.<Tuple2<String, Integer>> of();
>>>>  

Re: Serial batching with Spark Streaming

2015-06-19 Thread Tathagata Das
09:11 1521  011_words_...
>>> 09:16   11745 012_lines_...
>>>   09:16 1530  012_words_...
>>> 09:21   12069 013_lines_...
>>>   09:22 1656  013_words_...
>>> 09:27   10692 014_lines_...
>>>   09:27 1665  014_words_...
>>> 09:32   10449 015_lines_...
>>>   09:32 1791  015_words_...
>>> 09:37   11178 016_lines_...
>>>   09:37 1800  016_words_...
>>> 09:45   17496 017_lines_...
>>>   09:45 1926  017_words_...
>>> 09:55   22032 018_lines_...
>>>   09:56 2061  018_words_...
>>> 10:05   21951 019_lines_...
>>>   10:06 2196  019_words_...
>>> 10:15   21870 020_lines_...
>>>   10:16 2322  020_words_...
>>> 10:25   21303 021_lines_...
>>>   10:26 2340  021_words_...
>>>
>>>
>>> final SparkConf conf = new
>>> SparkConf().setMaster("local[4]").setAppName("WordCount");
>>> try (final JavaStreamingContext context = new
>>> JavaStreamingContext(conf, Durations.seconds(10))) {
>>>
>>> context.checkpoint("/tmp/checkpoint");
>>>
>>> final JavaDStream<String> lines = context.union(
>>> context.receiverStream(new GeneratorReceiver()),
>>> ImmutableList.of(
>>> context.receiverStream(new GeneratorReceiver()),
>>> context.receiverStream(new GeneratorReceiver())));
>>>
>>> lines.print();
>>>
>>> final Accumulator<Integer> lineRddIndex =
>>> context.sparkContext().accumulator(0);
>>> lines.foreachRDD( rdd -> {
>>> lineRddIndex.add(1);
>>> final String prefix = "/tmp/" + String.format("%03d",
>>> lineRddIndex.localValue()) + "_lines_";
>>> try (final PrintStream out = new PrintStream(prefix +
>>> UUID.randomUUID())) {
>>> rdd.collect().forEach(s -> out.println(s));
>>> }
>>> return null;
>>> });
>>>
>>> final JavaDStream<String> words =
>>> lines.flatMap(x -> Arrays.asList(x.split(" ")));
>>> final JavaPairDStream<String, Integer> pairs =
>>> words.mapToPair(s -> new Tuple2<>(s, 1));
>>> final JavaPairDStream<String, Integer> wordCounts =
>>> pairs.reduceByKey((i1, i2) -> i1 + i2);
>>>
>>> final Accumulator<Integer> sleep =
>>> context.sparkContext().accumulator(0);
>>> final JavaPairDStream<String, Integer> wordCounts2 =
>>> JavaPairDStream.fromJavaDStream(
>>> wordCounts.transform( (rdd) -> {
>>> sleep.add(1);
>>> Thread.sleep(sleep.localValue() < 6 ? 2 : 5000);
>>> return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd),
>>> rdd.classTag());
>>> }));
>>>
>>> final Function2<List<Integer>, Optional<Integer>,
>>> Optional<Integer>> updateFunction =
>>> (values, state) -> {
>>> Integer newSum = state.or(0);
>>> for (final Integer value : values) {
>>> newSum += value;
>>> }
>>>     return Optional.of(newSum);
>>> };
>>>
>>> final List<Tuple2<String, Integer>> tuples =
>>> ImmutableList.<Tuple2<String, Integer>> of();
>>> final JavaPairRDD<String, Integer> initialRDD =
>>> context.sparkContext().parallelizePairs(tuples);
>>>
>>> final JavaPairDStream<String, Integer> wordCountsState =
>>> wordCounts2.updateStateByKey(
>>>  updateFunction,
>>>  new
>>> HashPartitioner(context.sparkContext().defaultParallelism()),
>>> initialRDD);
>>>
>>> wordCountsState.print();
>>>
>>> final Accumulator<Integer> rddIndex =
>>> context.sparkContext().accumulator(0);
>>> wordCountsState.foreachRDD( rdd -> {
>>> rddIndex.add(1);
>>> final String prefix = "/tmp/" + String.format("%03d",
>>> rddIndex.localValue()) + "_words_";
>>> try (final PrintStream out = new PrintStream(prefix +
>>> UUID.randomUUID())) {
>>> rdd.collect().forEach(s -> out.println(s));
>>> }
>>> return null;
>>> });
>>>
>>> context.start();
>>> context.awaitTermination();
>>> }
>>>
>>>
>>> On 17 June 2015 at 17:25, Tathagata Das  wrote:
>>> > The default behavior should b

Re: Serial batching with Spark Streaming

2015-06-19 Thread Michal Čizmazia
ckpoint");
>>
>> final JavaDStream<String> lines = context.union(
>> context.receiverStream(new GeneratorReceiver()),
>> ImmutableList.of(
>> context.receiverStream(new GeneratorReceiver()),
>> context.receiverStream(new GeneratorReceiver())));
>>
>> lines.print();
>>
>> final Accumulator<Integer> lineRddIndex =
>> context.sparkContext().accumulator(0);
>> lines.foreachRDD( rdd -> {
>> lineRddIndex.add(1);
>> final String prefix = "/tmp/" + String.format("%03d",
>> lineRddIndex.localValue()) + "_lines_";
>> try (final PrintStream out = new PrintStream(prefix +
>> UUID.randomUUID())) {
>> rdd.collect().forEach(s -> out.println(s));
>> }
>> return null;
>> });
>>
>> final JavaDStream<String> words =
>> lines.flatMap(x -> Arrays.asList(x.split(" ")));
>> final JavaPairDStream<String, Integer> pairs =
>> words.mapToPair(s -> new Tuple2<>(s, 1));
>> final JavaPairDStream<String, Integer> wordCounts =
>> pairs.reduceByKey((i1, i2) -> i1 + i2);
>>
>> final Accumulator<Integer> sleep =
>> context.sparkContext().accumulator(0);
>> final JavaPairDStream<String, Integer> wordCounts2 =
>> JavaPairDStream.fromJavaDStream(
>> wordCounts.transform( (rdd) -> {
>> sleep.add(1);
>> Thread.sleep(sleep.localValue() < 6 ? 2 : 5000);
>> return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd),
>> rdd.classTag());
>> }));
>>
>> final Function2<List<Integer>, Optional<Integer>,
>> Optional<Integer>> updateFunction =
>> (values, state) -> {
>> Integer newSum = state.or(0);
>> for (final Integer value : values) {
>> newSum += value;
>> }
>> return Optional.of(newSum);
>> };
>>
>> final List<Tuple2<String, Integer>> tuples =
>> ImmutableList.<Tuple2<String, Integer>> of();
>> final JavaPairRDD<String, Integer> initialRDD =
>> context.sparkContext().parallelizePairs(tuples);
>>
>> final JavaPairDStream<String, Integer> wordCountsState =
>> wordCounts2.updateStateByKey(
>>  updateFunction,
>>  new
>> HashPartitioner(context.sparkContext().defaultParallelism()),
>> initialRDD);
>>
>> wordCountsState.print();
>>
>> final Accumulator<Integer> rddIndex =
>> context.sparkContext().accumulator(0);
>> wordCountsState.foreachRDD( rdd -> {
>> rddIndex.add(1);
>> final String prefix = "/tmp/" + String.format("%03d",
>> rddIndex.localValue()) + "_words_";
>> try (final PrintStream out = new PrintStream(prefix +
>> UUID.randomUUID())) {
>> rdd.collect().forEach(s -> out.println(s));
>> }
>> return null;
>> });
>>
>> context.start();
>> context.awaitTermination();
>> }
>>
>>
>> On 17 June 2015 at 17:25, Tathagata Das  wrote:
>> > The default behavior should be that batch X + 1 starts processing only
>> after
>> > batch X completes. If you are using Spark 1.4.0, could you show us a
>> > screenshot of the streaming tab, especially the list of batches? And
>> could
>> > you also tell us if you are setting any SparkConf configurations?
>> >
>> > On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia 
>> wrote:
>> >>
>> >> Is it possible to achieve serial batching with Spark Streaming?
>> >>
>> >> Example:
>> >>
>> >> I configure the Streaming Context for creating a batch every 3 seconds.
>> >>
>> >> Processing of the batch #2 takes longer than 3 seconds and creates a
>> >> backlog of batches:
>> >>
>> >> batch #1 takes 2s
>> >> batch #2 takes 10s
>> >> batch #3 takes 2s
>> >> batch #4 takes 2s
>> >>
>> >> When testing locally, it seems that processing of multiple batches is
>> >> finished at the same time:
>> >>
>> >> batch #1 finished at 2s
>> >> batch #2 finished at 12s
>> >> batch #3 finished at 12s (processed in parallel)
>> >> batch #4 finished at 15s
>> >>
>> >> How can I delay processing of the next batch, so that it is processed
>> >> only after processing of the previous batch has been completed?
>> >>
>> >> batch #1 finished at 2s
>> >> batch #2 finished at 12s
>> >> batch #3 finished at 14s (processed serially)
>> >> batch #4 finished at 16s
>> >>
>> >> I want to perform a transformation for every key only once in a given
>> >> period of time (e.g. batch duration). I find all unique keys in a
>> >> batch and perform the transformation on each key. To ensure that the
>> >> transformation is done for every key only once, only one batch can be
>> >> processed at a time. At the same time, I want that single batch to be
>> >> processed in parallel.
>> >>
>> >> context = new JavaStreamingContext(conf, Durations.seconds(10));
>> >> stream = context.receiverStream(...);
>> >> stream
>> >> .reduceByKey(...)
>> >> .transform(...)
>> >> .foreachRDD(output);
>> >>
>> >> Any ideas or pointers are very welcome.
>> >>
>> >> Thanks!
>> >>
>> >> -
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Serial batching with Spark Streaming

2015-06-18 Thread Michal Čizmazia
context.checkpoint("/tmp/checkpoint");
>>
>> final JavaDStream<String> lines = context.union(
>> context.receiverStream(new GeneratorReceiver()),
>> ImmutableList.of(
>> context.receiverStream(new GeneratorReceiver()),
>> context.receiverStream(new GeneratorReceiver())));
>>
>> lines.print();
>>
>> final Accumulator<Integer> lineRddIndex =
>> context.sparkContext().accumulator(0);
>> lines.foreachRDD( rdd -> {
>> lineRddIndex.add(1);
>> final String prefix = "/tmp/" + String.format("%03d",
>> lineRddIndex.localValue()) + "_lines_";
>> try (final PrintStream out = new PrintStream(prefix +
>> UUID.randomUUID())) {
>> rdd.collect().forEach(s -> out.println(s));
>> }
>> return null;
>> });
>>
>> final JavaDStream<String> words =
>> lines.flatMap(x -> Arrays.asList(x.split(" ")));
>> final JavaPairDStream<String, Integer> pairs =
>> words.mapToPair(s -> new Tuple2<>(s, 1));
>> final JavaPairDStream<String, Integer> wordCounts =
>> pairs.reduceByKey((i1, i2) -> i1 + i2);
>>
>> final Accumulator<Integer> sleep =
>> context.sparkContext().accumulator(0);
>> final JavaPairDStream<String, Integer> wordCounts2 =
>> JavaPairDStream.fromJavaDStream(
>> wordCounts.transform( (rdd) -> {
>> sleep.add(1);
>> Thread.sleep(sleep.localValue() < 6 ? 2 : 5000);
>> return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd),
>> rdd.classTag());
>> }));
>>
>> final Function2<List<Integer>, Optional<Integer>,
>> Optional<Integer>> updateFunction =
>> (values, state) -> {
>> Integer newSum = state.or(0);
>> for (final Integer value : values) {
>> newSum += value;
>> }
>> return Optional.of(newSum);
>> };
>>
>> final List<Tuple2<String, Integer>> tuples =
>> ImmutableList.<Tuple2<String, Integer>> of();
>> final JavaPairRDD<String, Integer> initialRDD =
>> context.sparkContext().parallelizePairs(tuples);
>>
>> final JavaPairDStream<String, Integer> wordCountsState =
>> wordCounts2.updateStateByKey(
>>  updateFunction,
>>  new
>> HashPartitioner(context.sparkContext().defaultParallelism()),
>> initialRDD);
>>
>> wordCountsState.print();
>>
>> final Accumulator<Integer> rddIndex =
>> context.sparkContext().accumulator(0);
>> wordCountsState.foreachRDD( rdd -> {
>> rddIndex.add(1);
>> final String prefix = "/tmp/" + String.format("%03d",
>> rddIndex.localValue()) + "_words_";
>> try (final PrintStream out = new PrintStream(prefix +
>> UUID.randomUUID())) {
>> rdd.collect().forEach(s -> out.println(s));
>> }
>> return null;
>> });
>>
>> context.start();
>> context.awaitTermination();
>> }
>>
>>
>> On 17 June 2015 at 17:25, Tathagata Das  wrote:
>> > The default behavior should be that batch X + 1 starts processing only
>> > after
>> > batch X completes. If you are using Spark 1.4.0, could you show us a
>> > screenshot of the streaming tab, especially the list of batches? And
>> > could
>> > you also tell us if you are setting any SparkConf configurations?
>> >
>> > On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia 
>> > wrote:
>> >>
>> >> Is it possible to achieve serial batching with Spark Streaming?
>> >>
>> >> Example:
>> >>
>> >> I configure the Streaming Context for creating a batch every 3 seconds.
>> >>
>> >> Processing of the batch #2 takes longer than 3 seconds and creates a
>> >> backlog of batches:
>> >>
>> >> batch #1 takes 2s
>> >> batch #2 takes 10s
>> >> batch #3 takes 2s
>> >> batch #4 takes 2s
>> >>
>> >> When testing locally, it seems that processing of multiple batches is
>> >> finished at the same time:
>> >>
>> >> batch #1 finished at 2s
>> >> batch #2 finished at 12s
>> >> batch #3 finished at 12s (processed in parallel)
>> >> batch #4 finished at 15s
>> >>
>> >> How can I delay processing of the next batch, so that it is processed
>> >> only after processing of the previous batch has been completed?
>> >>
>> >> batch #1 finished at 2s
>> >> batch #2 finished at 12s
>> >> batch #3 finished at 14s (processed serially)
>> >> batch #4 finished at 16s
>> >>
>> >> I want to perform a transformation for every key only once in a given
>> >> period of time (e.g. batch duration). I find all unique keys in a
>> >> batch and perform the transformation on each key. To ensure that the
>> >> transformation is done for every key only once, only one batch can be
>> >> processed at a time. At the same time, I want that single batch to be
>> >> processed in parallel.
>> >>
>> >> context = new JavaStreamingContext(conf, Durations.seconds(10));
>> >> stream = context.receiverStream(...);
>> >> stream
>> >> .reduceByKey(...)
>> >> .transform(...)
>> >> .foreachRDD(output);
>> >>
>> >> Any ideas or pointers are very welcome.
>> >>
>> >> Thanks!
>> >>
>> >> -
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Serial batching with Spark Streaming

2015-06-18 Thread Binh Nguyen Van
llect().forEach(s -> out.println(s));
> }
> return null;
> });
>
> final JavaDStream<String> words =
> lines.flatMap(x -> Arrays.asList(x.split(" ")));
> final JavaPairDStream<String, Integer> pairs =
> words.mapToPair(s -> new Tuple2<>(s, 1));
> final JavaPairDStream<String, Integer> wordCounts =
> pairs.reduceByKey((i1, i2) -> i1 + i2);
>
> final Accumulator<Integer> sleep =
> context.sparkContext().accumulator(0);
> final JavaPairDStream<String, Integer> wordCounts2 =
> JavaPairDStream.fromJavaDStream(
> wordCounts.transform( (rdd) -> {
> sleep.add(1);
> Thread.sleep(sleep.localValue() < 6 ? 2 : 5000);
> return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd), rdd.classTag());
> }));
>
> final Function2<List<Integer>, Optional<Integer>,
> Optional<Integer>> updateFunction =
> (values, state) -> {
> Integer newSum = state.or(0);
> for (final Integer value : values) {
> newSum += value;
> }
> return Optional.of(newSum);
> };
>
> final List<Tuple2<String, Integer>> tuples =
> ImmutableList.<Tuple2<String, Integer>> of();
> final JavaPairRDD<String, Integer> initialRDD =
> context.sparkContext().parallelizePairs(tuples);
>
> final JavaPairDStream<String, Integer> wordCountsState =
> wordCounts2.updateStateByKey(
>  updateFunction,
>  new
> HashPartitioner(context.sparkContext().defaultParallelism()),
> initialRDD);
>
> wordCountsState.print();
>
> final Accumulator<Integer> rddIndex =
> context.sparkContext().accumulator(0);
> wordCountsState.foreachRDD( rdd -> {
> rddIndex.add(1);
> final String prefix = "/tmp/" + String.format("%03d",
> rddIndex.localValue()) + "_words_";
> try (final PrintStream out = new PrintStream(prefix +
> UUID.randomUUID())) {
> rdd.collect().forEach(s -> out.println(s));
>     }
>     return null;
> });
>
> context.start();
> context.awaitTermination();
> }
>
>
> On 17 June 2015 at 17:25, Tathagata Das  wrote:
> > The default behavior should be that batch X + 1 starts processing only
> after
> > batch X completes. If you are using Spark 1.4.0, could you show us a
> > screenshot of the streaming tab, especially the list of batches? And
> could
> > you also tell us if you are setting any SparkConf configurations?
> >
> > On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia 
> wrote:
> >>
> >> Is it possible to achieve serial batching with Spark Streaming?
> >>
> >> Example:
> >>
> >> I configure the Streaming Context for creating a batch every 3 seconds.
> >>
> >> Processing of the batch #2 takes longer than 3 seconds and creates a
> >> backlog of batches:
> >>
> >> batch #1 takes 2s
> >> batch #2 takes 10s
> >> batch #3 takes 2s
> >> batch #4 takes 2s
> >>
> >> When testing locally, it seems that processing of multiple batches is
> >> finished at the same time:
> >>
> >> batch #1 finished at 2s
> >> batch #2 finished at 12s
> >> batch #3 finished at 12s (processed in parallel)
> >> batch #4 finished at 15s
> >>
> >> How can I delay processing of the next batch, so that it is processed
> >> only after processing of the previous batch has been completed?
> >>
> >> batch #1 finished at 2s
> >> batch #2 finished at 12s
> >> batch #3 finished at 14s (processed serially)
> >> batch #4 finished at 16s
> >>
> >> I want to perform a transformation for every key only once in a given
> >> period of time (e.g. batch duration). I find all unique keys in a
> >> batch and perform the transformation on each key. To ensure that the
> >> transformation is done for every key only once, only one batch can be
> >> processed at a time. At the same time, I want that single batch to be
> >> processed in parallel.
> >>
> >> context = new JavaStreamingContext(conf, Durations.seconds(10));
> >> stream = context.receiverStream(...);
> >> stream
> >> .reduceByKey(...)
> >> .transform(...)
> >> .foreachRDD(output);
> >>
> >> Any ideas or pointers are very welcome.
> >>
> >> Thanks!
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Serial batching with Spark Streaming

2015-06-18 Thread Michal Čizmazia
 batches? And could
> you also tell us if you are setting any SparkConf configurations?
>
> On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia  wrote:
>>
>> Is it possible to achieve serial batching with Spark Streaming?
>>
>> Example:
>>
>> I configure the Streaming Context for creating a batch every 3 seconds.
>>
>> Processing of the batch #2 takes longer than 3 seconds and creates a
>> backlog of batches:
>>
>> batch #1 takes 2s
>> batch #2 takes 10s
>> batch #3 takes 2s
>> batch #4 takes 2s
>>
>> When testing locally, it seems that processing of multiple batches is
>> finished at the same time:
>>
>> batch #1 finished at 2s
>> batch #2 finished at 12s
>> batch #3 finished at 12s (processed in parallel)
>> batch #4 finished at 15s
>>
>> How can I delay processing of the next batch, so that it is processed
>> only after processing of the previous batch has been completed?
>>
>> batch #1 finished at 2s
>> batch #2 finished at 12s
>> batch #3 finished at 14s (processed serially)
>> batch #4 finished at 16s
>>
>> I want to perform a transformation for every key only once in a given
>> period of time (e.g. batch duration). I find all unique keys in a
>> batch and perform the transformation on each key. To ensure that the
>> transformation is done for every key only once, only one batch can be
>> processed at a time. At the same time, I want that single batch to be
>> processed in parallel.
>>
>> context = new JavaStreamingContext(conf, Durations.seconds(10));
>> stream = context.receiverStream(...);
>> stream
>> .reduceByKey(...)
>> .transform(...)
>> .foreachRDD(output);
>>
>> Any ideas or pointers are very welcome.
>>
>> Thanks!
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Serial batching with Spark Streaming

2015-06-17 Thread Tathagata Das
The default behavior should be that batch X + 1 starts processing only
after batch X completes. If you are using Spark 1.4.0, could you show us a
screenshot of the streaming tab, especially the list of batches? And could
you also tell us if you are setting any SparkConf configurations?
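
For reference, the serial behaviour described above corresponds to the internal
spark.streaming.concurrentJobs setting, which defaults to 1. It is an
undocumented property, so the sketch below is an assumption rather than a
supported knob:

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // With the default of one concurrent job, the jobs for batch X + 1 do not
    // start until the jobs for batch X have finished.
    SparkConf conf = new SparkConf()
        .setMaster("local[4]")
        .setAppName("SerialBatching");
    // Assumption: "spark.streaming.concurrentJobs" is internal/undocumented.
    // Leaving it at its default of 1 keeps batches serial; raising it allows
    // jobs from different batches to overlap.
    // conf.set("spark.streaming.concurrentJobs", "1");
    JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(3));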

On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia  wrote:

> Is it possible to achieve serial batching with Spark Streaming?
>
> Example:
>
> I configure the Streaming Context for creating a batch every 3 seconds.
>
> Processing of the batch #2 takes longer than 3 seconds and creates a
> backlog of batches:
>
> batch #1 takes 2s
> batch #2 takes 10s
> batch #3 takes 2s
> batch #4 takes 2s
>
> When testing locally, it seems that processing of multiple batches is
> finished at the same time:
>
> batch #1 finished at 2s
> batch #2 finished at 12s
> batch #3 finished at 12s (processed in parallel)
> batch #4 finished at 15s
>
> How can I delay processing of the next batch, so that it is processed
> only after processing of the previous batch has been completed?
>
> batch #1 finished at 2s
> batch #2 finished at 12s
> batch #3 finished at 14s (processed serially)
> batch #4 finished at 16s
>
> I want to perform a transformation for every key only once in a given
> period of time (e.g. batch duration). I find all unique keys in a
> batch and perform the transformation on each key. To ensure that the
> transformation is done for every key only once, only one batch can be
> processed at a time. At the same time, I want that single batch to be
> processed in parallel.
>
> context = new JavaStreamingContext(conf, Durations.seconds(10));
> stream = context.receiverStream(...);
> stream
> .reduceByKey(...)
> .transform(...)
> .foreachRDD(output);
>
> Any ideas or pointers are very welcome.
>
> Thanks!
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Serial batching with Spark Streaming

2015-06-17 Thread Michal Čizmazia
Is it possible to achieve serial batching with Spark Streaming?

Example:

I configure the Streaming Context for creating a batch every 3 seconds.

Processing of the batch #2 takes longer than 3 seconds and creates a
backlog of batches:

batch #1 takes 2s
batch #2 takes 10s
batch #3 takes 2s
batch #4 takes 2s

When testing locally, it seems that processing of multiple batches is
finished at the same time:

batch #1 finished at 2s
batch #2 finished at 12s
batch #3 finished at 12s (processed in parallel)
batch #4 finished at 15s

How can I delay processing of the next batch, so that it is processed
only after processing of the previous batch has been completed?

batch #1 finished at 2s
batch #2 finished at 12s
batch #3 finished at 14s (processed serially)
batch #4 finished at 16s

I want to perform a transformation for every key only once in a given
period of time (e.g. batch duration). I find all unique keys in a
batch and perform the transformation on each key. To ensure that the
transformation is done for every key only once, only one batch can be
processed at a time. At the same time, I want that single batch to be
processed in parallel.

context = new JavaStreamingContext(conf, Durations.seconds(10));
stream = context.receiverStream(...);
stream
.reduceByKey(...)
.transform(...)
.foreachRDD(output);
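
For illustration, a self-contained version of this pipeline could look as
follows. This is a sketch only: String keys and Integer counts are assumed,
the socket source stands in for the real receiver, and the transform and
output bodies are placeholders:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;

    SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SerialBatching");
    JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10));

    // Socket source used here only as a stand-in for the real receiver.
    JavaReceiverInputDStream<String> stream = context.socketTextStream("localhost", 9999);

    JavaPairDStream<String, Integer> perKey = stream
        .mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((a, b) -> a + b);      // one record per unique key per batch

    // Placeholder for the per-key transformation; identity here.
    JavaPairDStream<String, Integer> transformed = perKey.transformToPair(rdd -> rdd);

    transformed.foreachRDD((JavaPairRDD<String, Integer> rdd) -> {
        rdd.collect().forEach(System.out::println); // placeholder output action
        return null;                                // Spark 1.x Java API expects Function<..., Void>
    });

    context.start();
    context.awaitTermination();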

Any ideas or pointers are very welcome.

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org