This is an automated email from the ASF dual-hosted git repository. mosermw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 63fa036818 NIFI-12853 Refactor FlowFilePrioritizer using updated Java APIs 63fa036818 is described below commit 63fa036818543deb2bec9d2f9ecbd3ee20bff69f Author: EndzeitBegins <16666115+endzeitbeg...@users.noreply.github.com> AuthorDate: Thu Feb 29 23:05:46 2024 +0100 NIFI-12853 Refactor FlowFilePrioritizer using updated Java APIs Signed-off-by: Mike Moser <mose...@apache.org> This closes #8461 --- .../apache/nifi/flowfile/FlowFilePrioritizer.java | 9 +- .../prioritizer/FirstInFirstOutPrioritizer.java | 24 ++-- .../NewestFlowFileFirstPrioritizer.java | 25 ++--- .../OldestFlowFileFirstPrioritizer.java | 24 ++-- .../prioritizer/PriorityAttributePrioritizer.java | 88 ++++++--------- ...st.java => FirstInFirstOutPrioritizerTest.java} | 32 +++--- .../prioritizer/NewestFirstPrioritizerTest.java | 33 +++--- .../prioritizer/OldestFirstPrioritizerTest.java | 45 +++----- .../PriorityAttributePrioritizerTest.java | 122 +++++++-------------- 9 files changed, 149 insertions(+), 253 deletions(-) diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/flowfile/FlowFilePrioritizer.java b/nifi-framework-api/src/main/java/org/apache/nifi/flowfile/FlowFilePrioritizer.java index 684f454f57..cb60ebed17 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/flowfile/FlowFilePrioritizer.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/flowfile/FlowFilePrioritizer.java @@ -19,11 +19,10 @@ package org.apache.nifi.flowfile; import java.util.Comparator; /** - * Provides a mechanism to prioritize flow file objects based on their - * attributes. The actual flow file content will not be available for comparison - * so if features of that content are necessary for prioritization it should be - * extracted to be used as an attribute of the flow file. - * + * Provides a mechanism to prioritize flow file objects based on their attributes. + * The actual flow file content will not be available for comparison. + * If features of that content are necessary for prioritization, + * it should be extracted to be used as an attribute of the flow file. */ public interface FlowFilePrioritizer extends Comparator<FlowFile> { } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java index 08437c76ba..42cd4c81ed 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java @@ -19,24 +19,18 @@ package org.apache.nifi.prioritizer; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; +import java.util.Comparator; + public class FirstInFirstOutPrioritizer implements FlowFilePrioritizer { + private static final Comparator<FlowFile> composedComparator = Comparator.nullsLast( + Comparator + .comparingLong(FlowFile::getLastQueueDate) + .thenComparingLong(FlowFile::getQueueDateIndex) + ); + @Override public int compare(final FlowFile o1, final FlowFile o2) { - if (o1 == null && o2 == null) { - return 0; - } else if (o2 == null) { - return -1; - } else if (o1 == null) { - return 1; - } - - final int dateComparison = o1.getLastQueueDate().compareTo(o2.getLastQueueDate()); - if (dateComparison != 0) { - return dateComparison; - } - - return Long.compare(o1.getQueueDateIndex(), o2.getQueueDateIndex()); + return composedComparator.compare(o1, o2); } - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java index 4893cf0589..6e31a300c3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java @@ -19,24 +19,19 @@ package org.apache.nifi.prioritizer; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; +import java.util.Comparator; + public class NewestFlowFileFirstPrioritizer implements FlowFilePrioritizer { + private static final Comparator<FlowFile> composedComparator = Comparator.nullsLast( + Comparator + .comparingLong(FlowFile::getLineageStartDate) + .thenComparingLong(FlowFile::getLineageStartIndex) + .reversed() + ); + @Override public int compare(final FlowFile o1, final FlowFile o2) { - if (o1 == null && o2 == null) { - return 0; - } else if (o2 == null) { - return -1; - } else if (o1 == null) { - return 1; - } - - final int lineageDateCompare = Long.compare(o2.getLineageStartDate(), o1.getLineageStartDate()); - if (lineageDateCompare != 0) { - return lineageDateCompare; - } - - return Long.compare(o2.getLineageStartIndex(), o1.getLineageStartIndex()); + return composedComparator.compare(o1, o2); } - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java index 386d91238b..7f0a21c442 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java @@ -19,24 +19,18 @@ package org.apache.nifi.prioritizer; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; +import java.util.Comparator; + public class OldestFlowFileFirstPrioritizer implements FlowFilePrioritizer { + private static final Comparator<FlowFile> composedComparator = Comparator.nullsLast( + Comparator + .comparingLong(FlowFile::getLineageStartDate) + .thenComparingLong(FlowFile::getLineageStartIndex) + ); + @Override public int compare(final FlowFile o1, final FlowFile o2) { - if (o1 == null && o2 == null) { - return 0; - } else if (o2 == null) { - return -1; - } else if (o1 == null) { - return 1; - } - - final int lineageDateCompare = Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate()); - if (lineageDateCompare != 0) { - return lineageDateCompare; - } - - return Long.compare(o1.getLineageStartIndex(), o2.getLineageStartIndex()); + return composedComparator.compare(o1, o2); } - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizer.java index fdd963e65b..32d6cc6258 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizer.java @@ -16,71 +16,55 @@ */ package org.apache.nifi.prioritizer; -import java.util.regex.Pattern; - import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import java.util.Comparator; +import java.util.function.Predicate; +import java.util.regex.Pattern; + /** - * This prioritizer checks each FlowFile for a "priority" attribute and lets - * that attribute determine the priority. - * - * 1. if neither FlowFile has a "priority" attribute then order will be - * FirstInFirstOut 2. if one FlowFile has a "priority" attribute and the other - * does not, then the one with the attribute wins 3. if one or both "priority" - * attributes is an integer, then the lowest number wins 4. the "priority" - * attributes are compared lexicographically and the lowest wins + * This prioritizer checks each FlowFile for a "priority" attribute and lets that attribute determine the priority. + * <p> + * 1. If one FlowFile has a "priority" attribute and the other does not, then the one with the attribute wins. + * 2. If only one's FlowFile "priority" attribute is an integer, then that FlowFile wins. + * 3. If the "priority" attributes of both are an integer, then the FlowFile with the lowest number wins. + * 4. If the "priority" attributes of both are not an integer, they're compared lexicographically and the lowest wins. */ public class PriorityAttributePrioritizer implements FlowFilePrioritizer { - private static final Pattern intPattern = Pattern.compile("-?\\d+"); + private static final Predicate<String> isInteger = Pattern.compile("-?\\d+").asMatchPredicate(); - @Override - public int compare(FlowFile o1, FlowFile o2) { - if (o1 == null && o2 == null) { - return 0; - } else if (o2 == null) { - return -1; - } else if (o1 == null) { - return 1; - } + private static final Comparator<String> priorityAttributeComparator = Comparator.nullsLast( + Comparator.comparing( + PriorityAttributePrioritizer::parseLongOrNull, + Comparator.nullsLast(Long::compare) + ).thenComparing(Comparator.naturalOrder()) + ); - String o1Priority = o1.getAttribute(CoreAttributes.PRIORITY.key()); - String o2Priority = o2.getAttribute(CoreAttributes.PRIORITY.key()); - if (o1Priority == null && o2Priority == null) { - return 0; - } else if (o2Priority == null) { - return -1; - } else if (o1Priority == null) { - return 1; - } + private static final Comparator<FlowFile> composedComparator = Comparator.nullsLast( + Comparator.comparing( + flowFile -> flowFile.getAttribute(CoreAttributes.PRIORITY.key()), + priorityAttributeComparator + ) + ); - // priority exists on both FlowFiles - if (intPattern.matcher(o1Priority.trim()).matches()) { - if (intPattern.matcher(o2Priority.trim()).matches()) { - try { - // both o1Priority and o2Priority are numbers - long o1num = Long.parseLong(o1Priority.trim()); - long o2num = Long.parseLong(o2Priority.trim()); - return o1num < o2num ? -1 : (o1num > o2num ? 1 : 0); - } catch (NumberFormatException e) { - // not a long after regex matched - return 0; - } - } else { - // o1Priority is a number, o2Priority is not, o1 wins - return -1; - } - } else { - if (intPattern.matcher(o2Priority.trim()).matches()) { - // o2Priority is a number, o1Priority is not, o2 wins - return 1; - } else { - // neither o1Priority nor o2Priority are numbers - return o1Priority.compareTo(o2Priority); + private static Long parseLongOrNull(String attribute) { + final String trimmedAttribute = attribute.trim(); + + if (isInteger.test(trimmedAttribute)) { + try { + return Long.parseLong(trimmedAttribute); + } catch (NumberFormatException ignored) { + return null; } } + return null; } + @Override + public int compare(FlowFile o1, FlowFile o2) { + return composedComparator.compare(o1, o2); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizerTest.java similarity index 62% copy from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java copy to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizerTest.java index 13915e34da..ca1002d12c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizerTest.java @@ -16,33 +16,28 @@ */ package org.apache.nifi.prioritizer; -import org.apache.nifi.processor.Processor; +import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.MockProcessSession; import org.apache.nifi.util.NoOpProcessor; -import org.apache.nifi.util.SharedSessionState; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -import java.util.concurrent.atomic.AtomicLong; import static org.junit.jupiter.api.Assertions.assertEquals; -public class NewestFirstPrioritizerTest { + +@SuppressWarnings("EqualsWithItself") +public class FirstInFirstOutPrioritizerTest { + + private final TestRunner testRunner = TestRunners.newTestRunner(NoOpProcessor.class); + private final FlowFilePrioritizer prioritizer = new FirstInFirstOutPrioritizer(); @Test public void testPrioritizer() { - final Processor processor = new NoOpProcessor(); - final AtomicLong idGenerator = new AtomicLong(0L); - final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class)); + MockFlowFile flowFile1 = testRunner.enqueue("created first but 'enqueued' later"); + flowFile1.setLastEnqueuedDate(830822400000L); + MockFlowFile flowFile2 = testRunner.enqueue("created second but 'enqueued' earlier"); + flowFile2.setLastEnqueuedDate(795916800000L); - final MockFlowFile flowFile1 = session.create(); - try { - Thread.sleep(2); // guarantee the FlowFile entryDate for flowFile2 is different than flowFile1 - } catch (final InterruptedException e) { - } - final MockFlowFile flowFile2 = session.create(); - - final NewestFlowFileFirstPrioritizer prioritizer = new NewestFlowFileFirstPrioritizer(); assertEquals(0, prioritizer.compare(null, null)); assertEquals(-1, prioritizer.compare(flowFile1, null)); assertEquals(1, prioritizer.compare(null, flowFile1)); @@ -51,5 +46,4 @@ public class NewestFirstPrioritizerTest { assertEquals(1, prioritizer.compare(flowFile1, flowFile2)); assertEquals(-1, prioritizer.compare(flowFile2, flowFile1)); } - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java index 13915e34da..494769ab8d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java @@ -16,33 +16,27 @@ */ package org.apache.nifi.prioritizer; -import org.apache.nifi.processor.Processor; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.MockProcessSession; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.util.NoOpProcessor; -import org.apache.nifi.util.SharedSessionState; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -import java.util.concurrent.atomic.AtomicLong; import static org.junit.jupiter.api.Assertions.assertEquals; + +@SuppressWarnings("EqualsWithItself") public class NewestFirstPrioritizerTest { - @Test - public void testPrioritizer() { - final Processor processor = new NoOpProcessor(); - final AtomicLong idGenerator = new AtomicLong(0L); - final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class)); + private final TestRunner testRunner = TestRunners.newTestRunner(NoOpProcessor.class); + private final FlowFilePrioritizer prioritizer = new NewestFlowFileFirstPrioritizer(); - final MockFlowFile flowFile1 = session.create(); - try { - Thread.sleep(2); // guarantee the FlowFile entryDate for flowFile2 is different than flowFile1 - } catch (final InterruptedException e) { - } - final MockFlowFile flowFile2 = session.create(); + @Test + public void testPrioritizer() throws InterruptedException { + final FlowFile flowFile1 = testRunner.enqueue("flowFile1"); + Thread.sleep(2); // guarantee the FlowFile entryDate for flowFile2 is different than flowFile1 + final FlowFile flowFile2 = testRunner.enqueue("flowFile1"); - final NewestFlowFileFirstPrioritizer prioritizer = new NewestFlowFileFirstPrioritizer(); assertEquals(0, prioritizer.compare(null, null)); assertEquals(-1, prioritizer.compare(flowFile1, null)); assertEquals(1, prioritizer.compare(null, flowFile1)); @@ -51,5 +45,4 @@ public class NewestFirstPrioritizerTest { assertEquals(1, prioritizer.compare(flowFile1, flowFile2)); assertEquals(-1, prioritizer.compare(flowFile2, flowFile1)); } - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/OldestFirstPrioritizerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/OldestFirstPrioritizerTest.java index 32a9c7eb43..cfdc5c53f9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/OldestFirstPrioritizerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/OldestFirstPrioritizerTest.java @@ -16,37 +16,27 @@ */ package org.apache.nifi.prioritizer; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.MockProcessSession; -import org.apache.nifi.util.SharedSessionState; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -import java.util.concurrent.atomic.AtomicLong; import static org.junit.jupiter.api.Assertions.assertEquals; +@SuppressWarnings("EqualsWithItself") public class OldestFirstPrioritizerTest { - @Test - public void testPrioritizer() { - final Processor processor = new SimpleProcessor(); - final AtomicLong idGenerator = new AtomicLong(0L); - final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class)); + private final TestRunner testRunner = TestRunners.newTestRunner(NoOpProcessor.class); + private final FlowFilePrioritizer prioritizer = new OldestFlowFileFirstPrioritizer(); - final MockFlowFile flowFile1 = session.create(); - try { - Thread.sleep(2); // guarantee the FlowFile entryDate for flowFile2 is different than flowFile1 - } catch (final InterruptedException e) { - } - final MockFlowFile flowFile2 = session.create(); + @Test + public void testPrioritizer() throws InterruptedException { + final FlowFile flowFile1 = testRunner.enqueue("flowFile1"); + Thread.sleep(2); // guarantee the FlowFile entryDate for flowFile2 is different than flowFile1 + final FlowFile flowFile2 = testRunner.enqueue("flowFile2"); - final OldestFlowFileFirstPrioritizer prioritizer = new OldestFlowFileFirstPrioritizer(); assertEquals(0, prioritizer.compare(null, null)); assertEquals(-1, prioritizer.compare(flowFile1, null)); assertEquals(1, prioritizer.compare(null, flowFile1)); @@ -55,13 +45,4 @@ public class OldestFirstPrioritizerTest { assertEquals(-1, prioritizer.compare(flowFile1, flowFile2)); assertEquals(1, prioritizer.compare(flowFile2, flowFile1)); } - - public class SimpleProcessor extends AbstractProcessor { - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - } - - } - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java index b88d3ae1fa..0788b089cf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java @@ -16,102 +16,64 @@ */ package org.apache.nifi.prioritizer; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.MockProcessSession; -import org.apache.nifi.util.SharedSessionState; -import org.junit.jupiter.api.BeforeAll; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; -import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; import static org.junit.jupiter.api.Assertions.assertEquals; +@SuppressWarnings("EqualsWithItself") public class PriorityAttributePrioritizerTest { - static Map<String, String> attrsPri1 = new HashMap<>(); - static Map<String, String> attrsPri2 = new HashMap<>(); - static Map<String, String> attrsPrin1 = new HashMap<>(); - static Map<String, String> attrsPriA = new HashMap<>(); - static Map<String, String> attrsPriB = new HashMap<>(); - static Map<String, String> attrsPriLP = new HashMap<>(); - static Map<String, String> attrsPriLN = new HashMap<>(); - - @BeforeAll - public static void init() { - attrsPri1.put(CoreAttributes.PRIORITY.key(), "1"); - attrsPri2.put(CoreAttributes.PRIORITY.key(), "2"); - attrsPrin1.put(CoreAttributes.PRIORITY.key(), "-1"); - attrsPriA.put(CoreAttributes.PRIORITY.key(), "A"); - attrsPriB.put(CoreAttributes.PRIORITY.key(), "B"); - attrsPriLP.put(CoreAttributes.PRIORITY.key(), "5432123456789"); - attrsPriLN.put(CoreAttributes.PRIORITY.key(), "-5432123456789"); - } + private final TestRunner testRunner = TestRunners.newTestRunner(NoOpProcessor.class); + private final FlowFilePrioritizer prioritizer = new PriorityAttributePrioritizer(); @Test public void testPrioritizer() { - final Processor processor = new SimpleProcessor(); - final AtomicLong idGenerator = new AtomicLong(0L); - final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class)); - - final MockFlowFile ffNoPriority = session.create(); - final MockFlowFile ffPri1 = session.create(); - ffPri1.putAttributes(attrsPri1); - final MockFlowFile ffPri2 = session.create(); - ffPri2.putAttributes(attrsPri2); - final MockFlowFile ffPrin1 = session.create(); - ffPrin1.putAttributes(attrsPrin1); - final MockFlowFile ffPriA = session.create(); - ffPriA.putAttributes(attrsPriA); - final MockFlowFile ffPriB = session.create(); - ffPriB.putAttributes(attrsPriB); - final MockFlowFile ffPriLP = session.create(); - ffPriLP.putAttributes(attrsPriLP); - final MockFlowFile ffPriLN = session.create(); - ffPriLN.putAttributes(attrsPriLN); + final FlowFile ffWithoutPriority = testRunner.enqueue("data"); + final FlowFile ffWithPriority1 = enqueueWithPriority("1"); + final FlowFile ffWithPriority2 = enqueueWithPriority("2"); + final FlowFile ffWithPriorityNegative1 = enqueueWithPriority("-1"); + final FlowFile ffWithPriorityA = enqueueWithPriority("A"); + final FlowFile ffWithPriorityB = enqueueWithPriority("B"); + final FlowFile ffWithLongPriority = enqueueWithPriority("5432123456789"); + final FlowFile ffWithNegativeLongPriority = enqueueWithPriority("-5432123456789"); - final PriorityAttributePrioritizer prioritizer = new PriorityAttributePrioritizer(); assertEquals(0, prioritizer.compare(null, null)); - assertEquals(-1, prioritizer.compare(ffNoPriority, null)); - assertEquals(1, prioritizer.compare(null, ffNoPriority)); - - assertEquals(0, prioritizer.compare(ffNoPriority, ffNoPriority)); - assertEquals(-1, prioritizer.compare(ffPri1, ffNoPriority)); - assertEquals(1, prioritizer.compare(ffNoPriority, ffPri1)); - - assertEquals(0, prioritizer.compare(ffPri1, ffPri1)); - assertEquals(-1, prioritizer.compare(ffPri1, ffPri2)); - assertEquals(1, prioritizer.compare(ffPri2, ffPri1)); - assertEquals(-1, prioritizer.compare(ffPrin1, ffPri1)); - assertEquals(1, prioritizer.compare(ffPri1, ffPrin1)); - - assertEquals(-1, prioritizer.compare(ffPri1, ffPriA)); - assertEquals(1, prioritizer.compare(ffPriA, ffPri1)); - - assertEquals(0, prioritizer.compare(ffPriA, ffPriA)); - assertEquals(-1, prioritizer.compare(ffPriA, ffPriB)); - assertEquals(1, prioritizer.compare(ffPriB, ffPriA)); - - assertEquals(1, prioritizer.compare(ffPriLP, ffPri1)); - assertEquals(-1, prioritizer.compare(ffPri1, ffPriLP)); - assertEquals(-1, prioritizer.compare(ffPriLN, ffPri1)); - assertEquals(1, prioritizer.compare(ffPri1, ffPriLN)); + assertEquals(-1, prioritizer.compare(ffWithoutPriority, null)); + assertEquals(1, prioritizer.compare(null, ffWithoutPriority)); + + assertEquals(0, prioritizer.compare(ffWithoutPriority, ffWithoutPriority)); + assertEquals(-1, prioritizer.compare(ffWithPriority1, ffWithoutPriority)); + assertEquals(1, prioritizer.compare(ffWithoutPriority, ffWithPriority1)); + + assertEquals(0, prioritizer.compare(ffWithPriority1, ffWithPriority1)); + assertEquals(-1, prioritizer.compare(ffWithPriority1, ffWithPriority2)); + assertEquals(1, prioritizer.compare(ffWithPriority2, ffWithPriority1)); + assertEquals(-1, prioritizer.compare(ffWithPriorityNegative1, ffWithPriority1)); + assertEquals(1, prioritizer.compare(ffWithPriority1, ffWithPriorityNegative1)); + + assertEquals(-1, prioritizer.compare(ffWithPriority1, ffWithPriorityA)); + assertEquals(1, prioritizer.compare(ffWithPriorityA, ffWithPriority1)); + + assertEquals(0, prioritizer.compare(ffWithPriorityA, ffWithPriorityA)); + assertEquals(-1, prioritizer.compare(ffWithPriorityA, ffWithPriorityB)); + assertEquals(1, prioritizer.compare(ffWithPriorityB, ffWithPriorityA)); + + assertEquals(1, prioritizer.compare(ffWithLongPriority, ffWithPriority1)); + assertEquals(-1, prioritizer.compare(ffWithPriority1, ffWithLongPriority)); + assertEquals(-1, prioritizer.compare(ffWithNegativeLongPriority, ffWithPriority1)); + assertEquals(1, prioritizer.compare(ffWithPriority1, ffWithNegativeLongPriority)); } - public class SimpleProcessor extends AbstractProcessor { - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - } - + private MockFlowFile enqueueWithPriority(String priority) { + return testRunner.enqueue("data", Map.of(CoreAttributes.PRIORITY.key(), priority)); } - }