Repository: nifi Updated Branches: refs/heads/master c7ff2fc5d -> 234ddb0fe
NIFI-5745: When determining if backpressure should be applied across nodes for load balancing, only consider if the local partition has reached the threshold limits instead of considering the size of the entire queue This closes #3108. Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/234ddb0f Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/234ddb0f Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/234ddb0f Branch: refs/heads/master Commit: 234ddb0fe1a36ad947c340114058d82c777d791f Parents: c7ff2fc Author: Mark Payne <marka...@hotmail.com> Authored: Wed Oct 24 13:06:57 2018 -0400 Committer: Koji Kawamura <ijokaruma...@apache.org> Committed: Fri Oct 26 16:04:46 2018 +0900 ---------------------------------------------------------------------- .../nifi/controller/queue/LoadBalancedFlowFileQueue.java | 6 ++++++ .../apache/nifi/controller/queue/AbstractFlowFileQueue.java | 5 ++++- .../queue/clustered/SocketLoadBalancedFlowFileQueue.java | 5 +++++ .../queue/clustered/server/StandardLoadBalanceProtocol.java | 4 +++- 4 files changed, 18 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/234ddb0f/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java index f0eff27..b9f6951 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java @@ -66,4 +66,10 @@ public interface LoadBalancedFlowFileQueue extends FlowFileQueue { */ boolean isPropagateBackpressureAcrossNodes(); + /** + * Determines whether or not the local partition's size >= backpressure threshold + * + * @return <code>true</code> if the number of FlowFiles or total size of FlowFiles in the local partition alone meets or exceeds the backpressure threshold, <code>false</code> otherwise. + */ + boolean isLocalPartitionFull(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/234ddb0f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java index 5bf75a4..436b85d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java @@ -141,6 +141,10 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue { @Override public boolean isFull() { + return isFull(size()); + } + + protected boolean isFull(final QueueSize queueSize) { final MaxQueueSize maxSize = getMaxQueueSize(); // Check if max size is set @@ -148,7 +152,6 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue { return false; } - final QueueSize queueSize = size(); if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) { return true; } http://git-wip-us.apache.org/repos/asf/nifi/blob/234ddb0f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index 7b3a211..84731f7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -601,6 +601,11 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple adjustSize(-flowFiles.size(), -flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum()); } + @Override + public boolean isLocalPartitionFull() { + return isFull(localPartition.size()); + } + /** * Determines which QueuePartition the given FlowFile belongs to. Must be called with partition read lock held. * http://git-wip-us.apache.org/repos/asf/nifi/blob/234ddb0f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java index dda71de..f508d12 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java @@ -248,13 +248,15 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { "not configured to allow for Load Balancing"); } + final LoadBalancedFlowFileQueue loadBalancedFlowFileQueue = (LoadBalancedFlowFileQueue) flowFileQueue; + final int spaceCheck = dataIn.read(); if (spaceCheck < 0) { throw new EOFException("Expected to receive a request to determine whether or not space was available for Connection with ID " + connectionId + " from Peer " + peerDescription); } if (spaceCheck == CHECK_SPACE) { - if (flowFileQueue.isFull()) { + if (loadBalancedFlowFileQueue.isLocalPartitionFull()) { logger.debug("Received a 'Check Space' request from Peer {} for Connection with ID {}; responding with QUEUE_FULL", peerDescription, connectionId); out.write(QUEUE_FULL); out.flush();