Re: Message guarantees with S3 Sink
Thanks Gary!

Sure, there are issues with updates in S3. You may want to look over the EMRFS consistent view guarantees [1]. I'm not sure whether that is possible in a non-EMR AWS system.

I'm creating a JIRA issue regarding the possibility of data loss in S3. IMHO, the Flink docs should mention the possible data loss in S3.

[1] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html

--
Thanks,
Amit

On Fri, May 18, 2018 at 2:48 AM, Gary Yao wrote:
> Hi Amit,
>
> The BucketingSink doesn't have well defined semantics when used with S3. Data
> loss is possible but I am not sure whether it is the only problem. There are
> plans to rewrite the BucketingSink in Flink 1.6 to enable eventually consistent
> file systems [1][2].
>
> Best,
> Gary
>
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/sink-with-BucketingSink-to-S3-files-override-td18433.html
> [2] https://issues.apache.org/jira/browse/FLINK-6306
>
> On Thu, May 17, 2018 at 11:57 AM, Amit Jain wrote:
>>
>> Hi,
>>
>> We are using Flink to process click stream data from Kafka and pushing
>> the same in 128MB file in S3.
>>
>> What is the message processing guarantees with S3 sink? In my
>> understanding, S3A client buffers the data on memory/disk. In failure
>> scenario on particular node, TM would not trigger Writer#close hence
>> buffered data can lose entirely assuming this buffer contains data of
>> last successful checkpointing.
>>
>> --
>> Thanks,
>> Amit
Re: Message guarantees with S3 Sink
Hi Amit,

The BucketingSink doesn't have well defined semantics when used with S3. Data loss is possible but I am not sure whether it is the only problem. There are plans to rewrite the BucketingSink in Flink 1.6 to enable eventually consistent file systems [1][2].

Best,
Gary

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/sink-with-BucketingSink-to-S3-files-override-td18433.html
[2] https://issues.apache.org/jira/browse/FLINK-6306

On Thu, May 17, 2018 at 11:57 AM, Amit Jain wrote:
> Hi,
>
> We are using Flink to process click stream data from Kafka and pushing
> the same in 128MB file in S3.
>
> What is the message processing guarantees with S3 sink? In my
> understanding, S3A client buffers the data on memory/disk. In failure
> scenario on particular node, TM would not trigger Writer#close hence
> buffered data can lose entirely assuming this buffer contains data of
> last successful checkpointing.
>
> --
> Thanks,
> Amit
Re: Message guarantees with S3 Sink
Hi Rong,

We are using BucketingSink only. I'm looking at the case where the TM does not get the chance to call Writer#flush, for example when YARN kills the TM because of OOM.

We have configured fs.s3.impl to com.amazon.ws.emr.hadoop.fs.EmrFileSystem in core-site.xml, so BucketingSink is using the S3 client internally.

When we write data using the S3A client, it buffers the data in memory or on disk until it hits the multipart file size or close is called on the OutputStream.

Now suppose the S3A client has buffered 40MB of data on the TM's local disk, and at that moment a checkpoint barrier arrives at the sink and the checkpoint completes successfully. The write process in the sink resumes, the buffered data grows to 60MB, and then YARN kills the TM.

What would happen to the original 40MB of data?

--
Thanks,
Amit

On Thu, May 17, 2018 at 10:28 PM, Rong Rong wrote:
> Hi Amit,
>
> Can you elaborate how you write using "S3 sink" and which version of Flink
> you are using?
>
> If you are using BucketingSink[1], you can checkout the API doc and
> configure to flush before closing your sink.
> This way your sink is "integrated with the checkpointing mechanism to
> provide exactly once semantics"[2]
>
> Thanks,
> Rong
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html
> [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>
> On Thu, May 17, 2018 at 2:57 AM, Amit Jain wrote:
>>
>> Hi,
>>
>> We are using Flink to process click stream data from Kafka and pushing
>> the same in 128MB file in S3.
>>
>> What is the message processing guarantees with S3 sink? In my
>> understanding, S3A client buffers the data on memory/disk. In failure
>> scenario on particular node, TM would not trigger Writer#close hence
>> buffered data can lose entirely assuming this buffer contains data of
>> last successful checkpointing.
>>
>> --
>> Thanks,
>> Amit
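
The kill scenario in the message above (40MB buffered when the checkpoint completes, 60MB buffered when YARN kills the TM) can be traced with a toy model. This is only a sketch under stated assumptions, not EMRFS, S3A, or Flink code: BufferedObjectWriter, lost_after_kill, and the part sizes are hypothetical names and values chosen for illustration. It assumes that bytes become readable as an object only on close, that staged multipart parts are never visible until the upload is completed, and that after recovery the source resumes from the checkpointed offsets without the sink restoring the buffered bytes.

```python
class BufferedObjectWriter:
    """Toy stand-in for a client that stages writes locally before upload."""

    def __init__(self, part_size_mb):
        self.part_size_mb = part_size_mb
        self.local_buffer_mb = 0   # data held in the TM's memory or local disk
        self.staged_parts_mb = 0   # parts uploaded, but upload not yet completed
        self.visible_mb = 0        # data readable as an object in the store

    def write(self, mb):
        self.local_buffer_mb += mb
        # Each full part is uploaded, but stays invisible until close().
        while self.local_buffer_mb >= self.part_size_mb:
            self.local_buffer_mb -= self.part_size_mb
            self.staged_parts_mb += self.part_size_mb

    def close(self):
        # Completing the upload is the only step that makes data visible.
        self.visible_mb += self.staged_parts_mb + self.local_buffer_mb
        self.staged_parts_mb = 0
        self.local_buffer_mb = 0

    def kill(self):
        # Process dies without close(): the local buffer is gone and the
        # staged parts are orphaned, so nothing becomes visible.
        self.local_buffer_mb = 0
        self.staged_parts_mb = 0


def lost_after_kill(part_size_mb, before_checkpoint_mb, after_checkpoint_mb):
    """Return how much checkpointed data never became visible (assumed lost)."""
    writer = BufferedObjectWriter(part_size_mb)
    writer.write(before_checkpoint_mb)
    # Assumption: the checkpoint completes here, so the source will not
    # replay this data after recovery.
    checkpointed_mb = before_checkpoint_mb
    writer.write(after_checkpoint_mb)
    writer.kill()
    return max(0, checkpointed_mb - writer.visible_mb)


if __name__ == "__main__":
    print(lost_after_kill(128, 40, 20))  # part size larger than the data
    print(lost_after_kill(32, 40, 20))   # one part staged before the kill
```

Under these assumptions the 40MB covered by the completed checkpoint is not recoverable in either case, because a smaller part size only moves data from the local buffer into staged parts that are never completed.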
Re: Message guarantees with S3 Sink
Hi Amit,

Can you elaborate on how you write using the "S3 sink" and which version of Flink you are using?

If you are using BucketingSink [1], you can check out the API doc and configure it to flush before closing your sink. This way your sink is "integrated with the checkpointing mechanism to provide exactly once semantics" [2].

Thanks,
Rong

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html
[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html

On Thu, May 17, 2018 at 2:57 AM, Amit Jain wrote:
> Hi,
>
> We are using Flink to process click stream data from Kafka and pushing
> the same in 128MB file in S3.
>
> What is the message processing guarantees with S3 sink? In my
> understanding, S3A client buffers the data on memory/disk. In failure
> scenario on particular node, TM would not trigger Writer#close hence
> buffered data can lose entirely assuming this buffer contains data of
> last successful checkpointing.
>
> --
> Thanks,
> Amit
Message guarantees with S3 Sink
Hi,

We are using Flink to process click stream data from Kafka and push it to S3 in 128MB files.

What are the message processing guarantees with the S3 sink? In my understanding, the S3A client buffers the data in memory or on disk. In a failure scenario on a particular node, the TM would not trigger Writer#close, hence the buffered data can be lost entirely, assuming this buffer contains data from the last successful checkpoint.

--
Thanks,
Amit