Repository: nifi
Updated Branches:
  refs/heads/master ba2719836 -> fb65cf123


NIFI-1271: Yield funnels and ports for the nifi.bored.yield.duration amount of
time when backpressure is applied, just as we do when there are no input
FlowFiles. Also adjust the logic in ContinuallyRunProcessorTask#call that
determines whether enough processor relationships are available.

Signed-off-by: Aldrin Piri <ald...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fb65cf12
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fb65cf12
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fb65cf12

Branch: refs/heads/master
Commit: fb65cf1235d7a1176c58244bc4307f72de0e2a5d
Parents: ba27198
Author: Mark Payne <marka...@hotmail.com>
Authored: Tue Dec 8 07:53:36 2015 -0500
Committer: Aldrin Piri <ald...@apache.org>
Committed: Tue Dec 8 10:54:29 2015 -0500

----------------------------------------------------------------------
 .../controller/tasks/ContinuallyRunConnectableTask.java     | 9 +++++----
 .../nifi/controller/tasks/ContinuallyRunProcessorTask.java  | 2 +-
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fb65cf12/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
index 0380e6e..04e3f60 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
@@ -68,10 +68,11 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> {
         // 4. There is a connection for each relationship.
         final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty();
         boolean flowFilesQueued = true;
+        boolean relationshipAvailable = true;
         final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis())
                 && (triggerWhenEmpty || (flowFilesQueued = Connectables.flowFilesQueued(connectable)))
                 && (connectable.getConnectableType() != ConnectableType.FUNNEL || !connectable.getConnections().isEmpty())
-                && (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable));
+            && (connectable.getRelationships().isEmpty() || (relationshipAvailable = Connectables.anyRelationshipAvailable(connectable)));
 
         if (shouldRun) {
             scheduleState.incrementActiveThreadCount();
@@ -100,9 +101,9 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> {
 
                 scheduleState.decrementActiveThreadCount();
             }
-        } else if (!flowFilesQueued) {
-            // FlowFiles must be queued in order to run but there are none queued;
-            // yield for just a bit.
+        } else if (!flowFilesQueued || !relationshipAvailable) {
+            // Either there are no FlowFiles queued, or the relationship is not available (i.e., backpressure is applied).
+            // We will yield for just a bit.
             return true;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb65cf12/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index dd12824..de91a6d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@ -109,7 +109,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
         if (numRelationships > 0) {
             final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
             if (!context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) {
-                return false;
+                return true;
             }
         }
 

Reply via email to