Here is a scenario. Let's assume the stream that needs to be processed is 1
2 3 4 5, and a processing stage should generate a cumulative sum 1 3 6 10 15
for the next stage.

----> 1 2 3 4 5 ---> { stage 1 } -----> 1 3 6 10 15 ----> {stage 2}

Each stage has two replicas, one active and one backup. The output of the
active replica is sent to stage 2 and the output of the backup replica is
ignored. Replica 1 (active) and replica 2 (backup) are not processing the
stream in sync. Replica 1 has processed 1 2 3 and generated 1, 3, and 6, which
have been sent to stage 2. Replica 2 has processed 1 2 3 4 and generated 1,
3, 6, and 10, but its output has been ignored because it is the backup. Replica 1
crashes, replica 2 immediately becomes active, and its output starts
getting pushed to stage 2. Replica 2 consumes input 5 next and generates 15.
10 was never pushed to stage 2, because replica 1 crashed before
generating it and replica 2 became active only after generating 10 (and being
ignored).

This is a very simple workload. Flux introduced its sync protocol to make
sure that replica 2 starts sending results to stage 2 at exactly the
point where replica 1 left off.
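To make the failure window concrete, here is a toy Python simulation of the scenario above (all names are made up for illustration; this is a sketch of the failure mode, not Flux's actual protocol):

```python
def cumulative_sums(inputs, start=0):
    """Each replica independently computes the running sum of its inputs."""
    total = start
    for x in inputs:
        total += x
        yield total

def run_failover_scenario():
    stream = [1, 2, 3, 4, 5]
    received_by_stage2 = []

    # Replica 1 (active) processes 1 2 3; its outputs 1, 3, 6 are
    # pushed to stage 2.
    received_by_stage2.extend(cumulative_sums(stream[:3]))

    # Replica 2 (backup) has already processed 1 2 3 4, producing
    # 1, 3, 6, 10 -- all ignored while it is the backup.
    backup_state = list(cumulative_sums(stream[:4]))[-1]  # running sum = 10

    # Replica 1 crashes; replica 2 becomes active. Only its *future*
    # outputs (for input 5 onward) reach stage 2, so 10 is lost.
    received_by_stage2.extend(cumulative_sums(stream[4:], start=backup_state))

    return received_by_stage2

print(run_failover_scenario())  # [1, 3, 6, 15] -- 10 never arrives
```

Without a handoff protocol, stage 2 sees 1 3 6 15; the point of Flux's sync protocol is to guarantee that the takeover resumes at 10.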

TD



On Wed, Feb 19, 2014 at 5:01 AM, dachuan <hdc1...@gmail.com> wrote:

> Thanks for the clarification. Those are enabled by default, I noticed in
> the source code of StateDStream and the inverted ReducedWindowedDStream.
>
> I have one further question about the high-level idea. The SOSP 2013 streaming
> paper attacked the existing work Flux because it has to use a high-overhead
> synchronization protocol between replicas. If possible, could you
> please give me one concrete workload that really needs synchronization
> between replicas?
>
> I have browsed through the Flux paper and didn't find such a concrete
> workload.
>
> Thanks
> Dachuan.
> On Feb 19, 2014 1:24 AM, "Tathagata Das" <tathagata.das1...@gmail.com>
> wrote:
>
>> Apologies if that was confusing. I meant that in streaming applications
>> where a few specific DStream operations like updateStateByKey, etc. are
>> used, you have to enable checkpointing. In fact, the system automatically
>> enables checkpointing with some default interval for DStreams that are
>> generated by those operations that need checkpointing.
>>
>> TD
>>
>>
>> On Tue, Feb 18, 2014 at 2:57 PM, dachuan <hdc1...@gmail.com> wrote:
>>
>>> I'm sorry but what does "its not fundamentally not possible to avoid
>>> checkpointing" mean?
>>>
>>> Are you saying that for these two stateful streaming app, it's possible
>>> to avoid checkpointing? how?
>>>
>>>
>>> On Tue, Feb 18, 2014 at 5:44 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> A3: The basic RDD model is that the dataset is immutable. As new
>>>> batches of data come in, each batch is treated as an RDD. Then RDD
>>>> transformations are applied to create new RDDs. When some of these RDDs are
>>>> checkpointed, they create separate HDFS files. So, yes, the checkpoint
>>>> files will keep accumulating unless deleted. And they should only be
>>>> deleted when we know that the RDD that represents the checkpoint file is
>>>> not in scope and not going to be used any more. Specifically for Spark
>>>> Streaming programs, this deletion of checkpoint files is taken care of
>>>> automatically, as Spark Streaming tracks which RDDs are not going to be used
>>>> any more.
>>>>
>>>> A4: If there is an NFS-like file system that is accessible through the
>>>> same mount point at all the nodes in the cluster running Spark, then you
>>>> can set a local path on that file system as the checkpoint directory. Then
>>>> the checkpoint files will be written to the NFS file system. Note that this
>>>> assumes that the NFS-like filesystem is fault-tolerant and the checkpoint
>>>> files will always be accessible (in case Spark has to recover RDDs from the
>>>> checkpointed files).
>>>>
>>>> Spark's RDDs have the power to recompute based on the lineage
>>>> information to ancestor RDDs. However, that lineage must be finitely long
>>>> (otherwise, it would require infinite recomputation) and must end at some
>>>> dataset that is inherently fault-tolerant (like an HDFS file). In streaming,
>>>> however, the lineage can grow infinitely (depending on the computation), and
>>>> is not backed by HDFS files. That's why we HAVE TO
>>>> (i) Replicate raw input data to make the input dataset fault-tolerant.
>>>> (ii) Periodically checkpoint to limit the length of the lineage and
>>>> thereby limit the amount of recomputation necessary under faults.
>>>> So for certain kinds of streaming computations (e.g. updateStateByKey,
>>>> reduceByKeyAndWindow with inverse function, etc.) its not fundamentally not
>>>> possible to avoid checkpointing.
>>>>
>>>> Hope this helps!
>>>>
>>>> TD
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Feb 18, 2014 at 10:56 AM, Adrian Mocanu <
>>>> amoc...@verticalscope.com> wrote:
>>>>
>>>>>  Hi Tathagata!
>>>>>
>>>>> Thanks for getting back to me.
>>>>>
>>>>>
>>>>>
>>>>> I've 2 follow up questions.
>>>>>
>>>>>
>>>>>
>>>>> Q3: You've explained how data is checkpointed, but never addressed the
>>>>> part about new batches overwriting old batches. Is it true that data is
>>>>> overwritten, or is it that all data is saved, resulting in a huge blob?
>>>>>
>>>>>
>>>>>
>>>>> Q4:
>>>>>
>>>>> I don't use HDFS, but a Unix file system. Will my stream not be fault
>>>>> tolerant in this case because there is no replication? Won't, in this case,
>>>>> Spark use the power of its RDDs?
>>>>>
>>>>>
>>>>>
>>>>> Thanks again
>>>>>
>>>>> A
>>>>>
>>>>>
>>>>>
>>>>> *From:* Tathagata Das [mailto:tathagata.das1...@gmail.com]
>>>>> *Sent:* February-14-14 8:15 PM
>>>>> *To:* user@spark.incubator.apache.org
>>>>> *Subject:* Re: checkpoint and not running out of disk space
>>>>>
>>>>>
>>>>>
>>>>> Hello Adrian,
>>>>>
>>>>>
>>>>>
>>>>> A1: There is a significant difference between cache and checkpoint.
>>>>> Cache materializes the RDD and keeps it in memory and/or disk. But the
>>>>> lineage of the RDD (that is, the seq of operations that generated the RDD)
>>>>> will be remembered, so that if there are node failures and parts of the
>>>>> cached RDDs are lost, they can be regenerated. However, checkpoint saves
>>>>> the RDD to an HDFS file AND actually FORGETS the lineage completely. This
>>>>> allows long lineages to be truncated and the data to be saved reliably in
>>>>> HDFS (which is naturally fault tolerant by replication).
>>>>>
>>>>>
>>>>>
>>>>> A2: The answer to this is a little involved. There are actually two
>>>>> kinds of checkpointing going on -
>>>>>
>>>>> (i) data checkpointing (i.e. RDD checkpoints) and
>>>>>
>>>>> (ii) metadata checkpointing, which stores the RDD checkpoint file
>>>>> names associated with DStreams to a file in HDFS, and other stuff
>>>>>
>>>>>
>>>>>
>>>>> The metadata checkpoint is done every batch.
>>>>>
>>>>> The data checkpointing, that is, the frequency at which RDDs of a
>>>>> DStream are checkpointed, is configurable with
>>>>> DStream.checkpoint(...duration...). The duration must be a multiple of the
>>>>> batch interval / slide interval for windowed streams. However, by default,
>>>>> the interval of checkpointing is MAX(batch interval, 10 seconds). So if the
>>>>> batch interval is more than 10 seconds, then every batch; if it's less than
>>>>> 10 seconds, then every 10 seconds (the multiple of the batch interval that
>>>>> is more than 10 seconds).
>>>>>
>>>>>
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Feb 14, 2014 at 8:47 AM, Adrian Mocanu <
>>>>> amoc...@verticalscope.com> wrote:
>>>>>
>>>>>  Hi
>>>>>
>>>>>
>>>>>
>>>>> Q1:
>>>>>
>>>>> Is .checkpoint() the same as .cache() but the difference is that it
>>>>> writes to disk instead of memory?
>>>>>
>>>>>
>>>>>
>>>>> Q2:
>>>>>
>>>>> When is the checkpointed data rewritten in a streaming context? I
>>>>> assume it is rewritten otherwise the file where the stream is checkpointed
>>>>> to would grow forever. Is it every batch_duration?
>>>>>
>>>>> ie:
>>>>>
>>>>> For  val streamingContext = new StreamingContext(conf, Seconds(10))
>>>>> the checkpointed data would be rewritten every 10 seconds.
>>>>>
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>> -Adrian
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Dachuan Huang
>>> Cellphone: 614-390-7234
>>> 2015 Neil Avenue
>>> Ohio State University
>>> Columbus, Ohio
>>> U.S.A.
>>> 43210
>>>
>>
>>
