Here is a scenario. Let's assume the stream that needs to be processed is 1 2 3 4 5, and a processing stage should generate the cumulative sums 1 3 6 10 15 for the next stage:

----> 1 2 3 4 5 ---> { stage 1 } ----> 1 3 6 10 15 ----> { stage 2 }

Each stage has two replicas, one active and one backup. The output of the active replica is sent to stage 2, and the output of the backup replica is ignored. Replica 1 (active) and replica 2 (backup) do not process the stream in sync. Replica 1 has processed 1 2 3 and generated 1, 3 and 6, which have been sent to stage 2. Replica 2 has processed 1 2 3 4 and generated 1, 3, 6 and 10, but its output has been ignored because it is the backup.

Now replica 1 crashes, replica 2 immediately becomes active, and its output starts getting pushed to stage 2. Replica 2 consumes input 5 next and generates 15. But 10 was never pushed to stage 2: replica 1 crashed before generating it, and replica 2 became active only after generating 10 (while its output was still being ignored). This is a very simple workload, yet Flux introduced its sync protocol precisely to make sure that replica 2 starts delivering results to stage 2 at exactly the point where replica 1 left off.
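To make the gap concrete, here is a rough simulation of that handoff in plain Scala (no Spark or Flux code involved; the object and value names are purely illustrative). Without a sync protocol, stage 2 ends up seeing 1 3 6 15, and the 10 is silently lost:

object ReplicaGapDemo {
  def main(args: Array[String]): Unit = {
    val input   = List(1, 2, 3, 4, 5)
    val cumSums = input.scanLeft(0)(_ + _).tail          // 1, 3, 6, 10, 15

    // Replica 1 (active) crashes after emitting the sum for input 3.
    val fromReplica1 = cumSums.take(3)                   // 1, 3, 6 -> already sent to stage 2

    // Replica 2 (backup) had already processed 1 2 3 4, but its outputs
    // (1, 3, 6, 10) were ignored. After failover it only forwards what it
    // computes next: the sum for input 5.
    val fromReplica2AfterFailover = cumSums.drop(4)      // 15

    val seenByStage2 = fromReplica1 ++ fromReplica2AfterFailover
    println(seenByStage2.mkString(" "))                  // prints: 1 3 6 15 -- the 10 is missing
  }
}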
TD

On Wed, Feb 19, 2014 at 5:01 AM, dachuan <hdc1...@gmail.com> wrote:

> Thanks for the clarification. Those are enabled by default; I noticed that in the source code of StateDStream and the inverted reduced-window DStream.
>
> I have one further question about the high-level idea. The SOSP 2013 streaming paper criticized existing work such as Flux because it has to use a high-overhead synchronization protocol between replicas. If possible, could you please give me one concrete workload that really needs synchronization between replicas?
>
> I have browsed through the Flux paper and didn't find that concrete workload.
>
> Thanks,
> Dachuan.
>
> On Feb 19, 2014 1:24 AM, "Tathagata Das" <tathagata.das1...@gmail.com> wrote:
>
>> Apologies if that was confusing. I meant that in streaming applications where a few specific DStream operations like updateStateByKey, etc. are used, you have to enable checkpointing. In fact, the system automatically enables checkpointing with some default interval for DStreams that are generated by those operations that need checkpointing.
>>
>> TD
>>
>> On Tue, Feb 18, 2014 at 2:57 PM, dachuan <hdc1...@gmail.com> wrote:
>>
>>> I'm sorry, but what does "its not fundamentally not possible to avoid checkpointing" mean?
>>>
>>> Are you saying that for these two stateful streaming apps, it's possible to avoid checkpointing? How?
>>>
>>> On Tue, Feb 18, 2014 at 5:44 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>
>>>> A3: The basic RDD model is that the dataset is immutable. As new batches of data come in, each batch is treated as an RDD. Then RDD transformations are applied to create new RDDs. When some of these RDDs are checkpointed, they create separate HDFS files. So, yes, the checkpoint files will keep accumulating unless deleted. And they should only be deleted when we know that the RDD the checkpoint file represents is out of scope and not going to be used any more. Specifically for Spark Streaming programs, this deletion of checkpoint files is taken care of automatically, as Spark Streaming tracks which RDDs are not going to be used any more.
>>>>
>>>> A4: If there is an NFS-like file system that is accessible through the same mount point on all the nodes in the cluster running Spark, then you can set a local path on that file system as the checkpoint directory. The checkpoint files will then be written to the NFS file system. Note that this assumes the NFS-like file system is fault-tolerant and the checkpoint files will always be accessible (in case Spark has to recover RDDs from the checkpointed files).
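A minimal sketch of that kind of setup, assuming an NFS volume mounted at the same path on every node (the mount point, input directory, batch interval, and local master are all illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NfsCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("nfs-checkpoint-sketch").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // The same local path must resolve to the shared NFS volume on every node.
    ssc.checkpoint("/mnt/nfs/spark-checkpoints")

    // Trivial computation so the job has something to run; files dropped into
    // the (illustrative) input directory are picked up as they appear.
    val lines = ssc.textFileStream("/mnt/nfs/streaming-input")
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}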
>>>> Spark's RDDs have the power to recompute themselves from the lineage information back to their ancestor RDDs. However, that lineage must be finitely long (otherwise recomputation would be unbounded) and must end at some dataset that is inherently fault-tolerant (like an HDFS file). In streaming, however, the lineage can grow indefinitely (depending on the computation) and is not backed by HDFS files. That's why we HAVE TO
>>>> (i) Replicate the raw input data to make the input dataset fault-tolerant.
>>>> (ii) Periodically checkpoint to limit the length of the lineage and thereby limit the amount of recomputation necessary under faults.
>>>> So for certain kinds of streaming computations (e.g. updateStateByKey, reduceByKeyAndWindow with inverse function, etc.) its not fundamentally not possible to avoid checkpointing.
>>>>
>>>> Hope this helps!
>>>>
>>>> TD
>>>>
>>>> On Tue, Feb 18, 2014 at 10:56 AM, Adrian Mocanu <amoc...@verticalscope.com> wrote:
>>>>
>>>>> Hi Tathagata!
>>>>>
>>>>> Thanks for getting back to me.
>>>>>
>>>>> I have two follow-up questions.
>>>>>
>>>>> Q3: You've explained how data is checkpointed, but never addressed the part about new batches overwriting old batches. Is it true that data is overwritten, or is all the data saved, resulting in one huge blob?
>>>>>
>>>>> Q4: I don't use HDFS, but a Unix file system. Will my stream not be fault-tolerant in this case because there is no replication? Won't Spark, in this case, use the power of its RDDs?
>>>>>
>>>>> Thanks again,
>>>>> A
>>>>>
>>>>> From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
>>>>> Sent: February-14-14 8:15 PM
>>>>> To: user@spark.incubator.apache.org
>>>>> Subject: Re: checkpoint and not running out of disk space
>>>>>
>>>>> Hello Adrian,
>>>>>
>>>>> A1: There is a significant difference between cache and checkpoint. Cache materializes the RDD and keeps it in memory and/or on disk. But the lineage of the RDD (that is, the sequence of operations that generated it) will be remembered, so that if there are node failures and parts of the cached RDDs are lost, they can be regenerated. However, checkpoint saves the RDD to an HDFS file AND actually FORGETS the lineage completely. This allows long lineages to be truncated and the data to be saved reliably in HDFS (which is naturally fault-tolerant through replication).
>>>>>
>>>>> A2: The answer to this is a little involved. There are actually two kinds of checkpointing going on:
>>>>> (i) data checkpointing (i.e. RDD checkpoints), and
>>>>> (ii) metadata checkpointing, which stores the RDD checkpoint file names associated with DStreams to a file in HDFS, among other things.
>>>>>
>>>>> The metadata checkpoint is done every batch.
>>>>>
>>>>> The data checkpointing, that is, the frequency at which the RDDs of a DStream are checkpointed, is configurable with DStream.checkpoint(duration). The duration must be a multiple of the batch interval (or the slide interval for windowed streams). By default, however, the checkpoint interval is MAX(batch interval, 10 seconds). So if the batch interval is more than 10 seconds, checkpointing happens every batch; if it is less than 10 seconds, then every 10 seconds (a multiple of the batch interval that is more than 10 seconds).
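A minimal sketch of how that looks in code: a stateful stream where checkpointing cannot be avoided, with the data-checkpoint interval set explicitly (the source host/port, checkpoint path, 2-second batches, 20-second interval, and local master are all illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object CheckpointIntervalSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("checkpoint-interval-sketch").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(2))      // 2-second batches

    // Required for updateStateByKey: directory for metadata and RDD checkpoints.
    ssc.checkpoint("/tmp/spark-checkpoints")

    val words  = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
    val counts = words.map(w => (w, 1)).updateStateByKey[Int] {
      (newValues: Seq[Int], state: Option[Int]) => Some(newValues.sum + state.getOrElse(0))
    }

    // By default the state RDDs would be checkpointed every MAX(batch interval, 10s) = 10s;
    // override it with a multiple of the batch interval, e.g. every 20 seconds.
    counts.checkpoint(Seconds(20))
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}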
>>>>> TD
>>>>>
>>>>> On Fri, Feb 14, 2014 at 8:47 AM, Adrian Mocanu <amoc...@verticalscope.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Q1: Is .checkpoint() the same as .cache(), except that it writes to disk instead of memory?
>>>>>
>>>>> Q2: When is the checkpointed data rewritten in a streaming context? I assume it is rewritten, otherwise the file the stream is checkpointed to would grow forever. Is it every batch_duration? I.e., for val streamingContext = new StreamingContext(conf, Seconds(10)), the checkpointed data would be rewritten every 10 seconds.
>>>>>
>>>>> Thanks,
>>>>> -Adrian
>>>
>>> --
>>> Dachuan Huang
>>> Cellphone: 614-390-7234
>>> 2015 Neil Avenue
>>> Ohio State University
>>> Columbus, Ohio
>>> U.S.A.
>>> 43210
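To make the difference asked about in Q1 (and answered in A1 above) concrete, a small sketch contrasting cache() and checkpoint() at the plain RDD level (the checkpoint directory and local master are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object CacheVsCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("cache-vs-checkpoint").setMaster("local[2]")
    val sc   = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/spark-checkpoints")   // where checkpoint files are written

    val data = sc.parallelize(1 to 5).map(_ * 2)

    data.cache()        // materialize in memory/disk; the lineage is kept for recovery
    data.checkpoint()   // write to the checkpoint dir and forget the lineage
    data.count()        // an action triggers both the caching and the checkpointing

    // After checkpointing, the debug string shows the lineage cut at the checkpoint file.
    println(data.toDebugString)

    sc.stop()
  }
}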