Flink - problem with removing checkpoint _metadata from google cloud storage when encrypting

2023-10-03 Thread Simon

Hi,

I deploy a Flink job on Google Kubernetes Engine (GKE) with checkpointing 
enabled on Google Cloud Storage (GCS). I configured the GCS filesystem 
as described here: 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/filesystems/gcs/


I also want to encrypt the data stored in GCS, which I do with the 
customer-supplied encryption key (CSEK) configuration described here: 
https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/v2.2.4/gcs/CONFIGURATION.md#encryption-csek


using these properties in the Flink configuration:

 * fs.gs.encryption.algorithm
 * fs.gs.encryption.key (not set by default)
 * fs.gs.encryption.key.hash (not set by default)
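For reference, the corresponding flink-conf.yaml entries look roughly like 
this. The values below are placeholders, not real credentials; per the linked 
connector documentation, the key is a base64-encoded AES-256 key and the hash 
is the base64-encoded SHA-256 of that key:

```yaml
# Illustrative flink-conf.yaml fragment -- placeholder values only
fs.gs.encryption.algorithm: AES256
fs.gs.encryption.key: <base64-encoded 256-bit AES key>
fs.gs.encryption.key.hash: <base64-encoded SHA-256 hash of the key>
```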

When I start the Flink job, it runs and checkpoints complete. But when 
checkpoints are discarded, the log shows errors for the checkpoint's 
_metadata file, and the Google API returns error 400 with the message 
"The target object is not encrypted by a customer-supplied encryption key".


I checked, and the _metadata files are indeed not encrypted. I don't 
understand this behavior, because encryption is set in the Flink 
configuration and the _metadata files are written by Flink itself. Since 
Flink created them, it should be able to delete them as well.


It turns out that Flink writes the checkpoint's _metadata file 
unencrypted, but then tries to delete it as an encrypted object, which 
causes the error in the Google API.
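The mismatch can be illustrated with a toy model (this is not the real GCS 
client; the class and names below are invented for illustration): when an 
object was written without a customer-supplied key, a delete request that 
supplies a key is rejected with the 400 error quoted above.

```python
# Toy model of the write/delete key mismatch -- NOT the real GCS API.
# GCS rejects a keyed request against an object stored without a CSEK.

class FakeGcsBucket:
    def __init__(self):
        self._objects = {}  # object name -> key hash used at write time (or None)

    def write(self, name, key_hash=None):
        self._objects[name] = key_hash

    def delete(self, name, key_hash=None):
        stored = self._objects.get(name)
        if stored != key_hash:
            # Mirrors the error message from the Google API
            raise PermissionError(
                "400: The target object is not encrypted by a "
                "customer-supplied encryption key")
        del self._objects[name]

bucket = FakeGcsBucket()
# Flink writes _metadata without the CSEK ...
bucket.write("checkpoints/chk-42/_metadata", key_hash=None)
# ... but the discard path sends the key, so the delete fails.
try:
    bucket.delete("checkpoints/chk-42/_metadata", key_hash="abc123=")
except PermissionError as e:
    print(e)
```

Deleting the same object with no key (matching how it was written) succeeds, 
which is consistent with everything being removed cleanly when encryption is 
disabled.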


So far I have not been able to correct this behavior. The checkpoints 
themselves work fine, and everything is removed from GCS if I do not 
enable encryption. Does anyone know how to run with encryption on GCS 
and still have checkpoint data deleted completely?



Best regards

Szymon


Re: Flink problem

2021-02-19 Thread Guanghui Zhang
Can you tell us what should happen when another record is reported by
userId 001 within the 10 minutes, for example buffer it or keep only one?

ゞ野蠻遊戲χ wrote on Fri, Feb 19, 2021 at 7:35 PM:

> hi all
>
>  For example, if a user message A (userId: 001) is reported, and no
> record is reported again by userId 001 within 10 minutes, record A will be
> sent out. How can this be achieved in Flink?
>
> Thanks
> Jiazhi
>
> --
> Sent from my iPhone
>


Re: Flink problem

2021-02-19 Thread Xintong Song
What you're looking for might be Session Window[1].

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#session-windows
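To sketch the session-window semantics in plain Python (this is a toy 
simulation, not the Flink API; in Flink it would be something like 
keyBy(userId) with a session window and a 10-minute gap): records are held 
per key and flushed only once no new record for that key arrives within the 
gap.

```python
# Toy simulation of a per-key session window with a 10-minute gap.
# A buffered record is emitted when the next record for the same key
# arrives after the gap has elapsed (or at end of stream).

GAP_MS = 10 * 60 * 1000  # 10-minute session gap

def session_emit(events):
    """events: list of (timestamp_ms, user_id, record), sorted by time.
    Keeps only the latest record per open session (one policy for the
    'what if another record arrives' question in this thread)."""
    pending = {}   # user_id -> (last_timestamp_ms, record)
    emitted = []
    for ts, user, record in events:
        last = pending.get(user)
        if last is not None and ts - last[0] >= GAP_MS:
            emitted.append(last[1])  # gap elapsed: flush the old session
        pending[user] = (ts, record)
    # end of stream: flush whatever is still buffered
    emitted.extend(rec for _, rec in pending.values())
    return emitted

events = [
    (0, "001", "A"),             # user 001 reports A
    (20 * 60_000, "001", "B"),   # 20 min later: gap elapsed, A is emitted
]
print(session_emit(events))      # -> ['A', 'B']
```

In a real Flink job the flush would be driven by event-time timers rather 
than by the arrival of the next record, but the grouping behavior is the 
same.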

On Fri, Feb 19, 2021 at 7:35 PM ゞ野蠻遊戲χ  wrote:

> hi all
>
>  For example, if a user message A (userId: 001) is reported, and no
> record is reported again by userId 001 within 10 minutes, record A will be
> sent out. How can this be achieved in Flink?
>
> Thanks
> Jiazhi
>
> --
> Sent from my iPhone
>


Flink problem

2021-02-19 Thread ゞ野蠻遊戲χ
hi all


For example, if a user message A (userId: 001) is reported, 
and no record is reported again by userId 001 within 10 minutes, record A will 
be sent out. How can this be achieved in Flink?


Thanks
Jiazhi



Sent from my iPhone