Responses inline.

On Tue, Sep 22, 2015 at 8:35 PM, Michal Čizmazia <mici...@gmail.com> wrote:

> Can checkpoints be stored to S3 (via S3/S3A Hadoop URL)?
>

Yes. Checkpoints are single files by themselves and do not require flush
semantics to work. So S3 is fine.



> Trying to answer this question, I looked into
> Checkpoint.getCheckpointFiles [1]. It is doing findFirstIn which would
> probably be calling the S3 LIST operation. S3 LIST is prone to eventual
> consistency [2]. What would happen when getCheckpointFiles retrieves an
> incomplete list of files to Checkpoint.read [1]?
>
There is a non-zero chance of that happening. But in that case it will
just use an older checkpoint file to recover the DAG of DStreams. That just
means that it will recover from an earlier point in time, and redo more
computation.
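To make that failure mode concrete, here is a minimal sketch (plain Python;
the "checkpoint-<time>" naming follows the pattern Spark's Checkpoint class
writes, the rest is hypothetical): recovery simply picks the newest
checkpoint file that the eventually consistent LIST happened to return.

```python
# Hypothetical sketch: an eventually consistent LIST may omit the newest
# checkpoint file; recovery then proceeds from an older one.
def latest_checkpoint(listed_files):
    """Pick the newest checkpoint among whatever LIST returned."""
    times = sorted(int(f.split("-")[1]) for f in listed_files)
    return "checkpoint-%d" % times[-1] if times else None

all_files = ["checkpoint-1000", "checkpoint-2000", "checkpoint-3000"]
stale_listing = all_files[:-1]        # S3 LIST missed checkpoint-3000

# Recovery still succeeds, just from an earlier point in time,
# which means more recomputation after restart.
assert latest_checkpoint(stale_listing) == "checkpoint-2000"
```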



> The pluggable WAL interface allows me to work around the eventual
> consistency of S3 by storing an index of filenames in DynamoDB. However, it
> seems that something similar is required for checkpoints as well.
>

How are you getting around the fact that flush does not work on S3? Until
the current WAL file is closed, the file is not readable, even if you know
the index.
This should not be needed for checkpoints, for the reason I mentioned above
in this mail.
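One way a custom S3-backed WAL can sidestep flush entirely (a sketch of the
design you described, with hypothetical names, not anything Spark provides):
close a separate object per write, so each record is durable and readable as
soon as its PUT completes, and keep the filename index in DynamoDB instead
of relying on LIST.

```python
# Hypothetical close-per-write WAL: visibility never depends on flush.
class ClosePerWriteLog:
    def __init__(self):
        self.objects = {}   # stands in for completed (closed) S3 PUTs
        self.index = []     # stands in for the DynamoDB filename index

    def write(self, seq, data):
        key = "wal/segment-%06d" % seq
        self.objects[key] = data  # a closed object is durable and readable
        self.index.append(key)    # index avoids the eventually consistent LIST
        return key

    def read_all(self):
        # Replay in index order, never consulting LIST.
        return [self.objects[k] for k in self.index]

log = ClosePerWriteLog()
log.write(1, b"block-1")
log.write(2, b"block-2")
assert log.read_all() == [b"block-1", b"block-2"]
```

The trade-off is many small objects and one PUT per record, which is why
this works for the metadata-sized writes but is costly for bulk data.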


>
> I am implementing a Reliable Receiver for Amazon SQS. Alternatively, is
> there something I can borrow from DirectKafkaInputDStream? After a DStream
> computes an RDD, is there a way for the DStream to tell when processing of
> that RDD has finished, and only then delete the SQS messages?
>

You could borrow from Direct Kafka! For that to work, you should be able to
do the following:

1. Each message in SQS should have a unique identifier, using which you can
specify ranges of messages to read.

2. You should be able to query from SQS the identifier of the latest
message, so that you can decide the range to read -- last read message to
latest message.

3. There must be a way to find out the identifier of the Nth record after
the current record. This is necessary for rate limiting -- if you want to
read at most 1000 messages in each interval, and you have read till ID X,
then you should be able to find out (without reading the data) the ID of
the (X + 1000)th record. This is possible in Kafka, as offsets are
contiguous, but not possible in Kinesis, as sequence numbers are not
contiguous.
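Given properties 1-3, the per-batch range computation is simple arithmetic;
a sketch (function and parameter names are mine, not Spark's, and it assumes
Kafka-like contiguous IDs):

```python
def next_range(last_read, latest, max_per_batch):
    """Range of messages to read this batch, as (last_read, until]."""
    until = min(latest, last_read + max_per_batch)  # rate limiting cap
    return (last_read, until)

# Read at most 1000 messages per interval.
assert next_range(0, 5000, 1000) == (0, 1000)
assert next_range(4200, 5000, 1000) == (4200, 5000)  # fewer than 1000 left
```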

I am not sure SQS satisfies these properties. If it satisfies 1 and 2, but
not 3, then you can consider looking at the Kinesis Receiver in Spark 1.5,
which still uses receivers, but keeps track of Kinesis sequence numbers in
the metadata WAL.


>
> I was also considering Amazon EFS, but it is only available in a single
> region for a preview. EBS could be an option, but it cannot be used across
> multiple Availability Zones.
>
> [1]:
> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
> [2]:
> http://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel
>
>
>
> On 22 September 2015 at 21:09, Tathagata Das <t...@databricks.com> wrote:
>
>> You can keep the checkpoints in the Hadoop-compatible file system and the
>> WAL somewhere else using your custom WAL implementation. Yes, cleaning up
>> the stuff gets complicated as it is not as easy as deleting off the
>> checkpoint directory - you will have to clean up the checkpoint directory
>> as well as whatever other storage your custom WAL uses. However, if I
>> remember correctly, the WAL information is used only when the DStreams are
>> recovered correctly from checkpoints.
>>
>> Note that, there are further details here that require deeper
>> understanding. There are actually two uses of WALs in the system -
>>
>> 1. Data WAL for received data - This is what is usually referred to as
>> the WAL everywhere. Each receiver writes to a different WAL. This deals
>> with bulk data.
>> 2. Metadata WAL - This is used by the driver to save metadata information
>> like the block-to-data-WAL-segment mapping, etc. I usually skip mentioning
>> this. This WAL is automatically used when the data WAL is enabled. And
>> this deals with small data.
>>
>> If you have to get around S3's limitations, you will have to plug in both
>> WALs (see this
>> <https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala>
>> for the SparkConfs, but note that we haven't made these confs public).
>> While the system supports plugging them in, we haven't made this
>> information public yet because of such complexities in working with it.
>> And we have invested time in making common sources like Kafka not require
>> WALs (e.g. the Direct Kafka approach). In the future, we do hope to have a
>> better solution for general receivers + WALs + S3 (personally, I really
>> wish S3's semantics would improve and fix this issue).
>>
>> Another alternative direction may be Amazon EFS. Since it is based on EBS,
>> it may give the necessary semantics. But I haven't given that a spin, so
>> it's uncharted territory :)
>>
>> TD
>>
>>
>> On Tue, Sep 22, 2015 at 5:15 PM, Michal Čizmazia <mici...@gmail.com>
>> wrote:
>>
>>> My understanding of pluggable WAL was that it eliminates the need for
>>> having a Hadoop-compatible file system [1].
>>>
>>> What is the use of the pluggable WAL when it can only be used together
>>> with checkpointing, which still requires a Hadoop-compatible file system?
>>>
>>> [1]: https://issues.apache.org/jira/browse/SPARK-7056
>>>
>>>
>>>
>>> On 22 September 2015 at 19:57, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> 1. Currently, the WAL can be used only with checkpointing turned on,
>>>> because it does not make sense to recover from the WAL if there is no
>>>> checkpoint information to recover from.
>>>>
>>>> 2. Since the current implementation saves the WAL in the checkpoint
>>>> directory, they share the same fate -- if the checkpoint directory is
>>>> deleted, then both checkpoint info and WAL info are deleted.
>>>>
>>>> 3. Checkpointing is currently not pluggable. Why do you want that?
>>>>
>>>>
>>>>
>>>> On Tue, Sep 22, 2015 at 4:53 PM, Michal Čizmazia <mici...@gmail.com>
>>>> wrote:
>>>>
>>>>> I am trying to use the pluggable WAL, but it can be used only with
>>>>> checkpointing turned on. Thus I still need to have a Hadoop-compatible
>>>>> file system.
>>>>>
>>>>> Is there something like pluggable checkpointing?
>>>>>
>>>>> Or can WAL be used without checkpointing? What happens when WAL is
>>>>> available but the checkpoint directory is lost?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>> On 18 September 2015 at 05:47, Tathagata Das <t...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> I don't think it would work with multipart upload either. The file is
>>>>>> not visible until the multipart upload is explicitly closed. So even
>>>>>> if each write is a part upload, none of the parts are visible until
>>>>>> the multipart upload is closed.
>>>>>>
>>>>>> TD
>>>>>>
>>>>>> On Fri, Sep 18, 2015 at 1:55 AM, Steve Loughran <
>>>>>> ste...@hortonworks.com> wrote:
>>>>>>
>>>>>>>
>>>>>>> > On 17 Sep 2015, at 21:40, Tathagata Das <t...@databricks.com>
>>>>>>> wrote:
>>>>>>> >
>>>>>>> > Actually, the current WAL implementation (as of Spark 1.5) does
>>>>>>> > not work with S3 because S3 does not support flushing. Basically,
>>>>>>> > the current implementation assumes that after write + flush, the
>>>>>>> > data is immediately durable, and readable if the system crashes
>>>>>>> > without closing the WAL file. This does not work with S3, as data
>>>>>>> > is durable if and only if the S3 file output stream is cleanly
>>>>>>> > closed.
>>>>>>> >
>>>>>>>
>>>>>>>
>>>>>>> more precisely, unless you turn multipart uploads on, the
>>>>>>> S3n/S3a clients Spark uses *don't even upload anything to S3*.
>>>>>>>
>>>>>>> It's not a filesystem, and you have to bear that in mind.
>>>>>>>
>>>>>>> Amazon's own S3 client used in EMR behaves differently; it may be
>>>>>>> usable as a destination (I haven't tested).
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
