[jira] [Commented] (NIFI-4715) ListS3 list duplicate files when incoming file throughput to S3 is high

2017-12-23 Thread Adam Lamar (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302646#comment-16302646
 ] 

Adam Lamar commented on NIFI-4715:
--

[~dmilan77] I was able to reproduce the issue you described. ListS3 
unnecessarily produces duplicates when two or more files are uploaded during 
the same millisecond. To reproduce, use a ListS3 processor listing regularly (1 
sec) and a PutS3Object processor with multiple threads (I used 10). After a few 
hundred PUTs, ListS3 will list more objects than PutS3Object has uploaded. I 
wasn't able to reproduce the same behavior when PutS3Object is configured with 
a single thread.

Initializing `maxTimestamp` to `currentTimestamp` fixes the issue because it 
allows the condition at line 260 to add the current key to `currentKeys`. Note 
that `currentKeys` is only used to track which files have been listed during 
the current millisecond (not over the whole bucket lifetime), so it's normal 
for the set to be cleared when `lastModified` changes.

I plan to submit a PR that should fix the issue shortly. Thanks for reporting 
this issue!
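
For readers following the thread, here is a minimal sketch of the listing loop 
with the proposed initialization, pieced together from the snippets quoted in 
this ticket. It is not the actual patch: pagination, FlowFile emission, and 
state persistence are elided, and the {{listing}} variable is a placeholder 
for the S3 object listing.

{code:title=Sketch of the listing loop with maxTimestamp seeded from currentTimestamp (not the final patch)|borderStyle=solid}
// Seed maxTimestamp from the persisted currentTimestamp instead of 0L.
// With 0L, the first object at the current millisecond satisfies
// lastModified > maxTimestamp and wipes currentKeys mid-pass, so other
// objects from the same millisecond can be emitted a second time.
long maxTimestamp = currentTimestamp;

for (S3VersionSummary versionSummary : listing) {
    long lastModified = versionSummary.getLastModified().getTime();

    // Skip objects older than the persisted timestamp, and objects at the
    // persisted timestamp that were already listed in a previous pass.
    if (lastModified < currentTimestamp
            || lastModified == currentTimestamp
            && currentKeys.contains(versionSummary.getKey())) {
        continue;
    }

    // ... emit a FlowFile for versionSummary here ...

    // Only a strictly newer millisecond clears the de-duplication set.
    if (lastModified > maxTimestamp) {
        maxTimestamp = lastModified;
        currentKeys.clear();
    }
    currentKeys.add(versionSummary.getKey());
}
// ... persist maxTimestamp and currentKeys as processor state ...
{code}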

> ListS3 list duplicate files when incoming file throughput to S3 is high
> 
>
> Key: NIFI-4715
> URL: https://issues.apache.org/jira/browse/NIFI-4715
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Core Framework
>Affects Versions: 1.2.0, 1.3.0, 1.4.0
> Environment: All
>Reporter: Milan Das
> Attachments: List-S3-dup-issue.xml, screenshot-1.png
>
>
> ListS3 state is implemented using a HashSet. HashSet is not thread safe. When 
> ListS3 operates in multi-threaded mode, it sometimes tries to list the same 
> file from the S3 bucket twice. It seems like the HashSet data is getting corrupted.
> currentKeys = new HashSet<>(); // needs a thread-safe implementation, e.g.
> currentKeys = ConcurrentHashMap.newKeySet();
> *{color:red}+Update+{color}*:
> This is not a HashSet issue. The root cause is that when files are uploaded 
> to S3 while a ListS3 listing is in progress, onTrigger initializes 
> maxTimestamp to 0L, which causes the block below to clear the keys.
> A listed key whose lastModifiedTime equals currentTimestamp should be 
> skipped; because the key set has been cleared, the same file is listed 
> again.
> I think the fix is to initialize maxTimestamp with currentTimestamp instead 
> of 0L:
> {code}
>  long maxTimestamp = currentTimestamp;
> {code}
> The following block clears the keys:
> {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid}
> if (lastModified > maxTimestamp) {
>     maxTimestamp = lastModified;
>     currentKeys.clear();
>     getLogger().debug("clearing keys");
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-4715) ListS3 list duplicate files when incoming file throughput to S3 is high

2017-12-20 Thread Milan Das (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299200#comment-16299200
 ] 

Milan Das commented on NIFI-4715:
-

Root cause: when files are uploaded to S3 while a ListS3 listing is in 
progress, onTrigger initializes maxTimestamp to 0L, which causes the block 
below to clear the keys.

A listed key whose lastModifiedTime equals currentTimestamp should be 
skipped; because the key set has been cleared, the same file is listed 
again. I think the fix is to initialize maxTimestamp with currentTimestamp 
instead of 0L:
{code}
 long maxTimestamp = currentTimestamp;
{code}

The following block clears the keys:
{code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid}
if (lastModified > maxTimestamp) {
    maxTimestamp = lastModified;
    currentKeys.clear();
    getLogger().debug("clearing keys");
}
{code}
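
To make the failure concrete, here is a hypothetical trace with two objects, 
a.txt and b.txt (illustrative names, not from this ticket), that share the 
same lastModified millisecond T:

{code:title=Hypothetical trace of the duplicate listing|borderStyle=solid}
// Pass 1: a.txt (lastModified = T) is listed.
//         Persisted state: currentTimestamp = T, currentKeys = {a.txt}
// b.txt is uploaded during the same millisecond T, after pass 1 has run.
//
// Pass 2: maxTimestamp starts at 0L.
//   b.txt: lastModified == T and b.txt is not in currentKeys -> emitted
//          (correct), but lastModified (T) > maxTimestamp (0L) also fires
//          currentKeys.clear(), so the set ends up as {b.txt} without a.txt.
//
// Same pass or next pass (depending on key order):
//   a.txt: lastModified == T == currentTimestamp, but a.txt is no longer
//          in currentKeys -> emitted AGAIN (duplicate).
//
// Seeding maxTimestamp with currentTimestamp (T) keeps the clear() from
// firing for objects at the current millisecond, so a.txt stays in the set.
{code}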



[jira] [Commented] (NIFI-4715) ListS3 list duplicate files when incoming file throughput to S3 is high

2017-12-20 Thread Milan Das (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299147#comment-16299147
 ] 

Milan Das commented on NIFI-4715:
-

Yes, agreed that multi-threading isn't possible here. The root cause is different.



[jira] [Commented] (NIFI-4715) ListS3 list duplicate files when incoming file throughput to S3 is high

2017-12-20 Thread Joseph Witt (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298534#comment-16298534
 ] 

Joseph Witt commented on NIFI-4715:
---

I'm not familiar enough with this specific processor, but I am definitely 
familiar with the list/fetch pattern and its implementations. There may well 
be a bug, or an opportunity to tighten the handling of edge conditions like 
time boundaries, files with the same name being placed twice, and so on. My 
primary point in replying was to steer the effort away from treating this as 
a threading problem, since the processor is intentionally designed to operate 
with a single thread.



[jira] [Commented] (NIFI-4715) ListS3 list duplicate files when incoming file throughput to S3 is high

2017-12-20 Thread Milan Das (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298529#comment-16298529
 ] 

Milan Das commented on NIFI-4715:
-

[~joewitt] I am now able to work around the bug using DetectDuplicate, but it 
still looks like a bug in ListS3. I think the following code block is the only 
place it can fail:
{code:title=ListS3.java|borderStyle=solid}
if (lastModified < currentTimestamp
        || lastModified == currentTimestamp
        && currentKeys.contains(versionSummary.getKey())) {
    continue;
}
{code}



[jira] [Commented] (NIFI-4715) ListS3 list duplicate files when incoming file throughput to S3 is high

2017-12-20 Thread Joseph Witt (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298517#comment-16298517
 ] 

Joseph Witt commented on NIFI-4715:
---

I'm not questioning whether duplicate listings are possible. When listing from 
systems like this there is a range of interesting, complex cases to handle. 
What I'm saying is that the suggested fix is likely unrelated to the stated 
problem: given that there is only a single thread, this is not a threading 
issue. It is not uncommon to use DetectDuplicate after a processor such as 
this, so the flow can handle being given the same filename again.
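
For anyone reaching for that workaround, one common wiring looks roughly like 
this. It is an illustrative sketch, not from this ticket: it assumes a 
configured DistributedMapCacheClientService and keys the cache on the listed 
object's filename attribute.

{code:title=Sketch: de-duplicating a listing with DetectDuplicate (illustrative)|borderStyle=solid}
ListS3
  -> DetectDuplicate
       Cache Entry Identifier    : ${filename}    // key the cache on the listed name
       Distributed Cache Service : DistributedMapCacheClientService
       Age Off Duration          : 1 hour         // optional; bounds cache growth
     'duplicate' relationship     -> log or auto-terminate
     'non-duplicate' relationship -> FetchS3Object -> PutFile
{code}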



[jira] [Commented] (NIFI-4715) ListS3 list duplicate files when incoming file throughput to S3 is high

2017-12-20 Thread Milan Das (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298478#comment-16298478
 ] 

Milan Das commented on NIFI-4715:
-

Additional logs after enabling DEBUG logging for ListS3:

AWS Last Modified times:
* ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part103.txt – AWS Last Modified is Dec 8, 2017 9:34:00 AM GMT-0500
* ls.s3.7236a098-30cf-40c9-8be7-f8df1d52b33e.2017-12-08T14.33.part104.txt – AWS Last Modified is Dec 8, 2017 9:34:35 AM GMT-0500

First time ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part103.txt appears in the logs:
2017-12-08 14:34:00,762 DEBUG [Timer-Driven Process Thread-6] 
org.apache.nifi.processors.aws.s3.ListS3 
ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] Setting CLUSTER State to 
{key-0=AMBER/MARVIN/07708c5c-2939-4dba-99b1-de2727f1c080/ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part103.txt,
 currentTimestamp=1512743602000}
 
Events where ListS3 detects new files and pulls the old one down again:
 
2017-12-08 14:34:28,491 INFO [Provenance Repository Rollover Thread-1] 
o.a.n.p.PersistentProvenanceRepository Successfully Rolled over Provenance 
Event file containing 25 records. In the past 5 minutes, 118 events have been 
written to the Provenance Repository, totaling 94.96 KB
2017-12-08 14:34:29,486 INFO [Heartbeat Monitor Thread-1] 
o.a.n.c.c.h.AbstractHeartbeatMonitor Finished processing 1 heartbeats in 5162 
nanos
2017-12-08 14:34:29,806 INFO [Timer-Driven Process Thread-2] 
org.apache.nifi.processors.aws.s3.ListS3 
ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] Successfully listed S3 bucket 
amber-marvin in 1869 millis
2017-12-08 14:34:29,806 DEBUG [Timer-Driven Process Thread-2] 
org.apache.nifi.processors.aws.s3.ListS3 
ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] No new objects in S3 bucket 
amber-marvin to list. Yielding.
2017-12-08 14:34:29,806 DEBUG [Timer-Driven Process Thread-2] 
org.apache.nifi.processors.aws.s3.ListS3 
ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] has chosen to yield its 
resources; will not be scheduled to run again for 1000 milliseconds
2017-12-08 14:34:30,807 DEBUG [Timer-Driven Process Thread-10] 
org.apache.nifi.processors.aws.s3.ListS3 
ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] Returning CLUSTER State: 
StandardStateMap[version=2143, 
values={key-0=AMBER/MARVIN/07708c5c-2939-4dba-99b1-de2727f1c080/ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part103.txt,
 currentTimestamp=151274364}]
2017-12-08 14:34:31,929 INFO [Timer-Driven Process Thread-10] 
org.apache.nifi.processors.aws.s3.ListS3 
ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] Successfully listed S3 bucket 
amber-marvin in 1121 millis
2017-12-08 14:34:31,929 DEBUG [Timer-Driven Process Thread-10] 
org.apache.nifi.processors.aws.s3.ListS3 
ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] No new objects in S3 bucket 
amber-marvin to list. Yielding.
2017-12-08 14:34:31,929 DEBUG [Timer-Driven Process Thread-10] 
org.apache.nifi.processors.aws.s3.ListS3 
ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] has chosen to yield its 
resources; will not be scheduled to run again for 1000 milliseconds
2017-12-08 14:34:32,014 INFO [Process Cluster Protocol Request-6] 
o.a.n.c.p.impl.SocketProtocolListener Finished processing request 
2294c28c-b82a-4b90-9e24-bbb79d478ae4 (type=HEARTBEAT, length=2095 bytes) from 
iset-streaming-01.dev.secureauth.local:8085 in 3 millis
2017-12-08 14:34:32,015 INFO [Clustering Tasks Thread-1] 
o.a.n.c.c.ClusterProtocolHeartbeater Heartbeat created at 2017-12-08 
14:34:32,009 and sent to iset-streaming-01.dev.secureauth.local:8086 at 
2017-12-08 14:34:32,015; send took 5 millis
2017-12-08 14:34:32,930 DEBUG [Timer-Driven Process Thread-3] 
org.apache.nifi.processors.aws.s3.ListS3 
ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] Returning CLUSTER State: 
StandardStateMap[version=2143, 
values={key-0=AMBER/MARVIN/07708c5c-2939-4dba-99b1-de2727f1c080/ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part103.txt,
 currentTimestamp=151274364}]
2017-12-08 14:34:34,213 INFO [Timer-Driven Process Thread-3] 
org.apache.nifi.processors.aws.s3.ListS3 
ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] Successfully listed S3 bucket 
amber-marvin in 1283 millis
2017-12-08 14:34:34,213 DEBUG [Timer-Driven Process Thread-3] 
org.apache.nifi.processors.aws.s3.ListS3 
ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] No new objects in S3 bucket 
amber-marvin to list. Yielding.
2017-12-08 14:34:34,213 DEBUG [Timer-Driven Process Thread-3] 
org.apache.nifi.processors.aws.s3.ListS3 
ListS3[id=db049bf5-f3e4-3e2c-83b5-fd4388377832] has chosen to yield its 
resources; will not be scheduled to 

[jira] [Commented] (NIFI-4715) ListS3 list duplicate files when incoming file throughput to S3 is high

2017-12-20 Thread Milan Das (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298473#comment-16298473
 ] 

Milan Das commented on NIFI-4715:
-

[~joewitt]

Steps to reproduce (I can reproduce this and upload the flow template if needed):
1. Keep the ListS3 flow running.
2. Start loading files into the S3 bucket. My files have naming conventions 
like: ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part165.txt
3. Follow with FetchS3Object and PutFile.

Here is the error:
2017-12-19 13:21:10,491 WARN [Timer-Driven Process Thread-9] 
o.a.nifi.processors.standard.PutFile 
PutFile[id=cea1f31b-9865-32a2-e355-f7e07add4b17] Penalizing 
StandardFlowFileRecord[uuid=3d7c66c4-3f46-43af-bf4f-0a4952cee85c,claim=StandardContentClaim
 [resourceClaim=StandardResourceClaim[id=1513688204862-1, container=default, 
section=1], offset=10616, 
length=16],offset=0,name=ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part165.txt,size=16]
 and routing to failure as configured because file with the same name already 
exists
2017-12-19 13:21:10,526 WARN [Timer-Driven Process Thread-2] 
o.a.nifi.processors.standard.PutFile 
PutFile[id=cea1f31b-9865-32a2-e355-f7e07add4b17] Penalizing 
StandardFlowFileRecord[uuid=1f4121af-7eab-4793-8736-c4c79dbec609,claim=StandardContentClaim
 [resourceClaim=StandardResourceClaim[id=1513688204862-1, container=default, 
section=1], offset=10632, 
length=16],offset=0,name=ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part166.txt,size=16]
 and routing to failure as configured because file with the same name already 
exists
2017-12-19 13:21:10,610 WARN [Timer-Driven Process Thread-2] 
o.a.nifi.processors.standard.PutFile 
PutFile[id=cea1f31b-9865-32a2-e355-f7e07add4b17] Penalizing 
StandardFlowFileRecord[uuid=b7087a30-1a9e-4f47-b431-18cd5a8f90da,claim=StandardContentClaim
 [resourceClaim=StandardResourceClaim[id=1513688204862-1, container=default, 
section=1], offset=10648, 
length=16],offset=0,name=ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part167.txt,size=16]
 and routing to failure as configured because file with the same name already 
exists
2017-12-19 13:21:10,643 WARN [Timer-Driven Process Thread-6] 
o.a.nifi.processors.standard.PutFile 
PutFile[id=cea1f31b-9865-32a2-e355-f7e07add4b17] Penalizing 
StandardFlowFileRecord[uuid=f0bb0d4a-5f83-4776-8ccd-e6c603590a2b,claim=StandardContentClaim
 [resourceClaim=StandardResourceClaim[id=1513688204862-1, container=default, 
section=1], offset=10664, 
length=16],offset=0,name=ls.s3.9447cb9e-0b42-4464-85d3-3117cf947f77.2017-12-08T14.33.part168.txt,size=16]
 and routing to failure as configured because file with the same name already 
exists
 
 




[jira] [Commented] (NIFI-4715) ListS3 list duplicate files when incoming file throughput to S3 is high

2017-12-19 Thread Joseph Witt (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297848#comment-16297848
 ] 

Joseph Witt commented on NIFI-4715:
---

[~dmilan77] ListS3 is annotated with TriggerSerially, which means it can only 
ever have one thread; it is designed to run this way exclusively. When you say 
it runs in multi-threaded mode, are you saying you are able to run it with 
more than one thread? Can you share a screenshot?

It is designed to be single-threaded for the listing; the listing results can 
then be sent around the cluster via the site-to-site (S2S) protocol and 
fetched in parallel. This List/Fetch pattern is now extremely common for 
massive-scale flows.

Please confirm whether this is a bug or a misunderstanding of how the 
processor works.
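
For reference, the constraint is expressed directly in the processor source. 
The sketch below is simplified and omits the class's other annotations 
(documentation, state scope, and so on):

{code:title=Sketch: why ListS3 cannot run multi-threaded (simplified)|borderStyle=solid}
import org.apache.nifi.annotation.behavior.TriggerSerially;

// @TriggerSerially tells the framework never to invoke onTrigger
// concurrently, so the Concurrent Tasks setting is effectively pinned
// to 1 and a HashSet thread-safety problem inside onTrigger is ruled out.
@TriggerSerially
public class ListS3 extends AbstractS3Processor {
    // onTrigger(...) performs the bucket listing and persists
    // currentTimestamp / currentKeys as processor state
}
{code}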
