This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 4354072b4a NIFI-12158 MockProcessSession write methods preserves 
attributes (#7828)
4354072b4a is described below

commit 4354072b4a623f9e565887bef423f9cd13d62814
Author: Eric Secules <esecu...@gmail.com>
AuthorDate: Tue Oct 3 11:40:31 2023 -0700

    NIFI-12158 MockProcessSession write methods preserves attributes (#7828)
    
    Co-authored-by: Eric Secules <eric.secu...@macrohealth.com>
---
 .../java/org/apache/nifi/util/MockProcessSession.java     | 13 ++++++-------
 .../java/org/apache/nifi/util/TestMockProcessSession.java | 15 +++++++++++++++
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 2b8326ed50..ce0d93094e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -927,16 +927,15 @@ public class MockProcessSession implements ProcessSession 
{
         if (!(flowFile instanceof MockFlowFile)) {
             throw new IllegalArgumentException("Cannot export a flow file that 
I did not create");
         }
-
         final MockFlowFile mockFlowFile = validateState(flowFile);
-        writeRecursionSet.add(flowFile);
+        writeRecursionSet.add(mockFlowFile);
         final ByteArrayOutputStream baos = new ByteArrayOutputStream() {
             @Override
             public void close() throws IOException {
                 super.close();
 
                 writeRecursionSet.remove(mockFlowFile);
-                final MockFlowFile newFlowFile = new 
MockFlowFile(mockFlowFile.getId(), flowFile);
+                final MockFlowFile newFlowFile = new 
MockFlowFile(mockFlowFile.getId(), mockFlowFile);
                 currentVersions.put(newFlowFile.getId(), newFlowFile);
 
                 newFlowFile.setData(toByteArray());
@@ -969,12 +968,12 @@ public class MockProcessSession implements ProcessSession 
{
     }
 
     @Override
-    public MockFlowFile write(final FlowFile flowFile, final StreamCallback 
callback) {
+    public MockFlowFile write(FlowFile flowFile, final StreamCallback 
callback) {
+        flowFile = validateState(flowFile);
         if (callback == null || flowFile == null) {
             throw new IllegalArgumentException("argument cannot be null");
         }
-        final MockFlowFile mock = validateState(flowFile);
-
+        final MockFlowFile mock = (MockFlowFile) flowFile;
         final ByteArrayInputStream in = new 
ByteArrayInputStream(mock.getData());
         final ByteArrayOutputStream out = new ByteArrayOutputStream();
 
@@ -987,7 +986,7 @@ public class MockProcessSession implements ProcessSession {
             writeRecursionSet.remove(flowFile);
         }
 
-        final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), 
flowFile);
+        final MockFlowFile newFlowFile = new MockFlowFile(flowFile.getId(), 
flowFile);
         currentVersions.put(newFlowFile.getId(), newFlowFile);
         newFlowFile.setData(out.toByteArray());
 
diff --git 
a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java 
b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
index eefd4a39dd..775bc2f5ed 100644
--- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
+++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -133,6 +134,20 @@ public class TestMockProcessSession {
         assertFalse(ff1.isPenalized());
     }
 
+    @Test
+    public void testAttributePreservedAfterWrite() throws IOException {
+        final Processor processor = new PoorlyBehavedProcessor();
+        final MockProcessSession session = new MockProcessSession(new 
SharedSessionState(processor, new AtomicLong(0L)), processor, new 
MockStateManager(processor));
+        FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
+        session.putAttribute(ff1, "key1", "val1");
+        session.write(ff1).close();
+        session.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE);
+        session.commitAsync();
+        List<MockFlowFile> output = 
session.getFlowFilesForRelationship(PoorlyBehavedProcessor.REL_FAILURE);
+        assertEquals(1, output.size());
+        output.get(0).assertAttributeEquals("key1", "val1");
+    }
+
     protected static class PoorlyBehavedProcessor extends AbstractProcessor {
 
         private static final Relationship REL_FAILURE = new 
Relationship.Builder()

Reply via email to