To get around the fact that flush does not work in S3, my custom WAL implementation stores a separate S3 object for each WriteAheadLog.write call. Do you see any gotchas with this approach?
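For reference, this is roughly the shape of the implementation (a simplified sketch only, not my actual code -- the bucket/prefix, key layout, and the DynamoDB-backed key index are placeholders):

// A rough sketch only -- bucket/prefix names and the DynamoDB-backed key
// index (listKeysFromIndex) are placeholders/assumptions, not real code.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.ByteBuffer
import java.util.{Iterator => JIterator, UUID}

import scala.collection.JavaConverters._

import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectMetadata
import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle}

// Handle pointing at exactly one S3 object (one object per write() call).
case class S3RecordHandle(bucket: String, key: String) extends WriteAheadLogRecordHandle

class S3ObjectPerWriteWAL(bucket: String, prefix: String) extends WriteAheadLog {

  private val s3 = new AmazonS3Client()

  // Each write() creates and fully closes a separate S3 object, so no flush
  // semantics are needed: the record is durable once putObject returns.
  override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = {
    val bytes = new Array[Byte](record.remaining())
    record.get(bytes)
    val key = f"$prefix/$time%020d-${UUID.randomUUID()}"
    val meta = new ObjectMetadata()
    meta.setContentLength(bytes.length)
    s3.putObject(bucket, key, new ByteArrayInputStream(bytes), meta)
    // Here the key would also be recorded in the DynamoDB index (not shown),
    // so readAll() never depends on the eventually consistent S3 LIST.
    S3RecordHandle(bucket, key)
  }

  override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = {
    val h = handle.asInstanceOf[S3RecordHandle]
    val in = s3.getObject(h.bucket, h.key).getObjectContent
    try {
      val out = new ByteArrayOutputStream()
      val buf = new Array[Byte](8192)
      var n = in.read(buf)
      while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
      ByteBuffer.wrap(out.toByteArray)
    } finally in.close()
  }

  // Replay all records on recovery; keys come from the index, not S3 LIST.
  override def readAll(): JIterator[ByteBuffer] =
    listKeysFromIndex().iterator.map(k => read(S3RecordHandle(bucket, k))).asJava

  override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
    // Would delete index entries and S3 objects older than threshTime (not shown).
  }

  override def close(): Unit = s3.shutdown()

  // Placeholder for the DynamoDB lookup of object keys, in write order.
  private def listKeysFromIndex(): Seq[String] = Seq.empty
}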
On 23 September 2015 at 02:10, Tathagata Das <t...@databricks.com> wrote:

> Responses inline.
>
> On Tue, Sep 22, 2015 at 8:35 PM, Michal Čizmazia <mici...@gmail.com> wrote:
>
>> Can checkpoints be stored to S3 (via S3/S3A Hadoop URL)?
>
> Yes. Because each checkpoint is a single file by itself, and it does not require flush semantics to work. So S3 is fine.
>
>> Trying to answer this question, I looked into Checkpoint.getCheckpointFiles [1]. It is doing findFirstIn which would probably be calling the S3 LIST operation. S3 LIST is prone to eventual consistency [2]. What would happen when getCheckpointFiles retrieves an incomplete list of files to Checkpoint.read [1]?
>
> There is a non-zero chance of that happening. But in that case it will just use an older checkpoint file to recover the DAG of DStreams. That just means that it will recover at an earlier point in time, and undergo more computation.
>
>> The pluggable WAL interface allows me to work around the eventual consistency of S3 by storing an index of filenames in DynamoDB. However it seems that something similar is required for checkpoints as well.
>
> How are you getting around the fact that flush does not work in S3? So until the current WAL file is closed, the file is not readable, even if you know the index.
> This should not be needed for checkpoints, because of the reason I mentioned above in this mail.
>
>> I am implementing a Reliable Receiver for Amazon SQS. Alternatively, is there something I can borrow from DirectKafkaInputDStream? After a DStream computes an RDD, is there a way for the DStream to tell when processing of that RDD has been finished, and only after that delete the SQS messages?
>
> You could borrow from Direct Kafka! For that to work, you should be able to do the following.
>
> 1. Each message in SQS should have a unique identifier, using which you can specify ranges of messages to read.
>
> 2. You should be able to query from SQS the identifier of the latest message, so that you can decide the range to read -- last read message to latest message.
>
> 3. There must be a way to find out the identifier of the Nth record from the current record. This is necessary for rate limiting -- if you want to read at most 1000 messages in each interval, and you have read till ID X, then you should be able to find out (without reading the data) the ID of the (X + 1000)th record. This is possible in Kafka, as offsets are continuous, but not possible in Kinesis, as sequence numbers are not continuous numbers.
>
> I am not sure SQS satisfies these properties. If it satisfies 1 and 2, but not 3, then you can consider looking at the Kinesis Receiver in Spark 1.5, which still uses receivers, but keeps track of Kinesis sequence numbers in the metadata WAL.
>
>> I was also considering Amazon EFS, but it is only available in a single region for a preview. EBS could be an option, but it cannot be used across multiple Availability Zones.
>>
>> [1]: https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
>> [2]: http://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel
>>
>> On 22 September 2015 at 21:09, Tathagata Das <t...@databricks.com> wrote:
>>
>>> You can keep the checkpoints in the Hadoop-compatible file system and the WAL somewhere else using your custom WAL implementation.
>>> Yes, cleaning up the stuff gets complicated, as it is not as easy as deleting off the checkpoint directory -- you will have to clean up the checkpoint directory as well as whatever other storage your custom WAL uses. However, if I remember correctly, the WAL information is used only when the DStreams are recovered correctly from checkpoints.
>>>
>>> Note that there are further details here that require deeper understanding. There are actually two uses of WALs in the system:
>>>
>>> 1. Data WAL for received data - This is what is usually referred to as the WAL everywhere. Each receiver writes to a different WAL. This deals with bulk data.
>>> 2. Metadata WAL - This is used by the driver to save metadata information like block-to-data-WAL-segment mapping, etc. I usually skip mentioning this. This WAL is automatically used when the data WAL is enabled, and it deals with small data.
>>>
>>> If you have to get around S3's limitations, you will have to plug in both WALs (see this <https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala> for the SparkConfs, but note that we haven't made these confs public). While the system supports plugging them in, we haven't made this information public yet because of such complexities in working with it. And we have invested time in making common sources like Kafka not require WALs (e.g. the Direct Kafka approach). In the future, we do hope to have a better solution for general receivers + WALs + S3 (personally, I really wish S3's semantics improve and fix this issue).
>>>
>>> Another alternative direction may be Amazon EFS. Since it is based on EBS, it may give the necessary semantics. But I haven't given that a spin, so it's uncharted territory :)
>>>
>>> TD
>>>
>>> On Tue, Sep 22, 2015 at 5:15 PM, Michal Čizmazia <mici...@gmail.com> wrote:
>>>
>>>> My understanding of pluggable WAL was that it eliminates the need for having a Hadoop-compatible file system [1].
>>>>
>>>> What is the use of pluggable WAL when it can only be used together with checkpointing, which still requires a Hadoop-compatible file system?
>>>>
>>>> [1]: https://issues.apache.org/jira/browse/SPARK-7056
>>>>
>>>> On 22 September 2015 at 19:57, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> 1. Currently, the WAL can be used only with checkpointing turned on, because it does not make sense to recover from the WAL if there is no checkpoint information to recover from.
>>>>>
>>>>> 2. Since the current implementation saves the WAL in the checkpoint directory, they share the same fate -- if the checkpoint directory is deleted, then both checkpoint info and WAL info are deleted.
>>>>>
>>>>> 3. Checkpointing is currently not pluggable. Why do you want that?
>>>>>
>>>>> On Tue, Sep 22, 2015 at 4:53 PM, Michal Čizmazia <mici...@gmail.com> wrote:
>>>>>
>>>>>> I am trying to use the pluggable WAL, but it can be used only with checkpointing turned on. Thus I still need to have a Hadoop-compatible file system.
>>>>>>
>>>>>> Is there something like pluggable checkpointing?
>>>>>>
>>>>>> Or can the WAL be used without checkpointing? What happens when the WAL is available but the checkpoint directory is lost?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On 18 September 2015 at 05:47, Tathagata Das <t...@databricks.com> wrote:
>>>>>>
>>>>>>> I don't think it would work with multipart upload either. The file is not visible until the multipart upload is explicitly closed. So even if each write is a part upload, all the parts are not visible until the multipart upload is closed.
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>> On Fri, Sep 18, 2015 at 1:55 AM, Steve Loughran <ste...@hortonworks.com> wrote:
>>>>>>>
>>>>>>>> > On 17 Sep 2015, at 21:40, Tathagata Das <t...@databricks.com> wrote:
>>>>>>>> >
>>>>>>>> > Actually, the current WAL implementation (as of Spark 1.5) does not work with S3 because S3 does not support flushing. Basically, the current implementation assumes that after write + flush, the data is immediately durable, and readable if the system crashes without closing the WAL file. This does not work with S3, as data is durable if and only if the S3 file output stream is cleanly closed.
>>>>>>>>
>>>>>>>> More precisely, unless you turn multipart uploads on, the S3n/S3a clients Spark uses *don't even upload anything to S3*.
>>>>>>>>
>>>>>>>> It's not a filesystem, and you have to bear that in mind.
>>>>>>>>
>>>>>>>> Amazon's own S3 client used in EMR behaves differently; it may be usable as a destination (I haven't tested it).
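P.S. In case it helps anyone following along: this is how I am currently wiring the custom WAL in for both the data and metadata WALs. The conf key names are the ones I found in WriteAheadLogUtils; as TD notes above, they are not public, so treat this as an illustration only (the class name and bucket are placeholders, and the plugged-in class also needs a constructor that WriteAheadLogUtils can call reflectively, which the sketch earlier in this mail glosses over):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("sqs-receiver-with-s3-wal")
  // Public setting that turns the receiver WAL on in the first place.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  // Non-public confs from WriteAheadLogUtils: plug the custom class into both
  // the receiver-side data WAL and the driver-side metadata WAL.
  .set("spark.streaming.receiver.writeAheadLog.class", "com.example.S3ObjectPerWriteWAL")
  .set("spark.streaming.driver.writeAheadLog.class", "com.example.S3ObjectPerWriteWAL")

val ssc = new StreamingContext(conf, Seconds(10))
// Checkpoints themselves can stay on S3, since they do not need flush semantics.
ssc.checkpoint("s3a://my-bucket/spark/checkpoints")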