Thank you very much for the confirmation.

On 20 June 2015 at 17:21, Tathagata Das <t...@databricks.com> wrote:

> No, it does not. By default, batch X+1 will be started only after
> everything related to batch X, including any retries, is done.
>
> Yes, one RDD per batch per DStream. However, that RDD could be a union of
> multiple RDDs (e.g. the RDDs generated by a windowed DStream or a unioned
> DStream).
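>
> For instance, a rough sketch in Scala (purely illustrative; "lines" and
> "otherLines" are assumed DStreams and the batch interval is assumed to be
> 10 seconds):
>
> // one RDD per 10s batch, but that RDD internally unions the RDDs of the
> // batches covered by the 30s window
> val windowed = lines.window(Seconds(30), Seconds(10))
>
> // one RDD per batch, formed by unioning the two parent streams' RDDs
> val merged = lines.union(otherLines)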
>
> TD
>
> On Fri, Jun 19, 2015 at 3:16 PM, Michal Čizmazia <mici...@gmail.com>
> wrote:
>
>> Thanks Tathagata!
>>
>> I will use *foreachRDD*/*foreachPartition*() instead of *transform*()
>> then.
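>>
>> Roughly along these lines, I suppose (a sketch in Scala; "pairs" is assumed
>> to be a DStream of key/value pairs and "processRecord" stands in for the
>> real per-record work):
>>
>> pairs.reduceByKey(_ + _).foreachRDD { rdd =>
>>   rdd.foreachPartition { records =>
>>     // executed as part of the batch's job on the executors, so slow work
>>     // here does not block job generation the way it would in transform()
>>     records.foreach(processRecord)
>>   }
>> }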
>>
>> Does the default scheduler initiate the execution of *batch X+1* after
>> *batch X* even if tasks for *batch X* need to be *retried due to
>> failures*? If not, please could you suggest workarounds and point me to
>> the code?
>>
>> One more thing was not 100% clear to me from the documentation: Is there
>> exactly *1 RDD* published *per batch interval* in a DStream?
>>
>>
>>
>> On 19 June 2015 at 16:58, Tathagata Das <t...@databricks.com> wrote:
>>
>>> I see what the problem is. You are adding a sleep in the transform
>>> operation. The transform function is called at the time of preparing the
>>> Spark jobs for a batch. It should not run any time-consuming operation
>>> like an RDD action or a sleep. Since the transform function needs to run
>>> every batch interval, a blocking, long-running operation inside it
>>> interferes with that schedule.
>>>
>>> I will try to make this clearer in the guide. I had not seen anyone do
>>> something like this before and therefore it did not occur to me that this
>>> could happen. As long as you don't do time-consuming blocking operations in
>>> the transform function, the batches will be generated, scheduled and
>>> executed in serial order by default.
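>>>
>>> In other words, long-running work belongs in an output operation rather
>>> than in transform. A rough sketch in Scala (purely illustrative; the sleep
>>> stands in for any slow per-batch work):
>>>
>>> // Don't: blocking inside transform() stalls job generation every interval
>>> // wordCounts.transform { rdd => Thread.sleep(20000); rdd }
>>>
>>> // Do: blocking inside foreachRDD() runs as part of the batch's job, so
>>> // batches are still generated on time and executed one after another
>>> wordCounts.foreachRDD { rdd =>
>>>   Thread.sleep(20000)
>>>   rdd.collect().foreach(println)  // hypothetical output action
>>> }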
>>>
>>> On Fri, Jun 19, 2015 at 11:33 AM, Michal Čizmazia <mici...@gmail.com>
>>> wrote:
>>>
>>>> Binh, thank you very much for your comment and code. Please could you
>>>> outline an example use of your stream? I am a newbie to Spark. Thanks 
>>>> again!
>>>>
>>>> On 18 June 2015 at 14:29, Binh Nguyen Van <binhn...@gmail.com> wrote:
>>>>
>>>>> I haven’t tried with 1.4, but I tried with 1.3 a while ago and I could
>>>>> not get the serialized behavior with the default scheduler when there
>>>>> were failures and retries, so I created a customized stream like this.
>>>>>
>>>>> class EachSeqRDD[T: ClassTag] (
>>>>>     parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
>>>>>   ) extends DStream[Unit](parent.ssc) {
>>>>>
>>>>>   override def slideDuration: Duration = parent.slideDuration
>>>>>
>>>>>   override def dependencies: List[DStream[_]] = List(parent)
>>>>>
>>>>>   override def compute(validTime: Time): Option[RDD[Unit]] = None
>>>>>
>>>>>   override private[streaming] def generateJob(time: Time): Option[Job] = {
>>>>>     val pendingJobs = ssc.scheduler.getPendingTimes().size
>>>>>     logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
>>>>>     // do not generate new RDD if there is pending job
>>>>>     if (pendingJobs == 0) {
>>>>>       parent.getOrCompute(time) match {
>>>>>         case Some(rdd) => {
>>>>>           val jobFunc = () => {
>>>>>             ssc.sparkContext.setCallSite(creationSite)
>>>>>             eachSeqFunc(rdd, time)
>>>>>           }
>>>>>           Some(new Job(time, jobFunc))
>>>>>         }
>>>>>         case None => None
>>>>>       }
>>>>>     }
>>>>>     else {
>>>>>       None
>>>>>     }
>>>>>   }
>>>>> }
>>>>> object DStreamEx {
>>>>>   implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
>>>>>     def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
>>>>>       // because the DStream is reachable from the outer object here, and because
>>>>>       // DStreams can't be serialized with closures, we can't proactively check
>>>>>       // it for serializability and so we pass the optional false to
>>>>>       // SparkContext.clean
>>>>>       new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
>>>>>     }
>>>>>   }
>>>>> }
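>>>>>
>>>>> Used roughly like this (a sketch; it assumes DStreamEx is imported and
>>>>> "stream" is a DStream of key/value pairs):
>>>>>
>>>>> import DStreamEx._
>>>>>
>>>>> stream
>>>>>   .reduceByKey(_ + _)
>>>>>   .eachSeqRDD { (rdd, time) =>
>>>>>     // a new job is generated only when no earlier batch is still pending,
>>>>>     // so batches are processed strictly one after another
>>>>>     rdd.collect().foreach(println)
>>>>>   }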
>>>>>
>>>>> -Binh
>>>>>
>>>>> On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia <mici...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Tathagata, thanks for your response. You are right! Everything seems
>>>>>> to work as expected.
>>>>>>
>>>>>> Please could you help me understand why the processing time of all
>>>>>> jobs for a batch is always less than 4 seconds?
>>>>>>
>>>>>> Please see my playground code below.
>>>>>>
>>>>>> The last-modified times of the input (lines) RDD dump files seem to
>>>>>> match either the Thread.sleep delays in the transform operation (20s,
>>>>>> then 5s) or the batch interval (10s): the gaps are 20s, then 5s, then 10s.
>>>>>>
>>>>>> However, neither the batch processing time in the Streaming tab nor
>>>>>> the last-modified times of the output (words) RDD dump files reflect
>>>>>> the Thread.sleep delays.
>>>>>>
>>>>>> 07:20       3240  001_lines_...
>>>>>>       07:21 117   001_words_...
>>>>>> 07:41       37224 002_lines_...
>>>>>>       07:43 252   002_words_...
>>>>>> 08:00       37728 003_lines_...
>>>>>>       08:02 504   003_words_...
>>>>>> 08:20       38952 004_lines_...
>>>>>>       08:22 756   004_words_...
>>>>>> 08:40       38664 005_lines_...
>>>>>>       08:42 999   005_words_...
>>>>>> 08:45       38160 006_lines_...
>>>>>>       08:47 1134  006_words_...
>>>>>> 08:50       9720  007_lines_...
>>>>>>       08:51 1260  007_words_...
>>>>>> 08:55       9864  008_lines_...
>>>>>>       08:56 1260  008_words_...
>>>>>> 09:00       10656 009_lines_...
>>>>>>       09:01 1395  009_words_...
>>>>>> 09:05       11664 010_lines_...
>>>>>>       09:06 1395  010_words_...
>>>>>> 09:11       10935 011_lines_...
>>>>>>       09:11 1521  011_words_...
>>>>>> 09:16       11745 012_lines_...
>>>>>>       09:16 1530  012_words_...
>>>>>> 09:21       12069 013_lines_...
>>>>>>       09:22 1656  013_words_...
>>>>>> 09:27       10692 014_lines_...
>>>>>>       09:27 1665  014_words_...
>>>>>> 09:32       10449 015_lines_...
>>>>>>       09:32 1791  015_words_...
>>>>>> 09:37       11178 016_lines_...
>>>>>>       09:37 1800  016_words_...
>>>>>> 09:45       17496 017_lines_...
>>>>>>       09:45 1926  017_words_...
>>>>>> 09:55       22032 018_lines_...
>>>>>>       09:56 2061  018_words_...
>>>>>> 10:05       21951 019_lines_...
>>>>>>       10:06 2196  019_words_...
>>>>>> 10:15       21870 020_lines_...
>>>>>>       10:16 2322  020_words_...
>>>>>> 10:25       21303 021_lines_...
>>>>>>       10:26 2340  021_words_...
>>>>>>
>>>>>>
>>>>>> final SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("WordCount");
>>>>>> try (final JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10))) {
>>>>>>
>>>>>>     context.checkpoint("/tmp/checkpoint");
>>>>>>
>>>>>>     final JavaDStream<String> lines = context.union(
>>>>>>         context.receiverStream(new GeneratorReceiver()),
>>>>>>         ImmutableList.of(
>>>>>>             context.receiverStream(new GeneratorReceiver()),
>>>>>>             context.receiverStream(new GeneratorReceiver())));
>>>>>>
>>>>>>     lines.print();
>>>>>>
>>>>>>     final Accumulator<Integer> lineRddIndex = context.sparkContext().accumulator(0);
>>>>>>     lines.foreachRDD(rdd -> {
>>>>>>         lineRddIndex.add(1);
>>>>>>         final String prefix = "/tmp/" + String.format("%03d", lineRddIndex.localValue()) + "_lines_";
>>>>>>         try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
>>>>>>             rdd.collect().forEach(s -> out.println(s));
>>>>>>         }
>>>>>>         return null;
>>>>>>     });
>>>>>>
>>>>>>     final JavaDStream<String> words =
>>>>>>         lines.flatMap(x -> Arrays.asList(x.split(" ")));
>>>>>>     final JavaPairDStream<String, Integer> pairs =
>>>>>>         words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
>>>>>>     final JavaPairDStream<String, Integer> wordCounts =
>>>>>>         pairs.reduceByKey((i1, i2) -> i1 + i2);
>>>>>>
>>>>>>     final Accumulator<Integer> sleep = context.sparkContext().accumulator(0);
>>>>>>     final JavaPairDStream<String, Integer> wordCounts2 = JavaPairDStream.fromJavaDStream(
>>>>>>         wordCounts.transform(rdd -> {
>>>>>>             sleep.add(1);
>>>>>>             Thread.sleep(sleep.localValue() < 6 ? 20000 : 5000);
>>>>>>             return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd), rdd.classTag());
>>>>>>         }));
>>>>>>
>>>>>>     final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
>>>>>>         (values, state) -> {
>>>>>>             Integer newSum = state.or(0);
>>>>>>             for (final Integer value : values) {
>>>>>>                 newSum += value;
>>>>>>             }
>>>>>>             return Optional.of(newSum);
>>>>>>         };
>>>>>>
>>>>>>     final List<Tuple2<String, Integer>> tuples = ImmutableList.<Tuple2<String, Integer>>of();
>>>>>>     final JavaPairRDD<String, Integer> initialRDD = context.sparkContext().parallelizePairs(tuples);
>>>>>>
>>>>>>     final JavaPairDStream<String, Integer> wordCountsState =
>>>>>>         wordCounts2.updateStateByKey(
>>>>>>             updateFunction,
>>>>>>             new HashPartitioner(context.sparkContext().defaultParallelism()),
>>>>>>             initialRDD);
>>>>>>
>>>>>>     wordCountsState.print();
>>>>>>
>>>>>>     final Accumulator<Integer> rddIndex = context.sparkContext().accumulator(0);
>>>>>>     wordCountsState.foreachRDD(rdd -> {
>>>>>>         rddIndex.add(1);
>>>>>>         final String prefix = "/tmp/" + String.format("%03d", rddIndex.localValue()) + "_words_";
>>>>>>         try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
>>>>>>             rdd.collect().forEach(s -> out.println(s));
>>>>>>         }
>>>>>>         return null;
>>>>>>     });
>>>>>>
>>>>>>     context.start();
>>>>>>     context.awaitTermination();
>>>>>> }
>>>>>>
>>>>>>
>>>>>> On 17 June 2015 at 17:25, Tathagata Das <t...@databricks.com> wrote:
>>>>>> > The default behavior should be that batch X + 1 starts processing only
>>>>>> > after batch X completes. If you are using Spark 1.4.0, could you show us
>>>>>> > a screenshot of the Streaming tab, especially the list of batches? And
>>>>>> > could you also tell us if you are setting any SparkConf configurations?
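>>>>>> >
>>>>>> > As a side note, the number of streaming jobs run at the same time should
>>>>>> > be governed by the spark.streaming.concurrentJobs setting, which defaults
>>>>>> > to 1. A sketch of where it would be set:
>>>>>> >
>>>>>> > import org.apache.spark.SparkConf
>>>>>> >
>>>>>> > val conf = new SparkConf()
>>>>>> >   .setMaster("local[4]")
>>>>>> >   .setAppName("WordCount")
>>>>>> >   .set("spark.streaming.concurrentJobs", "1")  // the default; > 1 lets batch jobs overlap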
>>>>>> >
>>>>>> > On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia <mici...@gmail.com> wrote:
>>>>>> >>
>>>>>> >> Is it possible to achieve serial batching with Spark Streaming?
>>>>>> >>
>>>>>> >> Example:
>>>>>> >>
>>>>>> >> I configure the Streaming Context for creating a batch every 3 seconds.
>>>>>> >>
>>>>>> >> Processing of batch #2 takes longer than 3 seconds and creates a
>>>>>> >> backlog of batches:
>>>>>> >>
>>>>>> >> batch #1 takes 2s
>>>>>> >> batch #2 takes 10s
>>>>>> >> batch #3 takes 2s
>>>>>> >> batch #4 takes 2s
>>>>>> >>
>>>>>> >> When testing locally, it seems that processing of multiple batches is
>>>>>> >> finished at the same time:
>>>>>> >>
>>>>>> >> batch #1 finished at 2s
>>>>>> >> batch #2 finished at 12s
>>>>>> >> batch #3 finished at 12s (processed in parallel)
>>>>>> >> batch #4 finished at 15s
>>>>>> >>
>>>>>> >> How can I delay processing of the next batch, so that it is processed
>>>>>> >> only after processing of the previous batch has been completed?
>>>>>> >>
>>>>>> >> batch #1 finished at 2s
>>>>>> >> batch #2 finished at 12s
>>>>>> >> batch #3 finished at 14s (processed serially)
>>>>>> >> batch #4 finished at 16s
>>>>>> >>
>>>>>> >> I want to perform a transformation for every key only once in a given
>>>>>> >> period of time (e.g. batch duration). I find all unique keys in a
>>>>>> >> batch and perform the transformation on each key. To ensure that the
>>>>>> >> transformation is done for every key only once, only one batch can be
>>>>>> >> processed at a time. At the same time, I want that single batch to be
>>>>>> >> processed in parallel.
>>>>>> >>
>>>>>> >> context = new JavaStreamingContext(conf, Durations.seconds(10));
>>>>>> >> stream = context.receiverStream(...);
>>>>>> >> stream
>>>>>> >>     .reduceByKey(...)
>>>>>> >>     .transform(...)
>>>>>> >>     .foreachRDD(output);
>>>>>> >>
>>>>>> >> Any ideas or pointers are very welcome.
>>>>>> >>
>>>>>> >> Thanks!
>>>>>> >>
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
