[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16674228#comment-16674228 ] ASF GitHub Bot commented on NIFI-4715: -- Github user jvwing commented on the issue: https://github.com/apache/nifi/pull/3116 Thanks, @ijokarumawak and @adamlamar! The combined change looks good to me. I ran it through multiple test loops putting and listing about 10,000 objects in S3. No objects were missed. Duplicates were very low (< 100 per 10,000), coinciding with S3 500 errors "We encountered an internal error. Please try again." I believe we are handling this well to allow a few duplicates for at-least-once processing. > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das >Assignee: Koji Kawamura >Priority: Major > Attachments: List-S3-dup-issue.xml, ListS3_Duplication.xml, > screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} > Update: 01/03/2018 > There is one more flavor of same defect. > Suppose: file1 is modified at 1514987611000 on S3 and currentTimestamp = > 1514987311000 on state. > 1. File will be picked up time current state will be updated to > currentTimestamp=1514987311000 (but OS System time is 1514987611000) > 2. next cycle for file2 with lastmodified: 1514987611000 : keys will be > cleared because lastModified > maxTimeStamp > (=currentTimestamp=1514987311000). > CurrentTimeStamp will saved as 1514987611000 > 3. next cycle: currentTimestamp=1514987611000 , "file1 modified at > 1514987611000" will be picked up again because file1 is no longer in the keys. > I think solution is currentTimeStamp need to persisted current system time > stamp. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16674227#comment-16674227 ] ASF GitHub Bot commented on NIFI-4715: -- Github user jvwing commented on the issue: https://github.com/apache/nifi/pull/2361 Thanks, @adamlamar! > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das >Assignee: Koji Kawamura >Priority: Major > Attachments: List-S3-dup-issue.xml, ListS3_Duplication.xml, > screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} > Update: 01/03/2018 > There is one more flavor of same defect. > Suppose: file1 is modified at 1514987611000 on S3 and currentTimestamp = > 1514987311000 on state. > 1. File will be picked up time current state will be updated to > currentTimestamp=1514987311000 (but OS System time is 1514987611000) > 2. next cycle for file2 with lastmodified: 1514987611000 : keys will be > cleared because lastModified > maxTimeStamp > (=currentTimestamp=1514987311000). > CurrentTimeStamp will saved as 1514987611000 > 3. next cycle: currentTimestamp=1514987611000 , "file1 modified at > 1514987611000" will be picked up again because file1 is no longer in the keys. > I think solution is currentTimeStamp need to persisted current system time > stamp. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16674222#comment-16674222 ] ASF GitHub Bot commented on NIFI-4715: -- Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/3116 > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das >Assignee: Koji Kawamura >Priority: Major > Attachments: List-S3-dup-issue.xml, ListS3_Duplication.xml, > screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} > Update: 01/03/2018 > There is one more flavor of same defect. > Suppose: file1 is modified at 1514987611000 on S3 and currentTimestamp = > 1514987311000 on state. > 1. File will be picked up time current state will be updated to > currentTimestamp=1514987311000 (but OS System time is 1514987611000) > 2. next cycle for file2 with lastmodified: 1514987611000 : keys will be > cleared because lastModified > maxTimeStamp > (=currentTimestamp=1514987311000). > CurrentTimeStamp will saved as 1514987611000 > 3. next cycle: currentTimestamp=1514987611000 , "file1 modified at > 1514987611000" will be picked up again because file1 is no longer in the keys. > I think solution is currentTimeStamp need to persisted current system time > stamp. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16674221#comment-16674221 ] ASF GitHub Bot commented on NIFI-4715: -- Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2361 > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das >Assignee: Koji Kawamura >Priority: Major > Attachments: List-S3-dup-issue.xml, ListS3_Duplication.xml, > screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} > Update: 01/03/2018 > There is one more flavor of same defect. > Suppose: file1 is modified at 1514987611000 on S3 and currentTimestamp = > 1514987311000 on state. > 1. File will be picked up time current state will be updated to > currentTimestamp=1514987311000 (but OS System time is 1514987611000) > 2. next cycle for file2 with lastmodified: 1514987611000 : keys will be > cleared because lastModified > maxTimeStamp > (=currentTimestamp=1514987311000). > CurrentTimeStamp will saved as 1514987611000 > 3. next cycle: currentTimestamp=1514987611000 , "file1 modified at > 1514987611000" will be picked up again because file1 is no longer in the keys. > I think solution is currentTimeStamp need to persisted current system time > stamp. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16674220#comment-16674220 ] ASF subversion and git services commented on NIFI-4715: --- Commit 37a0e1b3048b5db067b6485bb437887cb0869888 in nifi's branch refs/heads/master from [~ijokarumawak] [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=37a0e1b ] NIFI-4715: Update currentKeys after listing loop ListS3 used to update currentKeys within listing loop, that causes duplicates. Because S3 returns object list in lexicographic order, if we clear currentKeys during the loop, we cannot tell if the object has been listed or not, in a case where newer object has a lexicographically former name. Signed-off-by: James Wing This closes #3116, closes #2361. > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das >Assignee: Koji Kawamura >Priority: Major > Attachments: List-S3-dup-issue.xml, ListS3_Duplication.xml, > screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} > Update: 01/03/2018 > There is one more flavor of same defect. > Suppose: file1 is modified at 1514987611000 on S3 and currentTimestamp = > 1514987311000 on state. > 1. File will be picked up time current state will be updated to > currentTimestamp=1514987311000 (but OS System time is 1514987611000) > 2. next cycle for file2 with lastmodified: 1514987611000 : keys will be > cleared because lastModified > maxTimeStamp > (=currentTimestamp=1514987311000). > CurrentTimeStamp will saved as 1514987611000 > 3. next cycle: currentTimestamp=1514987611000 , "file1 modified at > 1514987611000" will be picked up again because file1 is no longer in the keys. > I think solution is currentTimeStamp need to persisted current system time > stamp. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16674219#comment-16674219 ] ASF subversion and git services commented on NIFI-4715: --- Commit 0a014dcdb13e30084e6378c14f8c8e5568493c33 in nifi's branch refs/heads/master from [~adamonduty] [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=0a014dc ] NIFI-4715: ListS3 produces duplicates in frequently updated buckets Keep totalListCount, reduce unnecessary persistState This closes #2361. Signed-off-by: Koji Kawamura > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das >Assignee: Koji Kawamura >Priority: Major > Attachments: List-S3-dup-issue.xml, ListS3_Duplication.xml, > screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} > Update: 01/03/2018 > There is one more flavor of same defect. > Suppose: file1 is modified at 1514987611000 on S3 and currentTimestamp = > 1514987311000 on state. > 1. File will be picked up time current state will be updated to > currentTimestamp=1514987311000 (but OS System time is 1514987611000) > 2. next cycle for file2 with lastmodified: 1514987611000 : keys will be > cleared because lastModified > maxTimeStamp > (=currentTimestamp=1514987311000). > CurrentTimeStamp will saved as 1514987611000 > 3. next cycle: currentTimestamp=1514987611000 , "file1 modified at > 1514987611000" will be picked up again because file1 is no longer in the keys. > I think solution is currentTimeStamp need to persisted current system time > stamp. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671058#comment-16671058 ] ASF GitHub Bot commented on NIFI-4715: -- Github user adamlamar commented on the issue: https://github.com/apache/nifi/pull/2361 Hi @ijokarumawak, I'm sorry I wasn't able to take this across the finish line. Thanks a bunch for continuing the effort! > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das >Assignee: Koji Kawamura >Priority: Major > Attachments: List-S3-dup-issue.xml, ListS3_Duplication.xml, > screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} > Update: 01/03/2018 > There is one more flavor of same defect. > Suppose: file1 is modified at 1514987611000 on S3 and currentTimestamp = > 1514987311000 on state. > 1. File will be picked up time current state will be updated to > currentTimestamp=1514987311000 (but OS System time is 1514987611000) > 2. next cycle for file2 with lastmodified: 1514987611000 : keys will be > cleared because lastModified > maxTimeStamp > (=currentTimestamp=1514987311000). > CurrentTimeStamp will saved as 1514987611000 > 3. next cycle: currentTimestamp=1514987611000 , "file1 modified at > 1514987611000" will be picked up again because file1 is no longer in the keys. > I think solution is currentTimeStamp need to persisted current system time > stamp. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671013#comment-16671013 ] ASF GitHub Bot commented on NIFI-4715: -- Github user jvwing commented on the issue: https://github.com/apache/nifi/pull/3116 Reviewing... > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das >Assignee: Koji Kawamura >Priority: Major > Attachments: List-S3-dup-issue.xml, ListS3_Duplication.xml, > screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} > Update: 01/03/2018 > There is one more flavor of same defect. > Suppose: file1 is modified at 1514987611000 on S3 and currentTimestamp = > 1514987311000 on state. > 1. File will be picked up time current state will be updated to > currentTimestamp=1514987311000 (but OS System time is 1514987611000) > 2. next cycle for file2 with lastmodified: 1514987611000 : keys will be > cleared because lastModified > maxTimeStamp > (=currentTimestamp=1514987311000). > CurrentTimeStamp will saved as 1514987611000 > 3. next cycle: currentTimestamp=1514987611000 , "file1 modified at > 1514987611000" will be picked up again because file1 is no longer in the keys. > I think solution is currentTimeStamp need to persisted current system time > stamp. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16669701#comment-16669701 ] ASF GitHub Bot commented on NIFI-4715: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/2361 Hi @adamlamar , I hope this message finds you well. Since some other users asked about this issue, I went ahead and took over the remaining concerns around updating `currentKeys` during list loop. And submitted another PR #3116 based on your commits. When it gets merged, this PR will be closed automatically. If you have any comments, please keep discussing on the new PR. Thanks again for your contribution! > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das >Assignee: Koji Kawamura >Priority: Major > Attachments: List-S3-dup-issue.xml, ListS3_Duplication.xml, > screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} > Update: 01/03/2018 > There is one more flavor of same defect. > Suppose: file1 is modified at 1514987611000 on S3 and currentTimestamp = > 1514987311000 on state. > 1. File will be picked up time current state will be updated to > currentTimestamp=1514987311000 (but OS System time is 1514987611000) > 2. next cycle for file2 with lastmodified: 1514987611000 : keys will be > cleared because lastModified > maxTimeStamp > (=currentTimestamp=1514987311000). > CurrentTimeStamp will saved as 1514987611000 > 3. next cycle: currentTimestamp=1514987611000 , "file1 modified at > 1514987611000" will be picked up again because file1 is no longer in the keys. > I think solution is currentTimeStamp need to persisted current system time > stamp. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16669697#comment-16669697 ] ASF GitHub Bot commented on NIFI-4715: -- GitHub user ijokarumawak opened a pull request: https://github.com/apache/nifi/pull/3116 NIFI-4715: ListS3 produces duplicates in frequently updated buckets This PR is based on #2361. To preserve @adamlamar's credit, please do not squash the first commit when merging. Thanks! The 2nd commit avoids updating `currentKeys` during the listing loop. Before this fix, it's easy to reproduce duplicated list with a small number of objects. E.g 10 objects to S3 uploaded at the same time, ListS3 can produce 27 FlowFiles. Using min age doesn't address the issue. Please use [the template file attached to the JIRA](https://issues.apache.org/jira/secure/attachment/12946341/ListS3_Duplication.xml) to reproduce. After applying this fix, I confirmed ListS3 can produce FlowFiles without duplication. I tested 10,000 objects were listed without duplication while those were uploaded by PutS3 and listed by ListS3 simultaneously. --- Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ijokarumawak/nifi nifi-4715 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3116.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3116 commit 3853b13806121c1479edac2038634992ffc6fdfe Author: Adam Lamar Date: 2017-12-24T03:29:02Z NIFI-4715: ListS3 produces duplicates in frequently updated buckets Keep totalListCount, reduce unnecessary persistState This closes #2361. Signed-off-by: Koji Kawamura commit 4d445055cf605811f85bfed12b33155adbd570a2 Author: Koji Kawamura Date: 2018-10-31T07:01:36Z NIFI-4715: Update currentKeys after listing loop ListS3 used to update currentKeys within listing loop, that causes duplicates. Because S3 returns object list in lexicographic order, if we clear currentKeys during the loop, we cannot tell if the object has been listed or not, in a case where newer object has a lexicographically former name. > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das >Assignee: Koji Kawamura >Priority: Major > Attachments: List-S3-dup-issue.xml, ListS3_Duplication.xml, > screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361862#comment-16361862 ] James Wing commented on NIFI-4715: -- [~aburkard] I think your analysis is correct, eventual consistency will still be a problem. We still want the fixes in this ticket to improve ListS3's response handling. To hopefully address the eventual consistency handling, I created NIFI-4876 to add a minimum age filter to ListS3. Ignoring files until they are 30 seconds or 1 minute old might go a long way to reducing eventual consistency problems. That said, I think we also need to manage our expectations. A "fixed" ListS3 will still be limited by the nature of S3, it won't work exactly like GetFile. Users can also subscribe to S3 bucket events for a contrasting experience focusing on the latest changes over the history and replay that ListS3 provides. > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das >Priority: Major > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} > Update: 01/03/2018 > There is one more flavor of same defect. > Suppose: file1 is modified at 1514987611000 on S3 and currentTimestamp = > 1514987311000 on state. > 1. File will be picked up time current state will be updated to > currentTimestamp=1514987311000 (but OS System time is 1514987611000) > 2. next cycle for file2 with lastmodified: 1514987611000 : keys will be > cleared because lastModified > maxTimeStamp > (=currentTimestamp=1514987311000). > CurrentTimeStamp will saved as 1514987611000 > 3. next cycle: currentTimestamp=1514987611000 , "file1 modified at > 1514987611000" will be picked up again because file1 is no longer in the keys. > I think solution is currentTimeStamp need to persisted current system time > stamp. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321660#comment-16321660 ] ASF GitHub Bot commented on NIFI-4715: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/2361 Thank you, @adamlamar I understand that. > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} > Update: 01/03/2018 > There is one more flavor of same defect. > Suppose: file1 is modified at 1514987611000 on S3 and currentTimestamp = > 1514987311000 on state. > 1. File will be picked up time current state will be updated to > currentTimestamp=1514987311000 (but OS System time is 1514987611000) > 2. next cycle for file2 with lastmodified: 1514987611000 : keys will be > cleared because lastModified > maxTimeStamp > (=currentTimestamp=1514987311000). > CurrentTimeStamp will saved as 1514987611000 > 3. next cycle: currentTimestamp=1514987611000 , "file1 modified at > 1514987611000" will be picked up again because file1 is no longer in the keys. > I think solution is currentTimeStamp need to persisted current system time > stamp. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321243#comment-16321243 ] ASF GitHub Bot commented on NIFI-4715: -- Github user adamlamar commented on the issue: https://github.com/apache/nifi/pull/2361 @ijokarumawak I've made some progress, but unfortunately just trying to find time! No tech questions at this point, but thanks for checking in. > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} > Update: 01/03/2018 > There is one more flavor of same defect. > Suppose: file1 is modified at 1514987611000 on S3 and currentTimestamp = > 1514987311000 on state. > 1. File will be picked up time current state will be updated to > currentTimestamp=1514987311000 (but OS System time is 1514987611000) > 2. next cycle for file2 with lastmodified: 1514987611000 : keys will be > cleared because lastModified > maxTimeStamp > (=currentTimestamp=1514987311000). > CurrentTimeStamp will saved as 1514987611000 > 3. next cycle: currentTimestamp=1514987611000 , "file1 modified at > 1514987611000" will be picked up again because file1 is no longer in the keys. > I think solution is currentTimeStamp need to persisted current system time > stamp. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317720#comment-16317720 ] ASF GitHub Bot commented on NIFI-4715: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/2361 @adamlamar How is it going? Looking forward to review the updated PR. Just wanted to check if you have any issues. Thanks! > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} > Update: 01/03/2018 > There is one more flavor of same defect. > Suppose: file1 is modified at 1514987611000 on S3 and currentTimestamp = > 1514987311000 on state. > 1. File will be picked up time current state will be updated to > currentTimestamp=1514987311000 (but OS System time is 1514987611000) > 2. next cycle for file2 with lastmodified: 1514987611000 : keys will be > cleared because lastModified > maxTimeStamp > (=currentTimestamp=1514987311000). > CurrentTimeStamp will saved as 1514987611000 > 3. next cycle: currentTimestamp=1514987611000 , "file1 modified at > 1514987611000" will be picked up again because file1 is no longer in the keys. > I think solution is currentTimeStamp need to persisted current system time > stamp. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16310705#comment-16310705 ] ASF GitHub Bot commented on NIFI-4715: -- Github user adamlamar commented on the issue: https://github.com/apache/nifi/pull/2361 @ijokarumawak Roger, I will expand the scope of this JIRA to include those other fixes. Should be doable in a single JIRA, was just unsure how you all prefer to move forward. Thanks again for all your help - will likely be several days before I'm able to push new commits. > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} > Update: 01/03/2018 > There is one more flavor of same defect. > Suppose: file1 is modified at 1514987611000 on S3 and currentTimestamp = > 1514987311000 on state. > 1. File will be picked up time current state will be updated to > currentTimestamp=1514987311000 (but OS System time is 1514987611000) > 2. next cycle for file2 with lastmodified: 1514987611000 : keys will be > cleared because lastModified > maxTimeStamp > (=currentTimestamp=1514987311000). > CurrentTimeStamp will saved as 1514987611000 > 3. next cycle: currentTimestamp=1514987611000 , "file1 modified at > 1514987611000" will be picked up again because file1 is no longer in the keys. > I think solution is currentTimeStamp need to persisted current system time > stamp. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16310030#comment-16310030 ] Milan Das commented on NIFI-4715: - There is one more flavor of same defect. Suppose: file1 is modified at 1514987611000 on S3 and currentTimestamp = 1514987311000 on state. 1. File will be picked up time current state will be updated to currentTimestamp=1514987311000 (but OS System time is 1514987611000) 2. next cycle for file2 with lastmodified: 1514987611000 : keys will be cleared because lastModified > maxTimeStamp (=currentTimestamp=1514987311000). CurrentTimeStamp will saved as 1514987611000 3. next cycle: currentTimestamp=1514987611000 , "file1 modified at 1514987611000" will be picked up again because file1 is no longer in the keys. I think solution is currentTimeStamp need to persisted current system time stamp. > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16309489#comment-16309489 ] ASF GitHub Bot commented on NIFI-4715: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/2361 @adamlamar How long do you think you need to address multiple bugs you are aware of? If those can be addressed in the same ListS3 processor, then I'd prefer to have all (as much as we can) in this JIRA/PR, as we can reduce testing effort and overall review cycle. If it's too complicated to be done at once, then please submit different JIRAs to beak those into smaller pieces. Thank you! > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16309192#comment-16309192 ] ASF GitHub Bot commented on NIFI-4715: -- Github user adamlamar commented on the issue: https://github.com/apache/nifi/pull/2361 > Do we risk making duplication by updating currentKeys in the middle of the loop? Yes, I think we do! I identified a similar (possibly the same) bug, and I agree with all of your suggestions. The question in my mind is whether we should fix all of these issues in this JIRA or defer to another. As far as the original JIRA goes, I believe the current commit will address the issue. I also did a fair bit of manual testing so I would be comfortable moving forward with this change as-is. Before refactoring, I'd like to put some additional unit tests in place for safety. Its clear from the discussion that there is some meat here and I'd really like to enumerate a few cases I've seen while testing in unit tests. So its up to you - would you prefer that I start the unit tests and address (potentially) multiple bugs in this PR, or should we merge this and create another JIRA? > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307146#comment-16307146 ] ASF GitHub Bot commented on NIFI-4715: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/2361 @adamlamar I tried to find the API documentation but couldn't find the exact statement. Thanks, now it does makes sense to defer updating `currentTimestamp` after all listed entities are examined. Actually that makes me want to ask another related question. Do we risk making duplication by updating `currentKeys` in the middle of the loop? ## Simulation For example, ListS3 listed following entities at the 1st onTrigger. ### The 1st onTrigger simulation at t1 This should track `currentTimestamp` as `t1`, and `currentKeys` as `b1.txt`. |name|last modified|listed at| |-|-|| |a1.txt|t1 - x|t1| |b1.txt|t1|t1| ### The 2nd onTrigger simulation at t2 If S3 is being updated at the same time, there may be additional entities having the same t1 timestamp, but were not listed at the 1st onTrigger. In such case, those will be listed at the next onTrigger. Following table shows the expected effect of tracking `currentKeys`. `b1.txt` will not be listed at t2 because it's already listed at t1 and kept in `currentKeys`. |name|last modified|listed at| |-|-|| |a1.txt|t1 - x|t1| |b1.txt|t1|t1| |c1.txt|t1|t2| |d1.txt|t1|t2| ### The 2nd onTrigger simulation at t2 with newer timestamp entry preceding in lexicographical order Based on the above scenario, let's think about an edge case. New entries having later lastModified timestamp can be added at the same time at the time of the 2nd onTrigger ran. This might break the current implementation that updates `currentKeys` in the middle of the loop because entities are returned in lexicographical order. What if there was `a2.txt` having later timestamp than t1? |name|last modified|listed at| |-|-|| |a1.txt|t1 - x|t1| |a2.txt|t2|t2| |b1.txt|t1|t1 and *t2*| |c1.txt|t1|t2| |d1.txt|t1|t2| With current implementation, `b1.txt` would be listed again at t2. ## Suggestion - Like `maxTimestamp` (representing the latest timestamp at current onTrigger) and `currentTimestamp` (representing the latest timestamp at the last onTrigger), use separate variables to track the keys having the latest timestamp at the last run and current run. - Probably renaming variables would make code more readable. - Update only `maxTimestamp` and the keys with the latest timestamp of current iteration inside the loop, leave the variables which tracks the previous onTrigger state as it is. Then, after the loop, update the variables to track `previous` onTrigger state. Above approach would reflect background better, and also provide cleaner easily understandable code. I may be overly concerning details, but am feeling this can be better a bit more. Thanks for your patience! > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } >
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307073#comment-16307073 ] ASF GitHub Bot commented on NIFI-4715: -- Github user adamlamar commented on the issue: https://github.com/apache/nifi/pull/2361 @ijokarumawak From the https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html#v2-RESTBucketGET-requests;>AWS S3 API documentation (see the `continuation-token` section): > Amazon S3 lists objects in UTF-8 character encoding in lexicographical order I really wish we could take the approach you suggested (would certainly make things easier), but since the entries are in lexicographical/alphabetical order, we must iterate over the entire listing before updating `currentTimestamp`. Otherwise we risk skipping keys newer than `currentTimestamp` but older than keys in the middle of the list. The lexicographical ordering also matches my experience when using the API. Unfortunately this does also mean that duplicates are possible when a listing fails, like the `IOException` scenario you mentioned. This is an existing limitation in ListS3. I appreciate your help getting this reviewed! :) > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16306676#comment-16306676 ] ASF GitHub Bot commented on NIFI-4715: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2361#discussion_r159118001 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java --- @@ -264,18 +265,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } bucketLister.setNextMarker(); +totalListCount += listCount; commit(context, session, listCount); listCount = 0; } while (bucketLister.isTruncated()); + +// Update stateManger with the most recent timestamp currentTimestamp = maxTimestamp; +persistState(context); final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis}); -if (!commit(context, session, listCount)) { -if (currentTimestamp > 0) { -persistState(context); -} +if (totalListCount == 0) { --- End diff -- Good catch! > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16306675#comment-16306675 ] ASF GitHub Bot commented on NIFI-4715: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2361#discussion_r159117995 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java --- @@ -264,18 +265,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } bucketLister.setNextMarker(); +totalListCount += listCount; commit(context, session, listCount); listCount = 0; } while (bucketLister.isTruncated()); + +// Update stateManger with the most recent timestamp currentTimestamp = maxTimestamp; +persistState(context); --- End diff -- These two lines of code can be embedded in `commit` method. ``` // Update stateManger with the most recent timestamp currentTimestamp = maxTimestamp; persistState(context); ``` > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16306613#comment-16306613 ] ASF GitHub Bot commented on NIFI-4715: -- Github user adamlamar commented on a diff in the pull request: https://github.com/apache/nifi/pull/2361#discussion_r159112711 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java --- @@ -264,18 +265,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } bucketLister.setNextMarker(); +totalListCount += listCount; commit(context, session, listCount); listCount = 0; } while (bucketLister.isTruncated()); + +// Update stateManger with the most recent timestamp currentTimestamp = maxTimestamp; +persistState(context); final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis}); -if (!commit(context, session, listCount)) { -if (currentTimestamp > 0) { -persistState(context); -} --- End diff -- Note that this `commit` isn't required, since the last part of the main do/while loop already does a `commit`. Further, it sets `listCount` to zero, so this branch would always be taken. > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16306612#comment-16306612 ] ASF GitHub Bot commented on NIFI-4715: -- Github user adamlamar commented on the issue: https://github.com/apache/nifi/pull/2361 @ijokarumawak I did as you suggested and pulled `persistState` out in the case when no new keys have been listed, but this actually caused unit tests to fail. This is because `currentTimestamp` never changes during the main loop, so even though `commit` calls `persistState`, the value of `currentTimestamp` doesn't change until the main loop exits. Which is why `persistState` is required in both exit paths. Instead, I took a slightly different approach with the change just pushed. Since `currentTimestamp` is the current value persisted to the state manager, `maxTimestamp` is the highest timestamp seen in the main loop, and `currentKeys` is tied to `maxTimestamp` (not `currentTimestamp`), I removed the `persistState` call in `commit`, and did `persistState` at the end of `onTrigger` only. While this does continue to `persistState` on each exit, it reduces the number of `persistState` calls to once per `onTrigger` rather than once per 1000 keys iterated (which was done previously in `commit`). I did a bunch of manual testing with concurrent `PutS3Object` and `ListS3` and always got the correct number of listed keys, even when uploading 20k+ objects using 10 threads. I tried a few strategies to skip `persistState` if nothing had changed, but in manual testing it always produced the wrong number of keys, although sometimes only off by 1. The current code should be quite an improvement to the load on the state manager, even if it isn't ideal. I also introduced `totalListCount` which helps tighten up the log messages a bit. Previously it would "successfully list X objects" followed by "no new objects to list" in a single `onTrigger` (this was apparent in the unit test output). `totalListCount` also avoids an unnecessary `yield`. There's a lot going on in this one - let me know if you have any other questions! > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16306602#comment-16306602 ] ASF GitHub Bot commented on NIFI-4715: -- Github user adamlamar commented on a diff in the pull request: https://github.com/apache/nifi/pull/2361#discussion_r159111452 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java --- @@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session commit(context, session, listCount); listCount = 0; } while (bucketLister.isTruncated()); -currentTimestamp = maxTimestamp; + +if (maxTimestamp > currentTimestamp) { +currentTimestamp = maxTimestamp; +} final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis}); if (!commit(context, session, listCount)) { -if (currentTimestamp > 0) { -persistState(context); -} getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket}); context.yield(); } + +// Persist all state, including any currentKeys +persistState(context); --- End diff -- @ijokarumawak I started writing an example, but then realized you are correct - there is no need to manually call `persistState` because any addition to `currentKeys` will also increment `listCount`, and the normal update mechanism will take over from there. We shouldn't need a `dirtyState` flag. > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16304128#comment-16304128 ] ASF GitHub Bot commented on NIFI-4715: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2361#discussion_r158753087 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java --- @@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session commit(context, session, listCount); listCount = 0; } while (bucketLister.isTruncated()); -currentTimestamp = maxTimestamp; + +if (maxTimestamp > currentTimestamp) { +currentTimestamp = maxTimestamp; +} final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis}); if (!commit(context, session, listCount)) { -if (currentTimestamp > 0) { -persistState(context); -} getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket}); context.yield(); } + +// Persist all state, including any currentKeys +persistState(context); --- End diff -- @adamlamar Well, if I'm not overlooking anything, `currentKeys` is only modified in `onTrigger` method when only new entry is found. Would you show me an example? It still confounds me. If we can set a flag like `dirtyState`, then the condition should be clarify though. Thanks! > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16304023#comment-16304023 ] ASF GitHub Bot commented on NIFI-4715: -- Github user adamlamar commented on a diff in the pull request: https://github.com/apache/nifi/pull/2361#discussion_r158740756 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java --- @@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session commit(context, session, listCount); listCount = 0; } while (bucketLister.isTruncated()); -currentTimestamp = maxTimestamp; + +if (maxTimestamp > currentTimestamp) { +currentTimestamp = maxTimestamp; +} final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis}); if (!commit(context, session, listCount)) { -if (currentTimestamp > 0) { -persistState(context); -} getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket}); context.yield(); } + +// Persist all state, including any currentKeys +persistState(context); --- End diff -- @ijokarumawak Its important to `persistState` when `currentKeys` has been modified, even if the `currentTimestamp` hasn't been modified, to avoid producing duplicates when multiple files are listed during the same millisecond. This is a related but distinct issue. Its a rare condition though. You're correct that this change will cause more load on the state manager. How about setting a flag like `dirtyState` that would avoid calling `setState` if it has not been modified? > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16303562#comment-16303562 ] ASF GitHub Bot commented on NIFI-4715: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2361#discussion_r158674300 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java --- @@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session commit(context, session, listCount); listCount = 0; } while (bucketLister.isTruncated()); -currentTimestamp = maxTimestamp; + +if (maxTimestamp > currentTimestamp) { +currentTimestamp = maxTimestamp; +} final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis}); if (!commit(context, session, listCount)) { -if (currentTimestamp > 0) { -persistState(context); -} getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket}); context.yield(); } + +// Persist all state, including any currentKeys +persistState(context); --- End diff -- Do we still need this? Isn't updating state within commit() enough? We should minimize the number of status updates as some state storage is not designed for frequently updates, e.g. Zookeeper. I think if the processor didn't find any new file to list, then it does not have to update state, does it? I might be missing something as the original reason is not clear to me, to call persistState() when there was nothing to commit. > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302653#comment-16302653 ] ASF GitHub Bot commented on NIFI-4715: -- Github user adamlamar commented on a diff in the pull request: https://github.com/apache/nifi/pull/2361#discussion_r158593061 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java --- @@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session commit(context, session, listCount); listCount = 0; } while (bucketLister.isTruncated()); -currentTimestamp = maxTimestamp; + +if (maxTimestamp > currentTimestamp) { +currentTimestamp = maxTimestamp; +} final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis}); if (!commit(context, session, listCount)) { -if (currentTimestamp > 0) { -persistState(context); -} getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket}); context.yield(); } + +// Persist all state, including any currentKeys +persistState(context); --- End diff -- Both exit paths already perform `persistState` - this just makes that more clear. > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302652#comment-16302652 ] ASF GitHub Bot commented on NIFI-4715: -- Github user adamlamar commented on a diff in the pull request: https://github.com/apache/nifi/pull/2361#discussion_r158593053 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java --- @@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session commit(context, session, listCount); listCount = 0; } while (bucketLister.isTruncated()); -currentTimestamp = maxTimestamp; + +if (maxTimestamp > currentTimestamp) { +currentTimestamp = maxTimestamp; +} --- End diff -- `maxTimestamp` should always be greater than `currentTimestamp`, but this adds a sanity check. > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302651#comment-16302651 ] ASF GitHub Bot commented on NIFI-4715: -- Github user adamlamar commented on a diff in the pull request: https://github.com/apache/nifi/pull/2361#discussion_r158593055 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java --- @@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session commit(context, session, listCount); listCount = 0; } while (bucketLister.isTruncated()); -currentTimestamp = maxTimestamp; + +if (maxTimestamp > currentTimestamp) { +currentTimestamp = maxTimestamp; +} final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis}); if (!commit(context, session, listCount)) { -if (currentTimestamp > 0) { -persistState(context); -} --- End diff -- Since `currentTimestamp` is never overwritten by a `maxTimestamp` with a value of zero, this check shouldn't be necessary anymore. > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4715) ListS3 produces duplicates in frequently updated buckets
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302649#comment-16302649 ] ASF GitHub Bot commented on NIFI-4715: -- GitHub user adamlamar opened a pull request: https://github.com/apache/nifi/pull/2361 NIFI-4715: ListS3 produces duplicates in frequently updated buckets Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [Y] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [Y] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [Y] Has your PR been rebased against the latest commit within the target branch (typically master)? - [Y] Is your initial contribution a single, squashed commit? ### For code changes: - [Y] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [N] Have you written or updated unit tests to verify your changes? - [NA] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [NA] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [NA] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [NA] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [NA] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/adamlamar/nifi NIFI-4715 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2361.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2361 commit bcaa84bba251c3a70c27a466185f6b20863eab93 Author: Adam LamarDate: 2017-12-24T03:29:02Z NIFI-4715: ListS3 produces duplicates in frequently updated buckets > ListS3 produces duplicates in frequently updated buckets > > > Key: NIFI-4715 > URL: https://issues.apache.org/jira/browse/NIFI-4715 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0 > Environment: All >Reporter: Milan Das > Attachments: List-S3-dup-issue.xml, screenshot-1.png > > > ListS3 state is implemented using HashSet. HashSet is not thread safe. When > ListS3 operates in multi threaded mode, sometimes it tries to list same > file from S3 bucket. Seems like HashSet data is getting corrupted. > currentKeys = new HashSet<>(); // need to be implemented Thread Safe like > currentKeys = //ConcurrentHashMap.newKeySet(); > *{color:red}+Update+{color}*: > This is not a HashSet issue: > Root cause is: > When the file gets uploaded to S3 simultaneously when List S3 is in progress. > onTrigger--> maxTimestamp is initiated as 0L. > This is clearing keys as per the code below > When lastModifiedTime on S3 object is same as currentTimestamp for the listed > key it should be skipped. As the key is cleared, it is loading the same file > again. > I think fix should be to initiate the maxTimestamp with currentTimestamp not > 0L. > {code} > long maxTimestamp = currentTimestamp; > {code} > Following block is clearing keys. > {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid} > if (lastModified > maxTimestamp) { > maxTimestamp = lastModified; > currentKeys.clear(); > getLogger().debug("clearing keys"); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)