No, it does not. By default, batch X+1 is started only after everything
related to batch X, including all retries, is done.
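
To make the serial behavior concrete, here is a minimal sketch in plain Java
(no Spark involved). It only models the timing rule described above: a batch
starts once it is ready and the previous batch has finished. The class name,
method name, and the ready times are assumptions made up for illustration;
the durations are the ones from the original question.

```java
import java.util.Arrays;

public class SerialBatchModel {

    /**
     * Returns the finish time of each batch when batches run strictly one
     * after another. readyAt[i] is when batch i becomes available and
     * duration[i] is its total processing time, including any retries.
     */
    public static long[] finishTimes(long[] readyAt, long[] duration) {
        long[] finish = new long[readyAt.length];
        long previousFinish = 0;
        for (int i = 0; i < readyAt.length; i++) {
            // A batch starts once it is ready AND the previous batch is done.
            long start = Math.max(readyAt[i], previousFinish);
            finish[i] = start + duration[i];
            previousFinish = finish[i];
        }
        return finish;
    }

    public static void main(String[] args) {
        long[] readyAt = {0, 3, 6, 9};    // assumed: one batch every 3 s
        long[] duration = {2, 10, 2, 2};  // batch #2 is slow (e.g. retries)
        // prints [2, 13, 15, 17]
        System.out.println(Arrays.toString(finishTimes(readyAt, duration)));
    }
}
```

In this model batch #3 is ready at 6 s but does not start until batch #2
finishes at 13 s, so the slow batch delays the later ones instead of
overlapping with them.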

Yes, one RDD per batch per DStream. However, the RDD could be a union of
multiple RDDs (e.g. RDDs generated by a windowed DStream or a unioned
DStream).

TD

On Fri, Jun 19, 2015 at 3:16 PM, Michal Čizmazia <mici...@gmail.com> wrote:

> Thanks Tathagata!
>
> I will use *foreachRDD*/*foreachPartition*() instead of *transform*() then.
>
> Does the default scheduler initiate the execution of *batch X+1*
> after *batch X* even if tasks for *batch X* need to be *retried
> due to failures*? If not, could you please suggest workarounds and point
> me to the code?
>
> One more thing was not 100% clear to me from the documentation: Is there
> exactly *1 RDD* published *per batch interval* in a DStream?
>
>
>
> On 19 June 2015 at 16:58, Tathagata Das <t...@databricks.com> wrote:
>
>> I see what the problem is. You are adding a sleep in the transform
>> operation. The transform function is called when the Spark jobs for a
>> batch are being prepared. It should not run any time-consuming
>> operation such as an RDD action or a sleep. Since this function needs to run
>> every batch interval, a long-running blocking operation in it interferes
>> with that schedule.
>>
>> I will try to make this clearer in the guide. I had not seen anyone do
>> something like this before, and therefore it did not occur to me that this
>> could happen. As long as you don't do time-consuming blocking operations in
>> the transform function, the batches will be generated, scheduled and
>> executed in serial order by default.
>>
>> On Fri, Jun 19, 2015 at 11:33 AM, Michal Čizmazia <mici...@gmail.com>
>> wrote:
>>
>>> Binh, thank you very much for your comment and code. Please could you
>>> outline an example use of your stream? I am a newbie to Spark. Thanks again!
>>>
>>> On 18 June 2015 at 14:29, Binh Nguyen Van <binhn...@gmail.com> wrote:
>>>
>>>> I haven’t tried with 1.4, but I tried with 1.3 a while ago and could
>>>> not get serialized behavior from the default scheduler when there is a
>>>> failure and retry, so I created a customized stream like this.
>>>>
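>>>> // Note: this uses private[streaming] members (generateJob, getOrCompute,
>>>> // ssc.scheduler), so it has to be compiled inside the
>>>> // org.apache.spark.streaming package.
>>>> import scala.reflect.ClassTag
>>>> import org.apache.spark.rdd.RDD
>>>> import org.apache.spark.streaming.{Duration, Time}
>>>> import org.apache.spark.streaming.dstream.DStream
>>>> import org.apache.spark.streaming.scheduler.Job
>>>>
>>>> class EachSeqRDD[T: ClassTag] (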
>>>>     parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
>>>>   ) extends DStream[Unit](parent.ssc) {
>>>>
>>>>   override def slideDuration: Duration = parent.slideDuration
>>>>
>>>>   override def dependencies: List[DStream[_]] = List(parent)
>>>>
>>>>   override def compute(validTime: Time): Option[RDD[Unit]] = None
>>>>
>>>>   override private[streaming] def generateJob(time: Time): Option[Job] = {
>>>>     val pendingJobs = ssc.scheduler.getPendingTimes().size
>>>>     logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
>>>>     // do not generate new RDD if there is pending job
>>>>     if (pendingJobs == 0) {
>>>>       parent.getOrCompute(time) match {
>>>>         case Some(rdd) => {
>>>>           val jobFunc = () => {
>>>>             ssc.sparkContext.setCallSite(creationSite)
>>>>             eachSeqFunc(rdd, time)
>>>>           }
>>>>           Some(new Job(time, jobFunc))
>>>>         }
>>>>         case None => None
>>>>       }
>>>>     }
>>>>     else {
>>>>       None
>>>>     }
>>>>   }
>>>> }
>>>> object DStreamEx {
>>>>   implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
>>>>     def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
>>>>       // because the DStream is reachable from the outer object here, and because
>>>>       // DStreams can't be serialized with closures, we can't proactively check
>>>>       // it for serializability and so we pass the optional false to SparkContext.clean
>>>>       new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> -Binh
>>>>
>>>> On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia <mici...@gmail.com>
>>>> wrote:
>>>>
>>>>> Tathagata, thanks for your response. You are right! Everything seems
>>>>> to work as expected.
>>>>>
>>>>> Could you please help me understand why the time for processing all
>>>>> jobs for a batch is always less than 4 seconds?
>>>>>
>>>>> Please see my playground code below.
>>>>>
>>>>> The last modified time of the input (lines) RDD dump files seems to
>>>>> match the Thread.sleep delays (20s or 5s) in the transform operation
>>>>> or the batching interval (10s): 20s, 5s, 10s.
>>>>>
>>>>> However, neither the batch processing time in the Streaming tab nor
>>>>> the last modified time of the output (words) RDD dump files reflect
>>>>> the Thread.sleep delays.
>>>>>
>>>>> 07:20       3240  001_lines_...
>>>>>       07:21 117   001_words_...
>>>>> 07:41       37224 002_lines_...
>>>>>       07:43 252   002_words_...
>>>>> 08:00       37728 003_lines_...
>>>>>       08:02 504   003_words_...
>>>>> 08:20       38952 004_lines_...
>>>>>       08:22 756   004_words_...
>>>>> 08:40       38664 005_lines_...
>>>>>       08:42 999   005_words_...
>>>>> 08:45       38160 006_lines_...
>>>>>       08:47 1134  006_words_...
>>>>> 08:50       9720  007_lines_...
>>>>>       08:51 1260  007_words_...
>>>>> 08:55       9864  008_lines_...
>>>>>       08:56 1260  008_words_...
>>>>> 09:00       10656 009_lines_...
>>>>>       09:01 1395  009_words_...
>>>>> 09:05       11664 010_lines_...
>>>>>       09:06 1395  010_words_...
>>>>> 09:11       10935 011_lines_...
>>>>>       09:11 1521  011_words_...
>>>>> 09:16       11745 012_lines_...
>>>>>       09:16 1530  012_words_...
>>>>> 09:21       12069 013_lines_...
>>>>>       09:22 1656  013_words_...
>>>>> 09:27       10692 014_lines_...
>>>>>       09:27 1665  014_words_...
>>>>> 09:32       10449 015_lines_...
>>>>>       09:32 1791  015_words_...
>>>>> 09:37       11178 016_lines_...
>>>>>       09:37 1800  016_words_...
>>>>> 09:45       17496 017_lines_...
>>>>>       09:45 1926  017_words_...
>>>>> 09:55       22032 018_lines_...
>>>>>       09:56 2061  018_words_...
>>>>> 10:05       21951 019_lines_...
>>>>>       10:06 2196  019_words_...
>>>>> 10:15       21870 020_lines_...
>>>>>       10:16 2322  020_words_...
>>>>> 10:25       21303 021_lines_...
>>>>>       10:26 2340  021_words_...
>>>>>
>>>>>
>>>>> final SparkConf conf = new
>>>>> SparkConf().setMaster("local[4]").setAppName("WordCount");
>>>>> try (final JavaStreamingContext context = new
>>>>> JavaStreamingContext(conf, Durations.seconds(10))) {
>>>>>
>>>>>     context.checkpoint("/tmp/checkpoint");
>>>>>
>>>>>     final JavaDStream<String> lines = context.union(
>>>>>         context.receiverStream(new GeneratorReceiver()),
>>>>>         ImmutableList.of(
>>>>>             context.receiverStream(new GeneratorReceiver()),
>>>>>             context.receiverStream(new GeneratorReceiver())));
>>>>>
>>>>>     lines.print();
>>>>>
>>>>>     final Accumulator<Integer> lineRddIndex =
>>>>> context.sparkContext().accumulator(0);
>>>>>     lines.foreachRDD( rdd -> {
>>>>>         lineRddIndex.add(1);
>>>>>         final String prefix = "/tmp/" + String.format("%03d",
>>>>> lineRddIndex.localValue()) + "_lines_";
>>>>>         try (final PrintStream out = new PrintStream(prefix +
>>>>> UUID.randomUUID())) {
>>>>>             rdd.collect().forEach(s -> out.println(s));
>>>>>         }
>>>>>         return null;
>>>>>     });
>>>>>
>>>>>     final JavaDStream<String> words =
>>>>>         lines.flatMap(x -> Arrays.asList(x.split(" ")));
>>>>>     final JavaPairDStream<String, Integer> pairs =
>>>>>         words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
>>>>>     final JavaPairDStream<String, Integer> wordCounts =
>>>>>         pairs.reduceByKey((i1, i2) -> i1 + i2);
>>>>>
>>>>>     final Accumulator<Integer> sleep =
>>>>> context.sparkContext().accumulator(0);
>>>>>     final JavaPairDStream<String, Integer> wordCounts2 =
>>>>> JavaPairDStream.fromJavaDStream(
>>>>>         wordCounts.transform( (rdd) -> {
>>>>>             sleep.add(1);
>>>>>             Thread.sleep(sleep.localValue() < 6 ? 20000 : 5000);
>>>>>             return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd),
>>>>> rdd.classTag());
>>>>>         }));
>>>>>
>>>>>     final Function2<List<Integer>, Optional<Integer>,
>>>>> Optional<Integer>> updateFunction =
>>>>>         (values, state) -> {
>>>>>             Integer newSum = state.or(0);
>>>>>             for (final Integer value : values) {
>>>>>                 newSum += value;
>>>>>             }
>>>>>             return Optional.of(newSum);
>>>>>         };
>>>>>
>>>>>     final List<Tuple2<String, Integer>> tuples =
>>>>> ImmutableList.<Tuple2<String, Integer>> of();
>>>>>     final JavaPairRDD<String, Integer> initialRDD =
>>>>> context.sparkContext().parallelizePairs(tuples);
>>>>>
>>>>>     final JavaPairDStream<String, Integer> wordCountsState =
>>>>>         wordCounts2.updateStateByKey(
>>>>>              updateFunction,
>>>>>              new
>>>>> HashPartitioner(context.sparkContext().defaultParallelism()),
>>>>> initialRDD);
>>>>>
>>>>>     wordCountsState.print();
>>>>>
>>>>>     final Accumulator<Integer> rddIndex =
>>>>> context.sparkContext().accumulator(0);
>>>>>     wordCountsState.foreachRDD( rdd -> {
>>>>>         rddIndex.add(1);
>>>>>         final String prefix = "/tmp/" + String.format("%03d",
>>>>> rddIndex.localValue()) + "_words_";
>>>>>         try (final PrintStream out = new PrintStream(prefix +
>>>>> UUID.randomUUID())) {
>>>>>             rdd.collect().forEach(s -> out.println(s));
>>>>>         }
>>>>>         return null;
>>>>>     });
>>>>>
>>>>>     context.start();
>>>>>     context.awaitTermination();
>>>>> }
>>>>>
>>>>>
>>>>> On 17 June 2015 at 17:25, Tathagata Das <t...@databricks.com> wrote:
>>>>> > The default behavior should be that batch X + 1 starts processing
>>>>> > only after batch X completes. If you are using Spark 1.4.0, could you
>>>>> > show us a screenshot of the streaming tab, especially the list of
>>>>> > batches? And could you also tell us if you are setting any SparkConf
>>>>> > configurations?
>>>>> >
>>>>> > On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia <mici...@gmail.com>
>>>>> > wrote:
>>>>> >>
>>>>> >> Is it possible to achieve serial batching with Spark Streaming?
>>>>> >>
>>>>> >> Example:
>>>>> >>
>>>>> >> I configure the Streaming Context for creating a batch every 3 seconds.
>>>>> >>
>>>>> >> Processing of the batch #2 takes longer than 3 seconds and creates a
>>>>> >> backlog of batches:
>>>>> >>
>>>>> >> batch #1 takes 2s
>>>>> >> batch #2 takes 10s
>>>>> >> batch #3 takes 2s
>>>>> >> batch #4 takes 2s
>>>>> >>
>>>>> >> When testing locally, it seems that processing of multiple batches is
>>>>> >> finished at the same time:
>>>>> >>
>>>>> >> batch #1 finished at 2s
>>>>> >> batch #2 finished at 12s
>>>>> >> batch #3 finished at 12s (processed in parallel)
>>>>> >> batch #4 finished at 15s
>>>>> >>
>>>>> >> How can I delay processing of the next batch, so that it is processed
>>>>> >> only after processing of the previous batch has been completed?
>>>>> >>
>>>>> >> batch #1 finished at 2s
>>>>> >> batch #2 finished at 12s
>>>>> >> batch #3 finished at 14s (processed serially)
>>>>> >> batch #4 finished at 16s
>>>>> >>
>>>>> >> I want to perform a transformation for every key only once in a given
>>>>> >> period of time (e.g. batch duration). I find all unique keys in a
>>>>> >> batch and perform the transformation on each key. To ensure that the
>>>>> >> transformation is done for every key only once, only one batch can be
>>>>> >> processed at a time. At the same time, I want that single batch to be
>>>>> >> processed in parallel.
>>>>> >>
>>>>> >> context = new JavaStreamingContext(conf, Durations.seconds(10));
>>>>> >> stream = context.receiverStream(...);
>>>>> >> stream
>>>>> >>     .reduceByKey(...)
>>>>> >>     .transform(...)
>>>>> >>     .foreachRDD(output);
>>>>> >>
>>>>> >> Any ideas or pointers are very welcome.
>>>>> >>
>>>>> >> Thanks!
>>>>> >>
>>>>> >>
>>>>> >> ---------------------------------------------------------------------
>>>>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> >> For additional commands, e-mail: user-h...@spark.apache.org
>>>>> >>
>>>>> >
>>>>>
>>>>>
>>>>
>>>
>>
>
