asfgit closed pull request #3233: NIFI-5892 Wait timestamp lingers, potentially 
messing up downstream w…
URL: https://github.com/apache/nifi/pull/3233
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff once it has been merged, so the diff is reproduced below:

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index cfec15e263..e2975560ac 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -84,7 +84,8 @@
 )
 @WritesAttributes({
         @WritesAttribute(attribute = "wait.start.timestamp", description = 
"All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
-        + "initial epoch timestamp when the file first entered this processor. 
 This is used to determine the expiration time of the FlowFile."),
+        + "initial epoch timestamp when the file first entered this processor. 
 This is used to determine the expiration time of the FlowFile.  "
+        + "This attribute is not written when the FlowFile is transferred to 
failure or success"),
         @WritesAttribute(attribute = "wait.counter.<counterName>", description 
= "If a signal exists when the processor runs, "
         + "each count value in the signal is copied.")
 })
@@ -314,6 +315,8 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
 
         final Consumer<FlowFile> transferToFailure = flowFile -> {
             flowFile = session.penalize(flowFile);
+            // This flowFile is now failed, our tracking is done, clear the 
timer
+            flowFile = clearWaitState(session, flowFile);
             getFlowFilesFor.apply(REL_FAILURE).add(flowFile);
         };
 
@@ -328,9 +331,19 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
                     relationship = Relationship.SELF;
                 }
             }
-
+            final Relationship finalRelationship = relationship;
             final List<FlowFile> flowFilesWithSignalAttributes = 
routedFlowFiles.getValue().stream()
-                    .map(f -> copySignalAttributes(session, f, 
signalRef.get(), originalSignalCounts, 
replaceOriginalAttributes)).collect(Collectors.toList());
+                    .map(f -> {
+                        if (REL_SUCCESS.equals(finalRelationship)) {
+                            // These flowFiles will be exiting the wait, clear 
the timer
+                            f = clearWaitState(session, f);
+                        }
+                        return copySignalAttributes(session, f, 
signalRef.get(),
+                            originalSignalCounts,
+                            replaceOriginalAttributes);
+                    })
+                    .collect(Collectors.toList());
+
             session.transfer(flowFilesWithSignalAttributes, relationship);
         };
 
@@ -470,6 +483,10 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
 
     }
 
+    private FlowFile clearWaitState(final ProcessSession session, final 
FlowFile flowFile) {
+        return session.removeAttribute(flowFile, WAIT_START_TIMESTAMP);
+    }
+
     private FlowFile copySignalAttributes(final ProcessSession session, final 
FlowFile flowFile, final Signal signal, final Map<String, Long> originalCount, 
final boolean replaceOriginal) {
         if (signal == null) {
             return flowFile;
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
index a4df2f37e9..360567b534 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
@@ -69,6 +69,8 @@ public void testWait() throws InitializationException {
 
         // no cache key attribute
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
+        // timestamp must be present
+        
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0).assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
         runner.clearTransferState();
     }
 
@@ -101,6 +103,7 @@ public void testExpired() throws InitializationException, 
InterruptedException {
 
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile ff = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         runner.clearTransferState();
         runner.enqueue(ff);
@@ -126,7 +129,7 @@ public void testCounterExpired() throws 
InitializationException, InterruptedExce
 
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile ff = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-
+        ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
         runner.clearTransferState();
         runner.enqueue(ff);
 
@@ -164,6 +167,7 @@ public void testBadWaitStartTimestamp() throws 
InitializationException, Interrup
         runner.run();
 
         runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
+        
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         runner.clearTransferState();
     }
 
@@ -178,6 +182,7 @@ public void testEmptyReleaseSignal() throws 
InitializationException, Interrupted
 
         runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
         
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total");
+        
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         runner.clearTransferState();
     }
 
@@ -232,6 +237,8 @@ public void testReplaceAttributes() throws 
InitializationException, IOException
 
         final MockFlowFile outputFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
 
+        // timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", 
outputFlowFile.getAttribute("notify.only"));
         // show that uuid was not overwritten
@@ -276,6 +283,8 @@ public void testKeepOriginalAttributes() throws 
InitializationException, IOExcep
 
         final MockFlowFile outputFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
 
+        // timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", 
outputFlowFile.getAttribute("notify.only"));
         // show that uuid was not overwritten
@@ -317,7 +326,7 @@ public void testWaitForTotalCount() throws 
InitializationException, IOException
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile waitingFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
         /*
          * 2nd iteration.
          */
@@ -331,6 +340,7 @@ public void testWaitForTotalCount() throws 
InitializationException, IOException
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since total count doesn't reach to 3.
         waitingFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 3rd iteration.
@@ -342,6 +352,7 @@ public void testWaitForTotalCount() throws 
InitializationException, IOException
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since total count doesn't reach to 3.
         waitingFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 4th iteration.
@@ -357,6 +368,9 @@ public void testWaitForTotalCount() throws 
InitializationException, IOException
 
         final MockFlowFile outputFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
 
+        // wait timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
+
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", 
outputFlowFile.getAttribute("notify.only"));
         // show that uuid was not overwritten
@@ -401,6 +415,7 @@ public void testWaitForSpecificCount() throws 
InitializationException, IOExcepti
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile waitingFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 2nd iteration.
@@ -415,6 +430,7 @@ public void testWaitForSpecificCount() throws 
InitializationException, IOExcepti
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since counter-B doesn't reach to 2.
         waitingFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 3rd iteration.
@@ -429,6 +445,7 @@ public void testWaitForSpecificCount() throws 
InitializationException, IOExcepti
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since total count doesn't reach to 3.
         waitingFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 4th iteration.
@@ -444,6 +461,8 @@ public void testWaitForSpecificCount() throws 
InitializationException, IOExcepti
 
         final MockFlowFile outputFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
 
+        // wait timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", 
outputFlowFile.getAttribute("notify.only"));
         // show that uuid was not overwritten
@@ -498,6 +517,8 @@ public void testDecrementCache() throws 
ConcurrentModificationException, IOExcep
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
         MockFlowFile outputFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+        // timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         outputFlowFile.assertAttributeEquals("wait.counter.counter", "2");
 
         // expect counter to be decremented to 0 and releasable count remains 
1.
@@ -514,6 +535,8 @@ public void testDecrementCache() throws 
ConcurrentModificationException, IOExcep
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
         outputFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+        // timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         // All counters are consumed.
         outputFlowFile.assertAttributeEquals("wait.counter.counter", "0");
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to