Thank you very much for the confirmation.

On 20 June 2015 at 17:21, Tathagata Das <t...@databricks.com> wrote:
No, it does not. By default, batch X+1 is started only after everything related to batch X, including any retries, is done.

Yes, there is one RDD per batch per DStream. However, that RDD could be a union of multiple RDDs (e.g. the RDDs generated by a windowed DStream, or a unioned DStream).

TD

On Fri, Jun 19, 2015 at 3:16 PM, Michal Čizmazia <mici...@gmail.com> wrote:

Thanks Tathagata!

I will use foreachRDD()/foreachPartition() instead of transform() then.

Does the default scheduler initiate the execution of batch X+1 after batch X even if tasks for batch X need to be retried due to failures? If not, could you please suggest workarounds and point me to the code?

One more thing was not 100% clear to me from the documentation: is exactly one RDD published per batch interval in a DStream?

On 19 June 2015 at 16:58, Tathagata Das <t...@databricks.com> wrote:

I see what the problem is. You are adding a sleep in the transform operation. The transform function is called at the time of preparing the Spark jobs for a batch; it should not run any time-consuming operation such as an RDD action or a sleep. Since this function has to run every batch interval, a long blocking operation inside it interferes with generating the jobs for each interval.

I will try to make this clearer in the guide. I had not seen anyone do something like this before, so it did not occur to me that this could happen. As long as you don't do time-consuming blocking operations in the transform function, the batches will be generated, scheduled and executed in serial order by default.

On Fri, Jun 19, 2015 at 11:33 AM, Michal Čizmazia <mici...@gmail.com> wrote:

Binh, thank you very much for your comment and code. Could you please outline an example use of your stream? I am a newbie to Spark. Thanks again!

On 18 June 2015 at 14:29, Binh Nguyen Van <binhn...@gmail.com> wrote:

I haven't tried with 1.4, but I tried with 1.3 a while ago and I could not get the serialized behavior with the default scheduler when there is a failure and retry, so I created a customized stream like this:
package org.apache.spark.streaming  // required: the code below uses private[streaming] APIs

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.scheduler.Job

class EachSeqRDD[T: ClassTag](
    parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
  ) extends DStream[Unit](parent.ssc) {

  override def slideDuration: Duration = parent.slideDuration

  override def dependencies: List[DStream[_]] = List(parent)

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override private[streaming] def generateJob(time: Time): Option[Job] = {
    val pendingJobs = ssc.scheduler.getPendingTimes().size
    logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
    // do not generate a new job if there is a pending job
    if (pendingJobs == 0) {
      parent.getOrCompute(time) match {
        case Some(rdd) =>
          val jobFunc = () => {
            ssc.sparkContext.setCallSite(creationSite)
            eachSeqFunc(rdd, time)
          }
          Some(new Job(time, jobFunc))
        case None => None
      }
    } else {
      None
    }
  }
}

object DStreamEx {
  implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
    def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
      // because the DStream is reachable from the outer object here, and because
      // DStreams can't be serialized with closures, we can't proactively check
      // it for serializability and so we pass the optional false to SparkContext.clean
      new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
    }
  }
}

-Binh
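For illustration, a minimal sketch of how the eachSeqRDD helper above might be used. It assumes the code above is compiled into the org.apache.spark.streaming package (hence the import), and uses a socket word count only as a stand-in pipeline; the source, batch interval and printed output are not from this thread.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.DStreamEx._  // brings eachSeqRDD into scope via the implicit class

object EachSeqRDDUsage {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("EachSeqRDDUsage")
    val ssc = new StreamingContext(conf, Seconds(10))

    // hypothetical source; any DStream works here
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // like foreachRDD, but EachSeqRDD skips generating a job for a batch
    // while jobs from earlier batches are still pending
    counts.eachSeqRDD { (rdd, time) =>
      rdd.collect().foreach { case (word, count) => println(s"$time $word: $count") }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}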
On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia <mici...@gmail.com> wrote:

Tathagata, thanks for your response. You are right! Everything seems to work as expected.

Could you please help me understand why the time for processing of all jobs for a batch is always less than 4 seconds?

Please see my playground code below.

The last modified time of the input (lines) RDD dump files seems to match the Thread.sleep delays (20s or 5s) in the transform operation or the batch interval (10s): 20s, 5s, 10s.

However, neither the batch processing time in the Streaming tab nor the last modified time of the output (words) RDD dump files reflects the Thread.sleep delays.

07:20 3240 001_lines_...
07:21 117 001_words_...
07:41 37224 002_lines_...
07:43 252 002_words_...
08:00 37728 003_lines_...
08:02 504 003_words_...
08:20 38952 004_lines_...
08:22 756 004_words_...
08:40 38664 005_lines_...
08:42 999 005_words_...
08:45 38160 006_lines_...
08:47 1134 006_words_...
08:50 9720 007_lines_...
08:51 1260 007_words_...
08:55 9864 008_lines_...
08:56 1260 008_words_...
09:00 10656 009_lines_...
09:01 1395 009_words_...
09:05 11664 010_lines_...
09:06 1395 010_words_...
09:11 10935 011_lines_...
09:11 1521 011_words_...
09:16 11745 012_lines_...
09:16 1530 012_words_...
09:21 12069 013_lines_...
09:22 1656 013_words_...
09:27 10692 014_lines_...
09:27 1665 014_words_...
09:32 10449 015_lines_...
09:32 1791 015_words_...
09:37 11178 016_lines_...
09:37 1800 016_words_...
09:45 17496 017_lines_...
09:45 1926 017_words_...
09:55 22032 018_lines_...
09:56 2061 018_words_...
10:05 21951 019_lines_...
10:06 2196 019_words_...
10:15 21870 020_lines_...
10:16 2322 020_words_...
10:25 21303 021_lines_...
10:26 2340 021_words_...

final SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("WordCount");
try (final JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10))) {

    context.checkpoint("/tmp/checkpoint");

    final JavaDStream<String> lines = context.union(
        context.receiverStream(new GeneratorReceiver()),
        ImmutableList.of(
            context.receiverStream(new GeneratorReceiver()),
            context.receiverStream(new GeneratorReceiver())));

    lines.print();

    final Accumulator<Integer> lineRddIndex = context.sparkContext().accumulator(0);
    lines.foreachRDD(rdd -> {
        lineRddIndex.add(1);
        final String prefix = "/tmp/" + String.format("%03d", lineRddIndex.localValue()) + "_lines_";
        try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
            rdd.collect().forEach(s -> out.println(s));
        }
        return null;
    });

    final JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")));
    final JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
    final JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

    // NOTE: this sleep inside transform() is what delays RDD/job generation (see TD's reply above)
    final Accumulator<Integer> sleep = context.sparkContext().accumulator(0);
    final JavaPairDStream<String, Integer> wordCounts2 = JavaPairDStream.fromJavaDStream(
        wordCounts.transform((rdd) -> {
            sleep.add(1);
            Thread.sleep(sleep.localValue() < 6 ? 20000 : 5000);
            return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd), rdd.classTag());
        }));

    final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
        (values, state) -> {
            Integer newSum = state.or(0);
            for (final Integer value : values) {
                newSum += value;
            }
            return Optional.of(newSum);
        };

    final List<Tuple2<String, Integer>> tuples = ImmutableList.<Tuple2<String, Integer>>of();
    final JavaPairRDD<String, Integer> initialRDD = context.sparkContext().parallelizePairs(tuples);

    final JavaPairDStream<String, Integer> wordCountsState = wordCounts2.updateStateByKey(
        updateFunction,
        new HashPartitioner(context.sparkContext().defaultParallelism()),
        initialRDD);

    wordCountsState.print();

    final Accumulator<Integer> rddIndex = context.sparkContext().accumulator(0);
    wordCountsState.foreachRDD(rdd -> {
        rddIndex.add(1);
        final String prefix = "/tmp/" + String.format("%03d", rddIndex.localValue()) + "_words_";
        try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
            rdd.collect().forEach(s -> out.println(s));
        }
        return null;
    });

    context.start();
    context.awaitTermination();
}
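To restate the advice above in code: a minimal Scala sketch (the object name, sleep values and per-record work are illustrative, not from the thread) of moving the blocking work out of transform(), which runs on the driver while the batch's jobs are being prepared, and into an output operation such as foreachRDD.

import org.apache.spark.streaming.dstream.DStream

object TransformVsForeachRDD {
  def wire(wordCounts: DStream[(String, Int)]): Unit = {
    // DON'T: this sleep would run during job generation, once per batch interval.
    // val delayed = wordCounts.transform { rdd => Thread.sleep(20000); rdd }

    // DO: with the default scheduler, the output operations of batch X+1 start
    // only after those of batch X have finished, so slow work here stays serial
    // across batches while remaining parallel across partitions within a batch.
    wordCounts.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        partition.foreach { case (word, count) =>
          Thread.sleep(10)  // stands in for slow per-record work (e.g. an external write)
        }
      }
    }
  }
}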
On 17 June 2015 at 17:25, Tathagata Das <t...@databricks.com> wrote:

The default behavior should be that batch X+1 starts processing only after batch X completes. If you are using Spark 1.4.0, could you show us a screenshot of the Streaming tab, especially the list of batches? And could you also tell us whether you are setting any SparkConf configurations?

On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia <mici...@gmail.com> wrote:

Is it possible to achieve serial batching with Spark Streaming?

Example:

I configure the Streaming Context to create a batch every 3 seconds.

Processing of batch #2 takes longer than 3 seconds and creates a backlog of batches:

batch #1 takes 2s
batch #2 takes 10s
batch #3 takes 2s
batch #4 takes 2s

When testing locally, it seems that processing of multiple batches finishes at the same time:

batch #1 finished at 2s
batch #2 finished at 12s
batch #3 finished at 12s (processed in parallel)
batch #4 finished at 15s

How can I delay processing of the next batch, so that it is processed only after processing of the previous batch has completed?

batch #1 finished at 2s
batch #2 finished at 12s
batch #3 finished at 14s (processed serially)
batch #4 finished at 16s

I want to perform a transformation for every key only once in a given period of time (e.g. the batch duration). I find all unique keys in a batch and perform the transformation on each key. To ensure that the transformation is done for every key only once, only one batch can be processed at a time. At the same time, I want that single batch to be processed in parallel.

context = new JavaStreamingContext(conf, Durations.seconds(10));
stream = context.receiverStream(...);
stream
    .reduceByKey(...)
    .transform(...)
    .foreachRDD(output);

Any ideas or pointers are very welcome.

Thanks!
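For what it's worth, a sketch in Scala of the once-per-key-per-batch pattern described in the question (processOnce, the socket source and the 3-second interval are illustrative placeholders): reduceByKey leaves one record per unique key in the batch, and doing the per-key work inside foreachRDD keeps batches serial under the default scheduler while the work within a batch still runs in parallel across partitions.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OncePerKeySketch {

  // illustrative placeholder: whatever should happen exactly once per key per batch
  def processOnce(key: String, total: Int): Unit =
    println(s"processing $key -> $total")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("OncePerKeySketch")
    val ssc = new StreamingContext(conf, Seconds(3))

    // hypothetical source; stands in for the receiver stream in the question
    val stream = ssc.socketTextStream("localhost", 9999)

    stream
      .map(line => (line, 1))
      .reduceByKey(_ + _)  // one record per unique key in the batch
      .foreachRDD { rdd =>
        // runs as an output operation: batch X+1 does not start until this finishes
        rdd.foreachPartition { partition =>
          partition.foreach { case (key, total) => processOnce(key, total) }
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }
}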