Thanks Jungtaek!

That makes sense. We are currently moving to an HDFS-compatible FS, and I
was wondering how the change would impact the checkpoint; after what you
said it is much clearer now.

On Thu, 3 Dec 2020 at 00:23, Jungtaek Lim <kabhwan.opensou...@gmail.com>
wrote:

> In theory it would work, but checkpointing becomes very inefficient. If
> I understand correctly, Spark will write the content to a temp file on S3,
> and the subsequent "rename" actually reads the temp file back from S3 and
> writes its content to the final path on S3. Compared to checkpointing on
> HDFS, that is one unnecessary write and one unnecessary read per commit.
> It probably warrants a custom implementation of the checkpoint manager for
> S3.
>
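> As a rough sketch of how that would hook in (the class name below is
> hypothetical; it is not something that ships with Spark), a custom manager
> can be selected through the internal
> spark.sql.streaming.checkpointFileManagerClass option, which Spark
> instantiates by reflection with a (Path, Configuration) constructor:
>
>   // Hypothetical class: it would implement
>   // org.apache.spark.sql.execution.streaming.CheckpointFileManager and
>   // write checkpoint files directly to their final S3 path instead of
>   // using the temp-file-then-rename protocol.
>   spark.conf.set(
>     "spark.sql.streaming.checkpointFileManagerClass",
>     "com.example.S3CheckpointFileManager")
>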
> Also, atomic rename still does not work on S3, and S3 does not support
> writes with overwrite=false. That means there is no barrier if concurrent
> streaming queries access the same checkpoint, and they can corrupt it.
> With a checkpoint on HDFS, the rename is atomic: even in parallel only one
> writer succeeds, and the query that loses the race to write the checkpoint
> file simply fails. That's a caveat you may want to keep in mind.
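>
> To make that concrete, here is a minimal sketch of the write-then-rename
> commit the default checkpoint manager relies on (paths are placeholders).
> On HDFS the rename is atomic and refuses to overwrite, so exactly one of
> two concurrent writers wins; on S3A the "rename" is a non-atomic
> copy-and-delete, so that guarantee disappears:
>
>   import org.apache.hadoop.fs.{FileSystem, Path}
>
>   // Commit a batch file: on HDFS this returns true for the single
>   // winner and false for the loser, because rename does not overwrite
>   // an existing destination.
>   def commitBatch(fs: FileSystem, tmp: Path, dst: Path): Boolean =
>     fs.rename(tmp, dst)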
>
> On Wed, Dec 2, 2020 at 11:35 PM German Schiavon <gschiavonsp...@gmail.com>
> wrote:
>
>> Hello!
>>
>> @Gabor Somogyi <gabor.g.somo...@gmail.com> I wonder whether, now that S3
>> is *strongly consistent*, this would work fine.
>>
>>
>> Regards!
>>
>> https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
>>
>> On Thu, 17 Sep 2020 at 11:55, German Schiavon <gschiavonsp...@gmail.com>
>> wrote:
>>
>>> Hi Gabor,
>>>
>>> Makes sense, thanks a lot!
>>>
>>> On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Structured Streaming simply does not work when the checkpoint location
>>>> is on S3, due to its lack of read-after-write consistency.
>>>> Please choose an HDFS-compliant filesystem and it will work like a
>>>> charm.
>>>>
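>>>> For example (names and paths below are placeholders), keep the sink on
>>>> S3 while the checkpoint lives on an HDFS-compliant filesystem:
>>>>
>>>>   // df is any streaming DataFrame; only the two locations matter here
>>>>   df.writeStream
>>>>     .format("parquet")
>>>>     .option("path", "s3a://my-bucket/output/")
>>>>     .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/my-query")
>>>>     .start()
>>>>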
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Wed, Sep 16, 2020 at 4:12 PM German Schiavon <
>>>> gschiavonsp...@gmail.com> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> I have a Structured Streaming application that reads from Kafka,
>>>>> performs some aggregations, and writes to S3 in Parquet format.
>>>>>
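>>>>> For reference, the query has roughly this shape (broker, topic, and
>>>>> output path are placeholders, not the real job; the checkpoint path is
>>>>> the one from the error below):
>>>>>
>>>>>   import org.apache.spark.sql.functions.{col, window}
>>>>>
>>>>>   val counts = spark.readStream
>>>>>     .format("kafka")
>>>>>     .option("kafka.bootstrap.servers", "broker:9092")
>>>>>     .option("subscribe", "events")
>>>>>     .load()
>>>>>     // watermark on the Kafka-provided timestamp column so the
>>>>>     // windowed aggregation can be written in append mode
>>>>>     .withWatermark("timestamp", "10 minutes")
>>>>>     .groupBy(window(col("timestamp"), "5 minutes"))
>>>>>     .count()
>>>>>
>>>>>   counts.writeStream
>>>>>     .format("parquet")
>>>>>     .option("path", "s3a://xxx/output/")
>>>>>     .option("checkpointLocation", "s3a://xxx/xxx/validation-checkpoint/")
>>>>>     .outputMode("append")
>>>>>     .start()
>>>>>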
>>>>> Everything seems to work great, except that from time to time I get a
>>>>> checkpoint error. At first I thought it was random, but it has happened
>>>>> more than three times already in a few days:
>>>>>
>>>>> Caused by: java.io.FileNotFoundException: No such file or directory:
>>>>> s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>>>
>>>>>
>>>>> Does this happen to anyone else?
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>> *This is the full error :*
>>>>>
>>>>> ERROR streaming.MicroBatchExecution: Query segmentValidation [id =
>>>>> 14edaddf-25bb-4259-b7a2-6107907f962f, runId =
>>>>> 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
>>>>>
>>>>> java.io.FileNotFoundException: No such file or directory:
>>>>> s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>>>   at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
>>>>>   at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
>>>>>   at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
>>>>>   at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
>>>>>   at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
>>>>>   at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
>>>>>   at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
>>>>>   at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
>>>>>   at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
>>>>>   at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
>>>>>   at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
>>>>>   at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
>>>>>   at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>>>>   at scala.Option.getOrElse(Option.scala:189)
>>>>>
>>>>
