This is an automated email from the ASF dual-hosted git repository. alopresto pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new 57c7883 NIFI-7566: Avoid using Thread.sleep() to wait for Site-to-Site connection to be handled. Instead, use TimeUnit.timedWait and use Object.notifyAll when setting the beingServiced flag. This significantly reduces latency and improves throughput for small-batch site-to-site communications 57c7883 is described below commit 57c7883f647348aba181440950f01cf1d62846c6 Author: Mark Payne <marka...@hotmail.com> AuthorDate: Fri Jun 19 13:19:17 2020 -0400 NIFI-7566: Avoid using Thread.sleep() to wait for Site-to-Site connection to be handled. Instead, use TimeUnit.timedWait and use Object.notifyAll when setting the beingServiced flag. This significantly reduces latency and improves throughput for small-batch site-to-site communications This closes #4353. Signed-off-by: Andy LoPresto <alopre...@apache.org> --- .../org/apache/nifi/remote/StandardPublicPort.java | 62 ++++++++++++++-------- 1 file changed, 41 insertions(+), 21 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java index 66cbc05..d1ee69f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java @@ -454,11 +454,31 @@ public class StandardPublicPort extends AbstractPort implements PublicPort { } public void setServiceBegin() { - this.beingServiced.set(true); + beingServiced.set(true); + synchronized (this) { + notifyAll(); + } } - public boolean isBeingServiced() { - return beingServiced.get(); + public boolean waitForService(final long duration, final TimeUnit timeUnit) throws InterruptedException { + final long latestWaitTime = System.nanoTime() + timeUnit.toNanos(duration); + + while (!beingServiced.get()) { + final long nanosToWait = latestWaitTime - System.nanoTime(); + if (nanosToWait <= 0) { + return false; + } + + if (isExpired()) { + return false; + } + + synchronized (this) { + TimeUnit.NANOSECONDS.timedWait(this, nanosToWait); + } + } + + return true; } public BlockingQueue<ProcessingResult> getResponseQueue() { @@ -515,17 +535,17 @@ public class StandardPublicPort extends AbstractPort implements PublicPort { // wait for the request to start getting serviced... and time out if it doesn't happen // before the request expires - while (!request.isBeingServiced()) { - if (request.isExpired()) { - // Remove expired request, so that it won't block new request to be offered. - this.requestQueue.remove(request); - throw new SocketTimeoutException("Read timed out"); - } else { - try { - Thread.sleep(100L); - } catch (final InterruptedException e) { + try { + while (!request.waitForService(100, TimeUnit.MILLISECONDS)) { + if (request.isExpired()) { + // Remove expired request, so that it won't block new request to be offered. + this.requestQueue.remove(request); + throw new SocketTimeoutException("Read timed out"); } } + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new ProcessException("Interrupted while waiting for site-to-site request to be serviced", ie); } // we've started to service the request. Now just wait until it's finished @@ -571,17 +591,17 @@ public class StandardPublicPort extends AbstractPort implements PublicPort { // wait for the request to start getting serviced... and time out if it doesn't happen // before the request expires - while (!request.isBeingServiced()) { - if (request.isExpired()) { - // Remove expired request, so that it won't block new request to be offered. - this.requestQueue.remove(request); - throw new SocketTimeoutException("Read timed out"); - } else { - try { - Thread.sleep(100L); - } catch (final InterruptedException e) { + try { + while (!request.waitForService(100, TimeUnit.MILLISECONDS)) { + if (request.isExpired()) { + // Remove expired request, so that it won't block new request to be offered. + this.requestQueue.remove(request); + throw new SocketTimeoutException("Read timed out"); } } + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new ProcessException("Interrupted while waiting for Site-to-Site request to be serviced", ie); } // we've started to service the request. Now just wait until it's finished