Hi Pawel,

You are correct that the write method invocation is guaranteed to be thread-safe for the same operator subtask instance. But I am not sure that having a unique counter per subtask across buckets would add much to the user experience of the sink. I think that in both cases, the interpretation of the part files would be the same.
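Just to make the comparison concrete, below is a rough sketch of the two naming strategies. The class and method names are made up for illustration only and do not match the actual Bucket/Buckets code; it is just meant to show where the counters live, not how the sink is implemented.

import java.util.HashMap;
import java.util.Map;

public class PartCounterSketch {

    // Simplified illustration; these are NOT the real Flink Bucket/Buckets classes.

    /** Current behaviour: each bucket advances its own counter (seeded from the
     *  subtask-wide max when the bucket is created), so two active buckets can
     *  both end up producing e.g. "part-0-8". */
    static class PerBucketCounter {
        private final Map<String, Long> counterPerBucket = new HashMap<>();
        private long maxPartCounter; // tracked on the "Buckets" side, used to seed new buckets

        String newPartFile(int subtask, String bucketId) {
            long counter = counterPerBucket.getOrDefault(bucketId, maxPartCounter);
            counterPerBucket.put(bucketId, counter + 1);
            maxPartCounter = Math.max(maxPartCounter, counter + 1);
            return "part-" + subtask + "-" + counter;
        }
    }

    /** Proposed behaviour: a single counter per subtask shared by all buckets,
     *  so part file names never repeat across buckets of the same subtask. */
    static class SharedSubtaskCounter {
        private long partCounter;

        String newPartFile(int subtask, String bucketId) {
            // bucketId kept only for symmetry with the sketch above; the name
            // does not depend on it.
            return "part-" + subtask + "-" + (partCounter++);
        }
    }

    public static void main(String[] args) {
        PerBucketCounter current = new PerBucketCounter();
        System.out.println(current.newPartFile(0, "14_54")); // part-0-0
        System.out.println(current.newPartFile(0, "14_55")); // part-0-1
        System.out.println(current.newPartFile(0, "14_54")); // part-0-1  <- same counter in two buckets

        SharedSubtaskCounter proposed = new SharedSubtaskCounter();
        System.out.println(proposed.newPartFile(0, "14_54")); // part-0-0
        System.out.println(proposed.newPartFile(0, "14_55")); // part-0-1
        System.out.println(proposed.newPartFile(0, "14_54")); // part-0-2  <- unique across buckets
    }
}

Either way the part file still ends up under its bucket's path (e.g. invalid-records/2020-01-22T15_05_00Z/part-0-616), which is why I think the resulting layout reads the same.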
I may be wrong though, so please let me know if this is a deal breaker for you.

Cheers,
Kostas

On Sat, Jan 25, 2020 at 11:48 AM Pawel Bartoszek <pawelbartosze...@gmail.com> wrote:
>
> Hi Kostas,
>
> Thanks for confirming that. I started thinking it might be useful, or more user friendly, to use a unique counter across buckets for the same operator subtask. The way I could imagine this working is to pass the max counter to the write method (https://github.com/apache/flink/blob/e7e24471240dbaa6b5148d406575e57d170b1623/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L204), or to have the Bucket hold an instance of the Buckets class and access the global counter from there. As far as I know, the write method invocation is guaranteed to be thread-safe for the same operator subtask instance.
>
> Thanks,
> Pawel
>
>
> On Fri, 24 Jan 2020 at 20:45, Kostas Kloudas <kklou...@gmail.com> wrote:
>>
>> Hi Pawel,
>>
>> You are correct that counters are unique within the same bucket but NOT across buckets. Across buckets, you may see the same counter being used.
>> The max counter is used only upon restoring from a failure, resuming from a savepoint, or rescaling, and this is done to guarantee that no valid data are overwritten while limiting the state that Flink has to keep internally. For a more detailed discussion of the why, you can have a look here: https://issues.apache.org/jira/browse/FLINK-13609
>>
>> Cheers,
>> Kostas
>>
>> On Fri, Jan 24, 2020 at 5:16 PM Pawel Bartoszek <pawelbartosze...@gmail.com> wrote:
>> >
>> > I have looked into the source code and it looks like the same counter value being used in two buckets is expected. Each Bucket class (https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java) is passed a partCounter in its constructor. Whenever a part file is rolled over, the counter is incremented within the scope of that bucket. It can happen that there are two or more active buckets whose counters are incremented independently so that they become equal. However, the global max counter maintained by the Buckets class always keeps the max part counter, so that when a new bucket is created it is passed the correct part counter.
>> >
>> > I have done my analysis based on the logs from my job; note the same counter value used for "part-0-8" in two different buckets below.
>> >
>> > 2020-01-24 14:57:41 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-6" for bucket id=2020-01-24T14_54_00Z.
>> > 2020-01-24 14:57:41 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=7.
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=8 (max part counter=7).
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on checkpoint.
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and bucketPath=s3://xxx
>> > 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to element
>> > 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-7" for bucket id=2020-01-24T14_54_00Z.
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=8.
>> > 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to element
>> > 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-8" for bucket id=2020-01-24T14_55_00Z.
>> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=9 (max part counter=9).
>> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on checkpoint.
>> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and bucketPath=s3://xxx
>> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z on checkpoint.
>> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and bucketPath=s3://xxx
>> > 2020-01-24 14:58:41 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to element
>> > 2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-8" for bucket id=2020-01-24T14_54_00Z.
>> > 2020-01-24 14:58:42 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=9.
>> > 2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to element
>> > 2020-01-24 14:58:43 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-9" for bucket id=2020-01-24T14_55_00Z.
>> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=10 (max part counter=10).
>> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on checkpoint.
>> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and bucketPath=s3://xxx
>> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z on checkpoint.
>> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and bucketPath=s3://xxx
>> >
>> >
>> > Thanks,
>> > Pawel
>> >
>> >
>> > On Thu, 23 Jan 2020 at 23:29, Pawel Bartoszek <pawelbartosze...@gmail.com> wrote:
>> >>
>> >> Hi,
>> >>
>> >> The Flink Streaming Sink is designed to use a global counter when creating files to avoid overwrites. I am running Flink 1.8.2 with Kinesis Analytics (managed Flink provided by AWS) with bulk writes (the rolling policy is hardcoded to roll over on checkpoint). My job is configured to checkpoint every minute and runs with parallelism 1.
>> >>
>> >> The problem is that the same counter 616 is used for both files invalid-records/2020-01-22T15_06_00Z/part-0-616 and invalid-records/2020-01-22T15_05_00Z/part-0-616.
>> >>
>> >> 15:06:37
>> >> { "locationInformation": "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)", "logger": "org.apache.flink.fs.s3.common.writer.S3Committer", "message": "Committing invalid-records/2020-01-22T15_06_00Z/part-0-616 with MPU ID f7PQc2D82.kKaDRS.RXYYS8AkLd5q_9ogw3WZJJg2KGABhYgjtv.eJbqQ_UwpzciYb.TDTIkixulkmaTMyyuwmr6c5eC61aenoo2m4cj7wAT9v0JXB3i6gBArw.HpSLxpUBTEW6PT3aN9XKPZmT2kg--", "threadName": "Async calls on Source: Custom Source -> Extract Td-agent message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)", "applicationARN": "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel", "applicationVersionId": "33", "messageSchemaVersion": "1", "messageType": "INFO" }
>> >>
>> >> 15:07:37
>> >> { "locationInformation": "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)", "logger": "org.apache.flink.fs.s3.common.writer.S3Committer", "message": "Committing invalid-records/2020-01-22T15_05_00Z/part-0-616 with MPU ID XoliYkdvP1Cc3gePyteIGhTqF1LrID8rEyddaPXRNPQYkWDNKpDF0tnYuhDBqywAqCWf4nJTOJ2Kx_a_91KTyVTvZ7GkKs25nseGs4jDR6Y5Nxuai47aKNeWeS4bs9imMJ1iAxbd7lRQyxnM5qwDeA--", "threadName": "Async calls on Source: Custom Source -> Extract Td-agent message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)", "applicationARN": "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel", "applicationVersionId": "33", "messageSchemaVersion": "1", "messageType": "INFO" }
>> >>
>> >> Thanks,
>> >> Pawel
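P.S. Regarding the max counter being used on restore: a rough sketch of the idea, with illustrative names only (this is not the actual Buckets code), showing how continuing from the maximum counter recorded in the recovered bucket states keeps post-restore part files from falling below anything the previous run already used.

import java.util.Arrays;
import java.util.List;

public class RestoreSketch {

    // Illustrative sketch; names do not match the real Flink classes.

    /** On restore, the subtask takes the maximum over the part counters recorded
     *  in the recovered bucket states and continues counting from there, so new
     *  part files do not reuse names below anything the previous run recorded. */
    static long initialPartCounterAfterRestore(List<Long> countersFromState) {
        return countersFromState.stream()
                .mapToLong(Long::longValue)
                .max()
                .orElse(0L);
    }

    public static void main(String[] args) {
        // e.g. two restored bucket states whose recorded counters were 7 and 9:
        long start = initialPartCounterAfterRestore(Arrays.asList(7L, 9L));
        System.out.println(start); // 9 -> counting after recovery resumes from here
    }
}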