To get around the fact that flush does not work in S3, my custom WAL
implementation stores a separate S3 object for each WriteAheadLog.write
call.
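
Very roughly, the idea looks like the sketch below (simplified and
illustrative only -- the bucket/key naming, the use of the AWS SDK v1
client, and the readAll/clean stubs are placeholders, and the
SparkConf-based constructor that Spark expects for a plugged-in WAL is
omitted):

import java.io.ByteArrayInputStream
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectMetadata
import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle}

// Handle returned by write(): just the S3 key of the record's object.
case class S3RecordHandle(key: String) extends WriteAheadLogRecordHandle

class S3ObjectPerWriteWAL(bucket: String, prefix: String) extends WriteAheadLog {
  private val s3 = new AmazonS3Client()
  private val counter = new AtomicLong(0L)

  override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = {
    // One object per write call; the key sorts by batch time, then write order.
    val key = f"$prefix/$time%020d-${counter.getAndIncrement()}%010d"
    val bytes = new Array[Byte](record.remaining())
    record.get(bytes)
    val meta = new ObjectMetadata()
    meta.setContentLength(bytes.length)
    // The object only becomes visible once the PUT completes, so a handle
    // returned from write() always points at a fully written record.
    s3.putObject(bucket, key, new ByteArrayInputStream(bytes), meta)
    S3RecordHandle(key)
  }

  override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = {
    val in = s3.getObject(bucket, handle.asInstanceOf[S3RecordHandle].key).getObjectContent
    try ByteBuffer.wrap(com.amazonaws.util.IOUtils.toByteArray(in)) finally in.close()
  }

  // readAll() and clean() need the full list of keys; that is where the
  // eventually consistent S3 LIST (or an external index) comes in.
  override def readAll(): java.util.Iterator[ByteBuffer] = ???
  override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = ???
  override def close(): Unit = s3.shutdown()
}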

Do you see any gotchas with this approach?



On 23 September 2015 at 02:10, Tathagata Das <t...@databricks.com> wrote:

> Responses inline.
>
>
> On Tue, Sep 22, 2015 at 8:35 PM, Michal Čizmazia <mici...@gmail.com>
> wrote:
>
>> Can checkpoints be stored to S3 (via S3/S3A Hadoop URL)?
>>
> Yes. Each checkpoint is a single file by itself and does not require
> flush semantics to work. So S3 is fine.
>
>
>
>> Trying to answer this question, I looked into
>> Checkpoint.getCheckpointFiles [1]. It uses findFirstIn, which would
>> probably call the S3 LIST operation. S3 LIST is prone to eventual
>> consistency [2]. What would happen if getCheckpointFiles returned an
>> incomplete list of files to Checkpoint.read [1]?
>>
> There is a non-zero chance of that happening. But in that case it will
> just use an older checkpoint file to recover the DAG of DStreams. That just
> means that it will recover at an earlier point in time, and undergo more
> computation.
>
>
>
>> The pluggable WAL interface allows me to work around the eventual
>> consistency of S3 by storing an index of filenames in DynamoDB. However, it
>> seems that something similar is required for checkpoints as well.
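>>
>> Roughly, the DynamoDB part would look something like this (hypothetical
>> sketch using the DynamoDB document API; the table name and attributes are
>> made up):
>>
>> import scala.collection.JavaConverters._
>> import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
>> import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
>> import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec
>>
>> val dynamo = new DynamoDB(new AmazonDynamoDBClient())
>> val index  = dynamo.getTable("wal-index")
>>
>> // Called after each S3 PUT succeeds, so the index only ever lists
>> // objects that are fully written and visible.
>> def recordWrite(logPrefix: String, time: Long, s3Key: String): Unit =
>>   index.putItem(new Item()
>>     .withPrimaryKey("logPrefix", logPrefix, "s3Key", s3Key)
>>     .withLong("time", time))
>>
>> // readAll()/clean() would consult this instead of the eventually
>> // consistent S3 LIST operation.
>> def keysUpTo(logPrefix: String, threshTime: Long): Seq[String] =
>>   index.query(new QuerySpec().withHashKey("logPrefix", logPrefix))
>>     .asScala.toSeq
>>     .filter(_.getLong("time") <= threshTime)
>>     .map(_.getString("s3Key"))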
>>
>
> How are you getting around the fact that flush does not work in S3? So
> until the current WAL file is closed, the file is not readable, even if you
> know the index.
> This should not be needed for checkpoints, for the reason I mentioned
> above in this mail.
>
>
>>
>> I am implementing a Reliable Receiver for Amazon SQS. Alternatively, is
>> there something I can borrow from DirectKafkaInputDStream? After a DStream
>> computes an RDD, is there a way for the DStream to tell when processing of
>> that RDD has finished, so that the SQS messages are deleted only after that?
>>
>
> You could borrow from Direct Kafka! For that to work, you should be able
> to do the following.
> 1. Each message in SQS should have a unique identifier, using which you
> can specify ranges of messages to read.
>
> 2. You should be able to query from SQS the identifier of the latest
> message, so that you can decide the range to read -- from the last read
> message to the latest message.
>
> 3. There must be a way to find out the identifier of the Nth record from the
> current record. This is necessary for rate limiting -- if you want to read
> at most 1000 messages in each interval, and you have read up to ID X, then
> you should be able to find out (without reading the data) the ID of the
> (X + 1000)th record. This is possible in Kafka, as offsets are contiguous, but
> not possible in Kinesis, as sequence numbers are not contiguous.
>
> I am not sure SQS satisfies these properties. If it satisfies 1 and 2, but
> not 3, then you can consider looking at the Kinesis Receiver in Spark 1.5,
> which still uses receivers, but keeps track of Kinesis sequence numbers in
> the metadata WAL.
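>
> Conceptually, the per-batch planning in the direct approach boils down to
> something like this (a sketch, assuming the source satisfies conditions 1-3
> above; latestId() and the ID arithmetic are hypothetical, and plain SQS does
> not provide them):
>
> // Half-open range of message IDs planned for one batch: [fromId, untilId)
> case class IdRange(fromId: Long, untilId: Long)
>
> def planBatch(lastProcessedId: Long, latestId: Long, maxPerBatch: Long): IdRange = {
>   // Rate limiting needs the ID of the (X + maxPerBatch)th record without
>   // reading the data; trivial here only because IDs are assumed dense.
>   val until = math.min(latestId + 1, lastProcessedId + 1 + maxPerBatch)
>   IdRange(lastProcessedId + 1, until)
> }
>
> // Each batch interval the driver plans the range, persists it (checkpoint /
> // metadata WAL), and the executors then read exactly that range.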
>
>
>>
>> I was also considering Amazon EFS, but it is only available in a single
>> region for a preview. EBS could be an option, but it cannot be used across
>> multiple Availability Zones.
>>
>> [1]:
>> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
>> [2]:
>> http://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel
>>
>>
>>
>> On 22 September 2015 at 21:09, Tathagata Das <t...@databricks.com> wrote:
>>
>>> You can keep the checkpoints in the Hadoop-compatible file system and
>>> the WAL somewhere else using your custom WAL implementation. Yes, cleaning
>>> up the stuff gets complicated as it is not as easy as deleting off the
>>> checkpoint directory - you will have to clean up the checkpoint directory as
>>> well as whatever other storage your custom WAL uses. However, if I
>>> remember correctly, the WAL information is used only when the DStreams are
>>> recovered correctly from checkpoints.
>>>
>>> Note that there are further details here that require deeper
>>> understanding. There are actually two uses of WALs in the system -
>>>
>>> 1. Data WAL for received data - This is what is usually referred to as
>>> the WAL everywhere. Each receiver writes to a different WAL. This deals
>>> with bulk data.
>>> 2. Metadata WAL - This is used by the driver to save metadata
>>> information like the block-to-data-WAL-segment mapping, etc. I usually skip
>>> mentioning this. This WAL is automatically used when the data WAL is enabled,
>>> and it deals with small data.
>>>
>>> If you have to get around S3's limitations, you will have to plug in both
>>> WALs (see this
>>> <https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala>
>>> for the SparkConfs, but note that we haven't made these confs public). While the
>>> system supports plugging them in, we haven't made this information public
>>> yet because of such complexities in working with it. And we have invested
>>> time in making common sources like Kafka not require WALs (e.g. the Direct
>>> Kafka approach). In the future, we do hope to have a better solution for
>>> general receivers + WALs + S3 (personally, I really wish S3's semantics
>>> improved and fixed this issue).
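>>>
>>> If I remember the conf keys correctly, plugging in both WALs looks roughly
>>> like this (keys as defined in WriteAheadLogUtils; the class name is just a
>>> placeholder):
>>>
>>> val conf = new org.apache.spark.SparkConf()
>>>   // Metadata WAL written by the driver
>>>   .set("spark.streaming.driver.writeAheadLog.class",
>>>        "com.example.S3ObjectPerWriteWAL")
>>>   // Data WAL written by each receiver
>>>   .set("spark.streaming.receiver.writeAheadLog.class",
>>>        "com.example.S3ObjectPerWriteWAL")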
>>>
>>> Another alternative direction may be Amazon EFS. Since it is based on EBS,
>>> it may give the necessary semantics. But I haven't given that a spin, so it's
>>> uncharted territory :)
>>>
>>> TD
>>>
>>>
>>> On Tue, Sep 22, 2015 at 5:15 PM, Michal Čizmazia <mici...@gmail.com>
>>> wrote:
>>>
>>>> My understanding of pluggable WAL was that it eliminates the need for
>>>> having a Hadoop-compatible file system [1].
>>>>
>>>> What is the use of a pluggable WAL when it can only be used together with
>>>> checkpointing, which still requires a Hadoop-compatible file system?
>>>>
>>>> [1]: https://issues.apache.org/jira/browse/SPARK-7056
>>>>
>>>>
>>>>
>>>> On 22 September 2015 at 19:57, Tathagata Das <
>>>> tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> 1. Currently, the WAL can be used only with checkpointing turned on,
>>>>> because it does not make sense to recover from the WAL if there is no
>>>>> checkpoint information to recover from.
>>>>>
>>>>> 2. Since the current implementation saves the WAL in the checkpoint
>>>>> directory, they share the same fate -- if the checkpoint directory is deleted,
>>>>> then both the checkpoint info and the WAL info are deleted.
>>>>>
>>>>> 3. Checkpointing is currently not pluggable. Why do you want that?
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 22, 2015 at 4:53 PM, Michal Čizmazia <mici...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I am trying to use pluggable WAL, but it can be used only with
>>>>>> checkpointing turned on. Thus I still need to have a Hadoop-compatible file
>>>>>> system.
>>>>>>
>>>>>> Is there something like pluggable checkpointing?
>>>>>>
>>>>>> Or can WAL be used without checkpointing? What happens when WAL is
>>>>>> available but the checkpoint directory is lost?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>>
>>>>>> On 18 September 2015 at 05:47, Tathagata Das <t...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I don't think it would work with multipart upload either. The file is
>>>>>>> not visible until the multipart upload is explicitly closed. So even
>>>>>>> if each write is a part upload, none of the parts are visible until the
>>>>>>> multipart upload is closed.
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>> On Fri, Sep 18, 2015 at 1:55 AM, Steve Loughran <
>>>>>>> ste...@hortonworks.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> > On 17 Sep 2015, at 21:40, Tathagata Das <t...@databricks.com>
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> > Actually, the current WAL implementation (as of Spark 1.5) does
>>>>>>>> > not work with S3 because S3 does not support flushing. Basically, the
>>>>>>>> > current implementation assumes that after write + flush, the data is
>>>>>>>> > immediately durable, and readable if the system crashes without closing
>>>>>>>> > the WAL file. This does not work with S3, as data is durable if and
>>>>>>>> > only if the S3 file output stream is cleanly closed.
>>>>>>>> >
>>>>>>>>
>>>>>>>>
>>>>>>>> more precisely, unless you turn multipart uploads on, the
>>>>>>>> S3n/s3a clients Spark uses *don't even upload anything to s3*.
>>>>>>>>
>>>>>>>> It's not a filesystem, and you have to bear that in mind.
>>>>>>>>
>>>>>>>> Amazon's own S3 client used in EMR behaves differently; it may be
>>>>>>>> usable as a destination (I haven't tested).
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
