[jira] [Commented] (NIFI-4715) ListS3 list duplicate files when incoming file throughput to S3 is high
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302646#comment-16302646 ]

Adam Lamar commented on NIFI-4715:
----------------------------------

[~dmilan77] I was able to reproduce the issue you described. ListS3 unnecessarily produces duplicates when two or more files are uploaded during the same millisecond.

To reproduce, use a ListS3 processor listing regularly (1 sec) and a PutS3Object processor with multiple threads (I used 10). After a few hundred PUTs, ListS3 will have listed more objects than PutS3Object has uploaded. I wasn't able to reproduce the same behavior when PutS3Object is configured with a single thread.

Initializing `maxTimestamp` to `currentTimestamp` fixes the issue because it allows the condition at line 260 to add the current key to `currentKeys`. Note that `currentKeys` is only used to determine which files have been listed during the current millisecond (not over the whole bucket lifetime), so it's normal for the set to be cleared when `lastModified` changes.

I plan to submit a PR that should fix the issue shortly. Thanks for reporting it!

> ListS3 list duplicate files when incoming file throughput to S3 is high
> ------------------------------------------------------------------------
>
>                 Key: NIFI-4715
>                 URL: https://issues.apache.org/jira/browse/NIFI-4715
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 1.2.0, 1.3.0, 1.4.0
>         Environment: All
>            Reporter: Milan Das
>         Attachments: List-S3-dup-issue.xml, screenshot-1.png
>
> ListS3 state is implemented using a HashSet, which is not thread safe. When ListS3 operates in multi-threaded mode, it sometimes tries to list the same file from the S3 bucket twice. It seems as if the HashSet data is getting corrupted:
> currentKeys = new HashSet<>(); // needs to be implemented thread safe, like:
> // currentKeys = ConcurrentHashMap.newKeySet();
>
> *{color:red}+Update+{color}*: This is not a HashSet issue. The root cause is that a file can be uploaded to S3 at the same time a ListS3 pass is in progress.
> In onTrigger, maxTimestamp is initialized to 0L, which causes the keys to be cleared by the code below. When lastModifiedTime on an S3 object equals currentTimestamp for a listed key, the key should be skipped; because the key set was cleared, the same file is loaded again. I think the fix should be to initialize maxTimestamp with currentTimestamp rather than 0L:
> {code}
> long maxTimestamp = currentTimestamp;
> {code}
> The following block is clearing the keys:
> {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid}
> if (lastModified > maxTimestamp) {
>     maxTimestamp = lastModified;
>     currentKeys.clear();
>     getLogger().debug("clearing keys");
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
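The race described above can be reproduced with a self-contained simulation. This is a hypothetical, stripped-down model of the onTrigger loop (the Entry record, the listPass helper, and the hard-coded timestamps are illustrative only, not the actual NiFi code): initializing maxTimestamp to 0L makes the clear fire even within the same millisecond, while initializing it to currentTimestamp does not.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the ListS3 onTrigger loop discussed in this
// thread -- not the actual NiFi code. currentTimestamp is the newest lastModified
// persisted from a previous pass; currentKeys holds the keys already listed at
// that exact millisecond.
public class ListS3Race {

    public record Entry(String key, long lastModified) {}

    // One listing pass. maxTimestampStart models how maxTimestamp is initialized:
    // 0L reproduces the reported bug; currentTimestamp models the proposed fix.
    public static List<String> listPass(List<Entry> bucket, long currentTimestamp,
                                        Set<String> currentKeys, long maxTimestampStart) {
        List<String> listed = new ArrayList<>();
        long maxTimestamp = maxTimestampStart;
        for (Entry e : bucket) {
            if (e.lastModified() < currentTimestamp
                    || e.lastModified() == currentTimestamp && currentKeys.contains(e.key())) {
                continue; // already listed on an earlier pass
            }
            listed.add(e.key());
            if (e.lastModified() > maxTimestamp) {
                maxTimestamp = e.lastModified();
                // With maxTimestamp starting at 0L this fires even though we are
                // still inside the same millisecond, discarding keys listed earlier.
                currentKeys.clear();
            }
            currentKeys.add(e.key());
        }
        return listed;
    }

    public static void main(String[] args) {
        // a.txt was listed on a previous pass; b.txt landed in the same millisecond
        List<Entry> bucket = List.of(new Entry("a.txt", 1000L), new Entry("b.txt", 1000L));

        Set<String> keys = new HashSet<>(Set.of("a.txt"));
        System.out.println(listPass(bucket, 1000L, keys, 0L));    // [b.txt]
        System.out.println(listPass(bucket, 1000L, keys, 0L));    // [a.txt, b.txt] -- duplicates

        keys = new HashSet<>(Set.of("a.txt"));
        System.out.println(listPass(bucket, 1000L, keys, 1000L)); // [b.txt]
        System.out.println(listPass(bucket, 1000L, keys, 1000L)); // [] -- nothing re-listed
    }
}
```

With the buggy initialization, listing b.txt clears a.txt out of currentKeys, so the next pass re-lists a.txt; with the fix, currentKeys accumulates all keys seen at currentTimestamp and nothing is re-listed.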
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299200#comment-16299200 ]

Milan Das commented on NIFI-4715:
---------------------------------

The root cause is that a file can be uploaded to S3 at the same time a ListS3 pass is in progress. In onTrigger, maxTimestamp is initialized to 0L, which causes the keys to be cleared by the code below. When lastModifiedTime on an S3 object equals currentTimestamp for a listed key, the key should be skipped; because the key set was cleared, the same file is loaded again.

I think the fix should be to initialize maxTimestamp with currentTimestamp rather than 0L:
{code}
long maxTimestamp = currentTimestamp;
{code}
The following block is clearing the keys:
{code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid}
if (lastModified > maxTimestamp) {
    maxTimestamp = lastModified;
    currentKeys.clear();
    getLogger().debug("clearing keys");
}
{code}
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299147#comment-16299147 ]

Milan Das commented on NIFI-4715:
---------------------------------

Yes, agreed: multi-threading isn't in play here. The root cause is different.
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298534#comment-16298534 ]

Joseph Witt commented on NIFI-4715:
-----------------------------------

I'm not familiar enough with this specific processor, but I am definitely familiar with the list/fetch pattern and its implementations. There may well be a bug, or an opportunity to tighten the handling of edge conditions like time boundaries, files with the same name being placed twice, etc. My primary note in replying was to help focus the effort away from it being a threading problem, since the processor is designed by intent to operate with a single thread.
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298529#comment-16298529 ]

Milan Das commented on NIFI-4715:
---------------------------------

[~joewitt] I am now able to work around the bug using DetectDuplicate, but it still seems like a bug in ListS3. I think the only reason is that the following code block is failing:
{code:title=ListS3.java|borderStyle=solid}
if (lastModified < currentTimestamp
        || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())) {
    continue;
}
{code}
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298517#comment-16298517 ]

Joseph Witt commented on NIFI-4715:
-----------------------------------

I'm not questioning whether duplicate listings are possible; when listing from systems like this, there is a range of interesting, complex cases to handle. What I am saying is that the suggested fix is likely unrelated to the stated problem: given that there is only a single thread, we're not having a threading issue. It is not uncommon for people to use DetectDuplicate after a processor such as this, so the flow can handle being given the same filename again.
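For readers unfamiliar with the DetectDuplicate workaround mentioned above, here is a standalone sketch of the idea. This is hypothetical illustrative code, not NiFi's implementation: the real DetectDuplicate processor tracks entries through a DistributedMapCache service with a configurable age-off, whereas this sketch uses a plain local map.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the DetectDuplicate pattern: remember keys seen
// within a time window and flag repeats, so a duplicate listing upstream can
// be routed away instead of re-fetched. Hypothetical code, not NiFi's.
public class DedupeWindow {
    private final Map<String, Long> seen = new HashMap<>(); // key -> first-seen millis
    private final long ageOffMillis;

    public DedupeWindow(long ageOffMillis) {
        this.ageOffMillis = ageOffMillis;
    }

    /** Returns true only the first time a key is seen inside the window. */
    public boolean firstSeen(String key, long nowMillis) {
        seen.values().removeIf(t -> nowMillis - t > ageOffMillis); // age off old entries
        return seen.putIfAbsent(key, nowMillis) == null;
    }

    public static void main(String[] args) {
        DedupeWindow d = new DedupeWindow(60_000L); // 1-minute window
        System.out.println(d.firstSeen("part165.txt", 1_000L));  // true  - new listing
        System.out.println(d.firstSeen("part165.txt", 2_000L));  // false - duplicate dropped
        System.out.println(d.firstSeen("part165.txt", 70_000L)); // true  - aged off, seen again
    }
}
```

This masks the symptom rather than fixing ListS3 itself, which matches how it is being used in this thread: a downstream filter that tolerates the lister occasionally emitting the same filename twice.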
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298478#comment-16298478 ]

Milan Das commented on NIFI-4715:
---------------------------------

Additional logs after enabling log4j debug output on ListS3.

AWS Last Modified times:
- ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part103.txt – AWS Last modified is Dec 8, 2017 9:34:00 AM GMT-0500
- ls.s3.7236a098-30cf-40c9-8be7-f8df1d52b33e.2017-12-08T14.33.part104.txt – AWS Last modified is Dec 8, 2017 9:34:35 AM GMT-0500

First time ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part103.txt appears in the logs:
{code}
2017-12-08 14:34:00,762 DEBUG [Timer-Driven Process Thread-6] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] Setting CLUSTER State to {key-0=AMBER/MARVIN/07708c5c-2939-4dba-99b1-de2727f1c080/ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part103.txt, currentTimestamp=1512743602000}
{code}
Events where ListS3 detects new files and pulls the old one down again:
{code}
2017-12-08 14:34:28,491 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.PersistentProvenanceRepository Successfully Rolled over Provenance Event file containing 25 records. In the past 5 minutes, 118 events have been written to the Provenance Repository, totaling 94.96 KB
2017-12-08 14:34:29,486 INFO [Heartbeat Monitor Thread-1] o.a.n.c.c.h.AbstractHeartbeatMonitor Finished processing 1 heartbeats in 5162 nanos
2017-12-08 14:34:29,806 INFO [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] Successfully listed S3 bucket amber-marvin in 1869 millis
2017-12-08 14:34:29,806 DEBUG [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] No new objects in S3 bucket amber-marvin to list. Yielding.
2017-12-08 14:34:29,806 DEBUG [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
2017-12-08 14:34:30,807 DEBUG [Timer-Driven Process Thread-10] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] Returning CLUSTER State: StandardStateMap[version=2143, values={key-0=AMBER/MARVIN/07708c5c-2939-4dba-99b1-de2727f1c080/ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part103.txt, currentTimestamp=151274364}]
2017-12-08 14:34:31,929 INFO [Timer-Driven Process Thread-10] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] Successfully listed S3 bucket amber-marvin in 1121 millis
2017-12-08 14:34:31,929 DEBUG [Timer-Driven Process Thread-10] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] No new objects in S3 bucket amber-marvin to list. Yielding.
2017-12-08 14:34:31,929 DEBUG [Timer-Driven Process Thread-10] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
2017-12-08 14:34:32,014 INFO [Process Cluster Protocol Request-6] o.a.n.c.p.impl.SocketProtocolListener Finished processing request 2294c28c-b82a-4b90-9e24-bbb79d478ae4 (type=HEARTBEAT, length=2095 bytes) from iset-streaming-01.dev.secureauth.local:8085 in 3 millis
2017-12-08 14:34:32,015 INFO [Clustering Tasks Thread-1] o.a.n.c.c.ClusterProtocolHeartbeater Heartbeat created at 2017-12-08 14:34:32,009 and sent to iset-streaming-01.dev.secureauth.local:8086 at 2017-12-08 14:34:32,015; send took 5 millis
2017-12-08 14:34:32,930 DEBUG [Timer-Driven Process Thread-3] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] Returning CLUSTER State: StandardStateMap[version=2143, values={key-0=AMBER/MARVIN/07708c5c-2939-4dba-99b1-de2727f1c080/ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part103.txt, currentTimestamp=151274364}]
2017-12-08 14:34:34,213 INFO [Timer-Driven Process Thread-3] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] Successfully listed S3 bucket amber-marvin in 1283 millis
2017-12-08 14:34:34,213 DEBUG [Timer-Driven Process Thread-3] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] No new objects in S3 bucket amber-marvin to list. Yielding.
2017-12-08 14:34:34,213 DEBUG [Timer-Driven Process Thread-3] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] has chosen to yield its resources; will not be scheduled to
{code}
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298473#comment-16298473 ]

Milan Das commented on NIFI-4715:
---------------------------------

[~joewitt] Steps to reproduce (I can reproduce it and upload the flow template if needed):
1. Keep the ListS3 flow running.
2. Start loading files into the S3 bucket. My files have naming conventions like ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part165.txt.
3. Follow with FetchS3Object and PutFile.

Here is the error:
{code}
2017-12-19 13:21:10,491 WARN [Timer-Driven Process Thread-9] o.a.nifi.processors.standard.PutFile PutFile[id=cea1f31b-9865-32a2-e355-f7e07add4b17] Penalizing StandardFlowFileRecord[uuid=3d7c66c4-3f46-43af-bf4f-0a4952cee85c,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1513688204862-1, container=default, section=1], offset=10616, length=16],offset=0,name=ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part165.txt,size=16] and routing to failure as configured because file with the same name already exists
2017-12-19 13:21:10,526 WARN [Timer-Driven Process Thread-2] o.a.nifi.processors.standard.PutFile PutFile[id=cea1f31b-9865-32a2-e355-f7e07add4b17] Penalizing StandardFlowFileRecord[uuid=1f4121af-7eab-4793-8736-c4c79dbec609,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1513688204862-1, container=default, section=1], offset=10632, length=16],offset=0,name=ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part166.txt,size=16] and routing to failure as configured because file with the same name already exists
2017-12-19 13:21:10,610 WARN [Timer-Driven Process Thread-2] o.a.nifi.processors.standard.PutFile PutFile[id=cea1f31b-9865-32a2-e355-f7e07add4b17] Penalizing StandardFlowFileRecord[uuid=b7087a30-1a9e-4f47-b431-18cd5a8f90da,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1513688204862-1, container=default, section=1], offset=10648, length=16],offset=0,name=ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part167.txt,size=16] and routing to failure as configured because file with the same name already exists
2017-12-19 13:21:10,643 WARN [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.PutFile PutFile[id=cea1f31b-9865-32a2-e355-f7e07add4b17] Penalizing StandardFlowFileRecord[uuid=f0bb0d4a-5f83-4776-8ccd-e6c603590a2b,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1513688204862-1, container=default, section=1], offset=10664, length=16],offset=0,name=ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part168.txt,size=16] and routing to failure as configured because file with the same name already exists
{code}
[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297848#comment-16297848 ]

Joseph Witt commented on NIFI-4715:
-----------------------------------

[~dmilan77] ListS3 is annotated with TriggerSerially, which means it can only ever have one thread; it is designed to be run this way exclusively. When you say it runs in multi-threaded mode, are you saying you're able to have it run with more than one thread? Can you share a screenshot?

It is designed so the listing itself is single-threaded; the listing results can then be sent around the cluster via the S2S protocol and fetched in parallel. This List/Fetch pattern is extremely common now for massive-scale flows. Please confirm whether there is a bug or a misunderstanding of how it works.
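The List/Fetch split described above (one serial lister feeding many parallel fetchers) can be sketched in plain Java. This is an illustrative stand-in only; the queue and the fetchAll helper are assumptions for the sketch, not NiFi's actual S2S mechanics:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative stand-in for the List/Fetch pattern: a single serial listing
// step (as TriggerSerially enforces for ListS3) hands keys to a pool of
// parallel fetchers (the FetchS3Object side). Hypothetical code, not NiFi's.
public class ListFetchPattern {

    public static Set<String> fetchAll(List<String> listedKeys) throws InterruptedException {
        Set<String> fetched = ConcurrentHashMap.newKeySet();
        ExecutorService fetchers = Executors.newFixedThreadPool(4); // parallel fetch side
        for (String key : listedKeys) {                             // serial listing side
            fetchers.submit(() -> fetched.add("fetched:" + key));   // fetch in parallel
        }
        fetchers.shutdown();
        fetchers.awaitTermination(5, TimeUnit.SECONDS);
        return fetched;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(fetchAll(List.of("part165.txt", "part166.txt")));
    }
}
```

The key property is that only the listing is serialized; fetching scales out, which is why ListS3 itself never needs more than one thread.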