Re: Serial batching with Spark Streaming
Thank you very much for the confirmation.

On 20 June 2015 at 17:21, Tathagata Das wrote:
> No, it does not. By default, batch X+1 is started only after all the
> retries etc. related to batch X are done.
>
> Yes, one RDD per batch per DStream. However, the RDD could be a union of
> multiple RDDs (e.g. RDDs generated by a windowed DStream, or a unioned
> DStream).
>
> TD
Re: Serial batching with Spark Streaming
No, it does not. By default, batch X+1 is started only after all the retries etc. related to batch X are done.

Yes, one RDD per batch per DStream. However, the RDD could be a union of multiple RDDs (e.g. RDDs generated by a windowed DStream, or a unioned DStream).

TD

On Fri, Jun 19, 2015 at 3:16 PM, Michal Čizmazia wrote:
> Thanks Tathagata!
>
> I will use *foreachRDD*/*foreachPartition*() instead of *transform*() then.
>
> Does the default scheduler initiate the execution of *batch X+1* after
> *batch X* even if tasks for *batch X* need to be *retried due to
> failures*? If not, could you please suggest workarounds and point me to
> the code?
>
> One more thing was not 100% clear to me from the documentation: is there
> exactly *1 RDD* published *per batch interval* in a DStream?
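To make the "union of multiple RDDs" case concrete, here is a small plain-Java sketch (no Spark dependency; the class and method names are hypothetical) of which parent batch times end up in the single RDD that a windowed DStream emits at batch time t. It assumes the window length is a multiple of the batch interval, which Spark requires, and that the window covers the interval (t - windowLength, t].

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spark: lists the parent batch times whose RDDs
// are unioned into the one RDD a windowed DStream produces at batch time t.
final class WindowBatches {

    static List<Long> parentBatchTimes(long t, long windowLength, long batchInterval) {
        // Spark rejects windows that are not a multiple of the parent's slide duration.
        if (windowLength % batchInterval != 0) {
            throw new IllegalArgumentException("window length must be a multiple of the batch interval");
        }
        List<Long> times = new ArrayList<>();
        // The window covers (t - windowLength, t], i.e. batches t - windowLength + B, ..., t.
        for (long time = t - windowLength + batchInterval; time <= t; time += batchInterval) {
            times.add(time);
        }
        return times;
    }

    public static void main(String[] args) {
        // 10 s batches, 30 s window: the RDD at t = 60 s unions the batches at 40, 50 and 60 s.
        System.out.println(parentBatchTimes(60, 30, 10)); // [40, 50, 60]
    }
}
```

A window equal to the batch interval degenerates to a single parent RDD, which is the ordinary one-RDD-per-batch case.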
Re: Serial batching with Spark Streaming
Thanks Tathagata!

I will use *foreachRDD*/*foreachPartition*() instead of *transform*() then.

Does the default scheduler initiate the execution of *batch X+1* after *batch X* even if tasks for *batch X* need to be *retried due to failures*? If not, could you please suggest workarounds and point me to the code?

One more thing was not 100% clear to me from the documentation: is there exactly *1 RDD* published *per batch interval* in a DStream?

On 19 June 2015 at 16:58, Tathagata Das wrote:
> I see what the problem is. You are adding a sleep in the transform
> operation. The transform function is called at the time of preparing the
> Spark jobs for a batch. It should not run any time-consuming operation
> like an RDD action or a sleep. Since this operation needs to run every
> batch interval, a blocking, long-running operation in it interferes with
> that.
>
> I will try to make this clearer in the guide. I had not seen anyone do
> something like this before, and therefore it did not occur to me that this
> could happen. As long as you don't do time-consuming blocking operations
> in the transform function, the batches will be generated, scheduled and
> executed in serial order by default.
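Tathagata's answer in this thread is that, by default, batch X+1 starts only after batch X, retries included, has finished. One rough way to picture this is a job executor with a single thread, which mirrors the default of one concurrent streaming job (the `spark.streaming.concurrentJobs` setting, which defaults to 1). The sketch is plain Java, not Spark code; the failure injection in batch 2 is hypothetical and only stands in for Spark's task retries.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model, not Spark code: batch jobs are executed by a single-threaded executor,
// and retries happen inside a job, so batch X+1 cannot start until batch X
// (including its retries) has finished.
final class SerialBatchModel {

    /** Runs the given number of batches; batch 2 fails failuresInBatch2 times before succeeding. */
    static List<String> run(int batches, int failuresInBatch2) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService jobExecutor = Executors.newSingleThreadExecutor(); // one concurrent job
        for (int batch = 1; batch <= batches; batch++) {
            final int b = batch;
            final int failures = (b == 2) ? failuresInBatch2 : 0; // hypothetical failure injection
            jobExecutor.submit(() -> {
                // Retry until an attempt succeeds; the executor thread stays busy meanwhile.
                for (int attempt = 1; ; attempt++) {
                    log.add("batch " + b + " attempt " + attempt);
                    if (attempt > failures) {
                        break; // this attempt succeeded
                    }
                }
                log.add("batch " + b + " done");
            });
        }
        jobExecutor.shutdown();
        try {
            jobExecutor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run(3, 2).forEach(System.out::println);
    }
}
```

With two injected failures, all three attempts of batch 2 appear before any line for batch 3. Raising the thread count in the model is the analogue of raising `spark.streaming.concurrentJobs`, at which point this ordering guarantee would no longer hold.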
Re: Serial batching with Spark Streaming
I see what the problem is. You are adding a sleep in the transform operation. The transform function is called at the time of preparing the Spark jobs for a batch. It should not run any time-consuming operation like an RDD action or a sleep. Since this operation needs to run every batch interval, a blocking, long-running operation in it interferes with that.

I will try to make this clearer in the guide. I had not seen anyone do something like this before, and therefore it did not occur to me that this could happen. As long as you don't do time-consuming blocking operations in the transform function, the batches will be generated, scheduled and executed in serial order by default.

On Fri, Jun 19, 2015 at 11:33 AM, Michal Čizmazia wrote:
> Binh, thank you very much for your comment and code. Could you please
> outline an example use of your stream? I am a newbie to Spark. Thanks again!
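Tathagata's explanation also fits the timestamps in Michal's listing: the transform function runs while the jobs for a batch are being generated, so its sleep holds up job generation (and the generation of later batches) rather than job execution, which is consistent with the processing times in the Streaming tab staying short. The toy model assumes, as a simplification, that a generation request arrives every batch interval and requests are handled one at a time, so generation for a batch cannot start before the previous one has finished. Names are hypothetical; this is not Spark code.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Spark code: generation requests arrive every batch interval but are
// processed sequentially, and generating batch k blocks for blockingSec[k] seconds
// (standing in for a sleep inside transform()).
final class GenerationDelayModel {

    /** Returns, for each batch k (batch time = (k + 1) * interval), when its job generation finishes. */
    static List<Long> generationFinishTimes(long batchIntervalSec, long[] blockingSec) {
        List<Long> finishTimes = new ArrayList<>();
        long previousFinish = 0;
        for (int k = 0; k < blockingSec.length; k++) {
            long batchTime = (k + 1) * batchIntervalSec;
            // Generation starts at the batch time, or later if the previous one is still blocked.
            long start = Math.max(batchTime, previousFinish);
            previousFinish = start + blockingSec[k];
            finishTimes.add(previousFinish); // the jobs for batch k are only submitted now
        }
        return finishTimes;
    }

    public static void main(String[] args) {
        long[] blocking = new long[18];
        for (int k = 0; k < blocking.length; k++) {
            blocking[k] = k < 5 ? 20 : 5; // 20 s for the first five batches, then 5 s
        }
        List<Long> finish = generationFinishTimes(10, blocking);
        for (int k = 1; k < finish.size(); k++) {
            System.out.println("gap before batch " + (k + 1) + ": "
                + (finish.get(k) - finish.get(k - 1)) + " s");
        }
    }
}
```

With a 10 s interval, the model prints four 20 s gaps, then 5 s gaps while the backlog drains, then 10 s gaps once generation has caught up. That is qualitatively the pattern in the `_lines_` file timestamps, although the model ignores the actual job run time entirely.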
Re: Serial batching with Spark Streaming
Binh, thank you very much for your comment and code. Could you please outline an example use of your stream? I am a newbie to Spark. Thanks again!

On 18 June 2015 at 14:29, Binh Nguyen Van wrote:
> I haven't tried with 1.4, but I tried with 1.3 a while ago and could not
> get the serialized behavior with the default scheduler when there were
> failures and retries, so I created a customized stream like this.
> [...]
Re: Serial batching with Spark Streaming
Tathagata,

Could you please confirm that batches are not processed in parallel during retries in Spark 1.4? See Binh's email copied below. Any pointers to workarounds, if necessary?

Thanks!

On 18 June 2015 at 14:29, Binh Nguyen Van wrote:
> I haven't tried with 1.4, but I tried with 1.3 a while ago and could not
> get the serialized behavior with the default scheduler when there were
> failures and retries, so I created a customized stream like this.
>
> class EachSeqRDD[T: ClassTag] (
>     parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
>   ) extends DStream[Unit](parent.ssc) {
>
>   override def slideDuration: Duration = parent.slideDuration
>
>   override def dependencies: List[DStream[_]] = List(parent)
>
>   override def compute(validTime: Time): Option[RDD[Unit]] = None
>
>   override private[streaming] def generateJob(time: Time): Option[Job] = {
>     val pendingJobs = ssc.scheduler.getPendingTimes().size
>     logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
>     // do not generate new RDD if there is pending job
>     if (pendingJobs == 0) {
>       parent.getOrCompute(time) match {
>         case Some(rdd) => {
>           val jobFunc = () => {
>             ssc.sparkContext.setCallSite(creationSite)
>             eachSeqFunc(rdd, time)
>           }
>           Some(new Job(time, jobFunc))
>         }
>         case None => None
>       }
>     }
>     else {
>       None
>     }
>   }
> }
>
> object DStreamEx {
>   implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
>     def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
>       // because the DStream is reachable from the outer object here, and because
>       // DStreams can't be serialized with closures, we can't proactively check
>       // it for serializability and so we pass the optional false to SparkContext.clean
>       new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
>     }
>   }
> }
>
> -Binh
Re: Serial batching with Spark Streaming
I haven't tried with 1.4, but I tried with 1.3 a while ago and could not get the serialized behavior with the default scheduler when there were failures and retries, so I created a customized stream like this:

// Imports needed by this snippet. Job, ssc.scheduler, generateJob and register()
// are private[streaming], so this code presumably has to be compiled in a package
// under org.apache.spark.streaming.
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.scheduler.Job

class EachSeqRDD[T: ClassTag] (
    parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
  ) extends DStream[Unit](parent.ssc) {

  override def slideDuration: Duration = parent.slideDuration

  override def dependencies: List[DStream[_]] = List(parent)

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override private[streaming] def generateJob(time: Time): Option[Job] = {
    val pendingJobs = ssc.scheduler.getPendingTimes().size
    logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
    // do not generate new RDD if there is pending job
    if (pendingJobs == 0) {
      parent.getOrCompute(time) match {
        case Some(rdd) => {
          val jobFunc = () => {
            ssc.sparkContext.setCallSite(creationSite)
            eachSeqFunc(rdd, time)
          }
          Some(new Job(time, jobFunc))
        }
        case None => None
      }
    }
    else {
      None
    }
  }
}

object DStreamEx {
  implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
    def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
      // because the DStream is reachable from the outer object here, and because
      // DStreams can't be serialized with closures, we can't proactively check
      // it for serializability and so we pass the optional false to SparkContext.clean
      new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
    }
  }
}

-Binh

On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia wrote:
> Tathagata, thanks for your response. You are right! Everything seems
> to work as expected.
> [...]
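Binh's `generateJob` override returns `None` whenever `getPendingTimes()` is non-empty, so a batch time that arrives while an earlier job is still pending gets no job from this output stream at all: it is skipped rather than queued. The toy model shows that effect in plain Java (hypothetical names, not Spark code). Whether the data of a skipped interval is lost in a real application depends on the input stream, and is an assumption worth checking before relying on this approach.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Spark code, of the rule in EachSeqRDD.generateJob: a job is created
// for a batch time only if no earlier job is still pending; otherwise that batch time is skipped.
final class SkipWhilePendingModel {

    /**
     * @param batchIntervalSec interval between batch times (batch k has time (k + 1) * interval)
     * @param jobDurationSec   how long the job for batch k would run if it were created
     * @return the batch times for which a job was actually created
     */
    static List<Long> processedBatchTimes(long batchIntervalSec, long[] jobDurationSec) {
        List<Long> processed = new ArrayList<>();
        long busyUntil = 0; // when the most recently created job finishes
        for (int k = 0; k < jobDurationSec.length; k++) {
            long batchTime = (k + 1) * batchIntervalSec;
            if (batchTime >= busyUntil) {
                // No pending job: generate and run a job for this batch.
                processed.add(batchTime);
                busyUntil = batchTime + jobDurationSec[k];
            }
            // Otherwise generateJob returns None and this batch time gets no job.
        }
        return processed;
    }

    public static void main(String[] args) {
        // 10 s batches; the first job takes 25 s (e.g. because of retries), the rest 5 s.
        System.out.println(processedBatchTimes(10, new long[] {25, 5, 5, 5, 5})); // [10, 40, 50]
    }
}
```

The batches at 20 s and 30 s never reach `eachSeqFunc`. This is the trade-off of the approach: strict serialization is bought by dropping batch times, whereas the default scheduler, per Tathagata's reply, queues them.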
Re: Serial batching with Spark Streaming
Tathagata, thanks for your response. You are right! Everything seems to work as expected.

Could you please help me understand why the processing time of all jobs for a batch is always less than 4 seconds?

Please see my playground code below.

The last-modified times of the input (lines) RDD dump files seem to match the Thread.sleep delays in the transform operation (20s or 5s) or the batch interval (10s): 20s, 5s, 10s.

However, neither the batch processing time in the Streaming tab nor the last-modified times of the output (words) RDD dump files reflect the Thread.sleep delays.

07:20   3240  001_lines_...
07:21    117  001_words_...
07:41  37224  002_lines_...
07:43    252  002_words_...
08:00  37728  003_lines_...
08:02    504  003_words_...
08:20  38952  004_lines_...
08:22    756  004_words_...
08:40  38664  005_lines_...
08:42    999  005_words_...
08:45  38160  006_lines_...
08:47   1134  006_words_...
08:50   9720  007_lines_...
08:51   1260  007_words_...
08:55   9864  008_lines_...
08:56   1260  008_words_...
09:00  10656  009_lines_...
09:01   1395  009_words_...
09:05  11664  010_lines_...
09:06   1395  010_words_...
09:11  10935  011_lines_...
09:11   1521  011_words_...
09:16  11745  012_lines_...
09:16   1530  012_words_...
09:21  12069  013_lines_...
09:22   1656  013_words_...
09:27  10692  014_lines_...
09:27   1665  014_words_...
09:32  10449  015_lines_...
09:32   1791  015_words_...
09:37  11178  016_lines_...
09:37   1800  016_words_...
09:45  17496  017_lines_...
09:45   1926  017_words_...
09:55  22032  018_lines_...
09:56   2061  018_words_...
10:05  21951  019_lines_...
10:06   2196  019_words_...
10:15  21870  020_lines_...
10:16   2322  020_words_...
10:25  21303  021_lines_...
10:26   2340  021_words_...
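    // Spark 1.4 / Java 8. GeneratorReceiver is a custom Receiver<String> not shown here.
    import java.io.PrintStream;
    import java.util.Arrays;
    import java.util.List;
    import java.util.UUID;

    import org.apache.spark.Accumulator;
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    import com.google.common.base.Optional;
    import com.google.common.collect.ImmutableList;

    import scala.Tuple2;

    public final class WordCount {
        public static void main(final String[] args) throws Exception {
            final SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("WordCount");
            try (final JavaStreamingContext context =
                    new JavaStreamingContext(conf, Durations.seconds(10))) {

                context.checkpoint("/tmp/checkpoint");

                // Union of three receiver streams.
                final JavaDStream<String> lines = context.union(
                        context.receiverStream(new GeneratorReceiver()),
                        ImmutableList.of(
                                context.receiverStream(new GeneratorReceiver()),
                                context.receiverStream(new GeneratorReceiver())));

                lines.print();

                // Dump every input (lines) RDD to /tmp/NNN_lines_<uuid>.
                final Accumulator<Integer> lineRddIndex = context.sparkContext().accumulator(0);
                lines.foreachRDD(rdd -> {
                    lineRddIndex.add(1);
                    final String prefix = "/tmp/" + String.format("%03d", lineRddIndex.localValue()) + "_lines_";
                    try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
                        rdd.collect().forEach(s -> out.println(s));
                    }
                    return null;
                });

                final JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")));
                final JavaPairDStream<String, Integer> pairs =
                        words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
                final JavaPairDStream<String, Integer> wordCounts =
                        pairs.reduceByKey((i1, i2) -> i1 + i2);

                // Blocking sleep inside transform(): 20s for the first 5 batches, 5s afterwards.
                final Accumulator<Integer> sleep = context.sparkContext().accumulator(0);
                final JavaPairDStream<String, Integer> wordCounts2 = JavaPairDStream.fromJavaDStream(
                        wordCounts.transform(rdd -> {
                            sleep.add(1);
                            Thread.sleep(sleep.localValue() < 6 ? 20000 : 5000);
                            return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd), rdd.classTag());
                        }));

                // Running word counts across batches.
                final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
                        (values, state) -> {
                            Integer newSum = state.or(0);
                            for (final Integer value : values) {
                                newSum += value;
                            }
                            return Optional.of(newSum);
                        };
                final List<Tuple2<String, Integer>> tuples = ImmutableList.<Tuple2<String, Integer>>of();
                final JavaPairRDD<String, Integer> initialRDD =
                        context.sparkContext().parallelizePairs(tuples);
                final JavaPairDStream<String, Integer> wordCountsState = wordCounts2.updateStateByKey(
                        updateFunction,
                        new HashPartitioner(context.sparkContext().defaultParallelism()),
                        initialRDD);

                wordCountsState.print();

                // Dump every output (words) RDD to /tmp/NNN_words_<uuid>.
                final Accumulator<Integer> rddIndex = context.sparkContext().accumulator(0);
                wordCountsState.foreachRDD(rdd -> {
                    rddIndex.add(1);
                    final String prefix = "/tmp/" + String.format("%03d", rddIndex.localValue()) + "_words_";
                    try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
                        rdd.collect().forEach(s -> out.println(s));
                    }
                    return null;
                });

                context.start();
                context.awaitTermination();
            }
        }
    }

On 17 June 2015 at 17:25, Tathagata Das wrote:
> The default behavior should be that batch X + 1 starts processing only after
> batch X completes. If you are using Spark 1.4.0, could you show us a
> screenshot of the streaming tab, especially the list of batches? And could
> you also tell us if you are setting any SparkConf configurations?
>
> On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia wrote: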
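Tathagata's explanation in this thread (the transform function is called while the jobs for a batch are being prepared) accounts for the timings above. The sketch below is a toy timeline model in plain Java, not Spark's scheduler code, built on these assumptions: job generation handles one batch at a time and includes the time blocked in transform(); a batch's jobs start only after it has been generated and the previous batch has finished; processing time is measured from the first job's start to the last job's finish, which is roughly how the Streaming tab reports it. The class and method names are hypothetical, and the 1s job duration is an assumed value; only the 10s interval and the 20s/5s sleeps come from the code above.

```java
/**
 * Toy timeline model (hypothetical, not Spark code). Times are in seconds.
 * Generation of a batch's jobs (including any time blocked in transform) happens
 * one batch at a time; the generated jobs then run one batch at a time.
 */
public class BatchTimelineModel {

    /** Per-batch timestamps produced by the model. */
    public static final class Batch {
        public final long batchTime;   // end of the batch interval: (k + 1) * interval
        public final long generatedAt; // job generation (incl. transform) finished
        public final long startedAt;   // first job of the batch starts running
        public final long finishedAt;  // last job of the batch finishes

        Batch(long batchTime, long generatedAt, long startedAt, long finishedAt) {
            this.batchTime = batchTime;
            this.generatedAt = generatedAt;
            this.startedAt = startedAt;
            this.finishedAt = finishedAt;
        }

        /** Processing time as defined in this model: job execution only. */
        public long processingTime() {
            return finishedAt - startedAt;
        }

        /** Time between the end of the batch interval and the start of its jobs. */
        public long schedulingDelay() {
            return startedAt - batchTime;
        }
    }

    /**
     * @param interval     batch interval
     * @param transformSec time blocked inside transform() per batch (during job generation)
     * @param jobSec       time spent running the batch's jobs
     */
    public static Batch[] simulate(long interval, long[] transformSec, long[] jobSec) {
        Batch[] out = new Batch[transformSec.length];
        long generatorFreeAt = 0; // generation is sequential: one batch at a time
        long executorFreeAt = 0;  // jobs are serial: next batch waits for the previous one
        for (int k = 0; k < transformSec.length; k++) {
            long batchTime = (k + 1) * interval;
            long genStart = Math.max(batchTime, generatorFreeAt);
            long generatedAt = genStart + transformSec[k];
            generatorFreeAt = generatedAt;
            long start = Math.max(generatedAt, executorFreeAt);
            long finish = start + jobSec[k];
            executorFreeAt = finish;
            out[k] = new Batch(batchTime, generatedAt, start, finish);
        }
        return out;
    }

    public static void main(String[] args) {
        int n = 20;
        long[] transform = new long[n];
        long[] job = new long[n];
        for (int k = 0; k < n; k++) {
            transform[k] = k < 5 ? 20 : 5; // mirrors sleep(localValue() < 6 ? 20000 : 5000)
            job[k] = 1;                     // assumption: the jobs themselves are short
        }
        Batch[] batches = simulate(10, transform, job);
        for (int k = 0; k < n; k++) {
            Batch b = batches[k];
            System.out.printf("batch %02d finished at %4ds, processing %ds, scheduling delay %3ds%n",
                    k + 1, b.finishedAt, b.processingTime(), b.schedulingDelay());
        }
    }
}
```

In this model every batch reports a processing time equal to the assumed 1s job time, while finish times are spaced 20s, then 5s, then 10s apart, the same pattern as the lines dump files in the listing; the sleep shows up as scheduling delay instead. Calling `simulate` with zero transform time and a 20s job time moves the delay into the processing time, which is what the model predicts when blocking work is done inside an output operation such as foreachRDD rather than in transform.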
Re: Serial batching with Spark Streaming
The default behavior should be that batch X + 1 starts processing only after batch X completes. If you are using Spark 1.4.0, could you show us a screenshot of the streaming tab, especially the list of batches? And could you also tell us if you are setting any SparkConf configurations?

On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia wrote:
> Is it possible to achieve serial batching with Spark Streaming?
>
> Example:
>
> I configure the streaming context to create a batch every 3 seconds.
>
> Processing of batch #2 takes longer than 3 seconds and creates a
> backlog of batches:
>
> batch #1 takes 2s
> batch #2 takes 10s
> batch #3 takes 2s
> batch #4 takes 2s
>
> When testing locally, it seems that processing of multiple batches
> finishes at the same time:
>
> batch #1 finished at 2s
> batch #2 finished at 12s
> batch #3 finished at 12s (processed in parallel)
> batch #4 finished at 15s
>
> How can I delay processing of the next batch, so that it is processed
> only after processing of the previous batch has completed?
>
> batch #1 finished at 2s
> batch #2 finished at 12s
> batch #3 finished at 14s (processed serially)
> batch #4 finished at 16s
>
> I want to perform a transformation for every key only once in a given
> period of time (e.g. the batch duration). I find all unique keys in a
> batch and perform the transformation on each key. To ensure that the
> transformation is done for every key only once, only one batch can be
> processed at a time. At the same time, I want that single batch to be
> processed in parallel.
>
>     context = new JavaStreamingContext(conf, Durations.seconds(10));
>     stream = context.receiverStream(...);
>     stream
>         .reduceByKey(...)
>         .transform(...)
>         .foreachRDD(output);
>
> Any ideas or pointers are very welcome.
>
> Thanks!
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
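The serial timeline asked for in the quoted question follows a simple recurrence: a batch starts when it is ready and the previous batch has finished, so finish[i] = max(ready[i], finish[i-1]) + duration[i]. The sketch below expresses that recurrence in plain Java; the class and method names are hypothetical, not a Spark API. To reproduce the question's serial numbers it assumes every batch is already queued at time 0, i.e. each one is waiting by the time its predecessor finishes. To our understanding, Spark 1.x runs streaming jobs one at a time by default (the undocumented `spark.streaming.concurrentJobs` setting, default 1), which matches the default behavior described above.

```java
import java.util.Arrays;

/** Hypothetical helper illustrating strictly serial batch processing. */
public class SerialBatches {

    /**
     * Finish time of each batch when batches run strictly one after another:
     * batch i starts at max(readyAt[i], finish of batch i-1) and runs for duration[i].
     */
    public static long[] serialFinishTimes(long[] readyAt, long[] duration) {
        if (readyAt.length != duration.length) {
            throw new IllegalArgumentException("readyAt and duration must have the same length");
        }
        long[] finish = new long[duration.length];
        long previousFinish = 0;
        for (int i = 0; i < duration.length; i++) {
            long start = Math.max(readyAt[i], previousFinish);
            finish[i] = start + duration[i];
            previousFinish = finish[i];
        }
        return finish;
    }

    public static void main(String[] args) {
        long[] duration = {2, 10, 2, 2}; // batch #1..#4 from the question
        long[] readyAt = {0, 0, 0, 0};   // assumption: all batches already queued
        System.out.println(Arrays.toString(serialFinishTimes(readyAt, duration)));
        // prints [2, 12, 14, 16]
    }
}
```

With ready times spaced by a real 3s interval (0, 3, 6, 9) the same recurrence gives 2, 13, 15, 17: batch #2 cannot start before it exists, and every later batch still waits for its predecessor.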