In theory it would work, but checkpointing would be quite inefficient. If I understand correctly, Spark writes the content to a temp file on S3 and then renames it; since S3 has no native rename, the rename actually reads the temp object back from S3 and writes its content to the final path. Compared to checkpointing on HDFS, that is one unnecessary write and one unnecessary read per checkpoint file. It probably warrants a custom checkpoint manager implementation for S3.
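For the custom-manager route, here is a minimal sketch of the plumbing, with assumptions flagged: Spark reads the internal spark.sql.streaming.checkpointFileManagerClass conf (added alongside the CheckpointFileManager abstraction in Spark 2.4) and instantiates the named class reflectively. com.example.S3CheckpointFileManager is a placeholder; one would have to write it by extending org.apache.spark.sql.execution.streaming.CheckpointFileManager to do a single direct PUT per checkpoint file instead of temp-write-plus-rename:

    import org.apache.spark.sql.SparkSession

    // Hypothetical: plug in a custom checkpoint manager for S3. The class
    // name below is a placeholder -- Spark only ships FileSystem- and
    // FileContext-based managers, so com.example.S3CheckpointFileManager
    // would have to be implemented (extending CheckpointFileManager to
    // write the final object directly, skipping temp-file-plus-rename).
    val spark = SparkSession.builder()
      .appName("s3-checkpointing")
      .config("spark.sql.streaming.checkpointFileManagerClass",
        "com.example.S3CheckpointFileManager")
      .getOrCreate()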
Also, atomic rename still doesn't work on S3, and S3 doesn't support write with overwrite=false. That means there's no barrier when concurrent streaming queries access the same checkpoint location, and they can corrupt it. With a checkpoint on HDFS the rename is atomic: even when queries run in parallel only one rename succeeds, and the query that loses the race simply fails to write the checkpoint file (see the sketch at the end of this mail). That's a caveat you may want to keep in mind.

On Wed, Dec 2, 2020 at 11:35 PM German Schiavon <gschiavonsp...@gmail.com> wrote:

> Hello!
>
> @Gabor Somogyi <gabor.g.somo...@gmail.com> I wonder whether, now that S3
> is *strongly consistent*, it would work fine.
>
> Regards!
>
> https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
>
> On Thu, 17 Sep 2020 at 11:55, German Schiavon <gschiavonsp...@gmail.com> wrote:
>
>> Hi Gabor,
>>
>> Makes sense, thanks a lot!
>>
>> On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Structured Streaming simply does not work when the checkpoint location
>>> is on S3, due to its lack of read-after-write consistency.
>>> Please choose an HDFS-compliant filesystem and it will work like a charm.
>>>
>>> BR,
>>> G
>>>
>>> On Wed, Sep 16, 2020 at 4:12 PM German Schiavon <gschiavonsp...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> I have a Structured Streaming application that reads from Kafka,
>>>> performs some aggregations, and writes to S3 in Parquet format.
>>>>
>>>> Everything seems to work great, except that from time to time I get a
>>>> checkpoint error. At first I thought it was a random error, but it has
>>>> happened more than 3 times already in a few days:
>>>>
>>>> Caused by: java.io.FileNotFoundException: No such file or directory:
>>>> s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>>
>>>> Does this happen to anyone else?
>>>>
>>>> Thanks in advance.
>>>>
>>>> *This is the full error:*
>>>>
>>>> ERROR streaming.MicroBatchExecution: Query segmentValidation [id =
>>>> 14edaddf-25bb-4259-b7a2-6107907f962f, runId =
>>>> 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
>>>> java.io.FileNotFoundException: No such file or directory:
>>>> s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
>>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
>>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
>>>> at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
>>>> at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
>>>> at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
>>>> at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
>>>> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
>>>> at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
>>>> at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
>>>> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
>>>> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
>>>> at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>>> at scala.Option.getOrElse(Option.scala:189)
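As promised above, a minimal standalone sketch of the rename barrier that HDFS provides and S3 does not. The paths are hypothetical and this is my own illustration, not Spark code; it mirrors what FileContextBasedCheckpointFileManager (visible in the stack trace) relies on, namely that FileContext.rename without Options.Rename.OVERWRITE is atomic on HDFS and fails if the destination already exists:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileContext, Path}

    // Sketch of the commit barrier HDFS gives Spark's checkpoint manager.
    // FileContext.rename without the OVERWRITE option is atomic on HDFS and
    // throws if the destination already exists, so when two queries race to
    // commit the same batch file, exactly one rename wins.
    object AtomicRenameBarrier {
      def main(args: Array[String]): Unit = {
        val fc  = FileContext.getFileContext(new Configuration())
        val tmp = new Path("hdfs:///checkpoint/offsets/.141.tmp") // hypothetical
        val dst = new Path("hdfs:///checkpoint/offsets/141")      // hypothetical
        try {
          fc.rename(tmp, dst) // atomic; no Rename.OVERWRITE => fails if dst exists
          println(s"committed $dst")
        } catch {
          case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
            println(s"another query already committed $dst; failing this one: $e")
        }
      }
    }

On S3A there is no such guarantee: rename is implemented as copy-then-delete and is neither atomic nor exclusive, which is exactly why two queries sharing one checkpoint location can silently clobber each other.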