Repository: nifi Updated Branches: refs/heads/master ba2719836 -> fb65cf123
NIFI-1271: Yield funnels and ports for nifi.bored.yield.duration amount of time if backpressure is applied, as we do when there are no input FlowFiles. Adjusting logic for ContinuallyRunProcessorTask#call in determining if there is appropriate availability for processor relationships. Signed-off-by: Aldrin Piri <ald...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fb65cf12 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fb65cf12 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fb65cf12 Branch: refs/heads/master Commit: fb65cf1235d7a1176c58244bc4307f72de0e2a5d Parents: ba27198 Author: Mark Payne <marka...@hotmail.com> Authored: Tue Dec 8 07:53:36 2015 -0500 Committer: Aldrin Piri <ald...@apache.org> Committed: Tue Dec 8 10:54:29 2015 -0500 ---------------------------------------------------------------------- .../controller/tasks/ContinuallyRunConnectableTask.java | 9 +++++---- .../nifi/controller/tasks/ContinuallyRunProcessorTask.java | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/fb65cf12/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java index 0380e6e..04e3f60 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java @@ -68,10 +68,11 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> { // 4. There is a connection for each relationship. final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty(); boolean flowFilesQueued = true; + boolean relationshipAvailable = true; final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis()) && (triggerWhenEmpty || (flowFilesQueued = Connectables.flowFilesQueued(connectable))) && (connectable.getConnectableType() != ConnectableType.FUNNEL || !connectable.getConnections().isEmpty()) - && (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable)); + && (connectable.getRelationships().isEmpty() || (relationshipAvailable = Connectables.anyRelationshipAvailable(connectable))); if (shouldRun) { scheduleState.incrementActiveThreadCount(); @@ -100,9 +101,9 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> { scheduleState.decrementActiveThreadCount(); } - } else if (!flowFilesQueued) { - // FlowFiles must be queued in order to run but there are none queued; - // yield for just a bit. + } else if (!flowFilesQueued || !relationshipAvailable) { + // Either there are no FlowFiles queued, or the relationship is not available (i.e., backpressure is applied). + // We will yield for just a bit. return true; } http://git-wip-us.apache.org/repos/asf/nifi/blob/fb65cf12/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java index dd12824..de91a6d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java @@ -109,7 +109,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> { if (numRelationships > 0) { final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships; if (!context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) { - return false; + return true; } }