This is an automated email from the ASF dual-hosted git repository. wujimin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/master by this push: new 6d53ef2 [SCB-1226] there are problems when request rejected by thread pool queue full 6d53ef2 is described below commit 6d53ef26742f9031a4bda26b6b240e7715981f53 Author: wujimin <wuji...@huawei.com> AuthorDate: Mon Apr 1 15:58:35 2019 +0800 [SCB-1226] there are problems when request rejected by thread pool queue full --- .../common/rest/AbstractRestInvocation.java | 49 ++++++++++++---------- .../common/rest/TestAbstractRestInvocation.java | 32 +++++++++++++- .../org/apache/servicecomb/core/SCBEngine.java | 8 +++- .../transport/highway/HighwayServerInvoke.java | 6 ++- 4 files changed, 69 insertions(+), 26 deletions(-) diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java index f80309d..22cbc53 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java @@ -119,7 +119,7 @@ public abstract class AbstractRestInvocation { protected void scheduleInvocation() { try { createInvocation(); - } catch (IllegalStateException e) { + } catch (Throwable e) { sendFailResponse(e); return; } @@ -141,28 +141,33 @@ public abstract class AbstractRestInvocation { return; } - operationMeta.getExecutor().execute(() -> { - synchronized (this.requestEx) { - try { - if (isInQueueTimeout()) { - throw new InvocationException(Status.INTERNAL_SERVER_ERROR, "Timeout when processing the request."); - } - if (requestEx.getAttribute(RestConst.REST_REQUEST) != requestEx) { - // already timeout - // in this time, request maybe recycled and reused by web container, do not use requestEx - LOGGER.error("Rest request already timeout, abandon execute, method {}, operation {}.", - operationMeta.getHttpMethod(), - operationMeta.getMicroserviceQualifiedName()); - return; + try { + operationMeta.getExecutor().execute(() -> { + synchronized (this.requestEx) { + try { + if (isInQueueTimeout()) { + throw new InvocationException(Status.INTERNAL_SERVER_ERROR, "Timeout when processing the request."); + } + if (requestEx.getAttribute(RestConst.REST_REQUEST) != requestEx) { + // already timeout + // in this time, request maybe recycled and reused by web container, do not use requestEx + LOGGER.error("Rest request already timeout, abandon execute, method {}, operation {}.", + operationMeta.getHttpMethod(), + operationMeta.getMicroserviceQualifiedName()); + return; + } + + runOnExecutor(); + } catch (Throwable e) { + LOGGER.error("rest server onRequest error", e); + sendFailResponse(e); } - - runOnExecutor(); - } catch (Throwable e) { - LOGGER.error("rest server onRequest error", e); - sendFailResponse(e); } - } - }); + }); + } catch (Throwable e) { + LOGGER.error("failed to schedule invocation, message={}, executor={}.", e.getMessage(), e.getClass().getName()); + sendFailResponse(e); + } } private Holder<Boolean> checkQpsFlowControl(OperationMeta operationMeta) { @@ -176,7 +181,7 @@ public abstract class AbstractRestInvocation { produceProcessor = ProduceProcessorManager.JSON_PROCESSOR; sendResponse(response); }); - } catch (Exception e) { + } catch (Throwable e) { LOGGER.error("failed to execute ProviderQpsFlowControlHandler", e); qpsFlowControlReject.value = true; sendFailResponse(e); diff --git a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java index 7e9c9ac..4e82258 100644 --- a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java +++ b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import javax.ws.rs.core.Response.Status; import javax.xml.ws.Holder; @@ -885,17 +886,44 @@ public class TestAbstractRestInvocation { protected void runOnExecutor() { throw new RuntimeExceptionWithoutStackTrace("run on executor"); } + }; + restInvocation.requestEx = requestEx; + restInvocation.restOperationMeta = restOperation; + + // will not throw exception + restInvocation.scheduleInvocation(); + } + + @Test + public void threadPoolReject(@Mocked OperationMeta operationMeta) { + RejectedExecutionException rejectedExecutionException = new RejectedExecutionException("reject"); + Executor executor = (task) -> { + throw rejectedExecutionException; + }; + new Expectations() { + { + restOperation.getOperationMeta(); + result = operationMeta; + operationMeta.getExecutor(); + result = executor; + } + }; + + Holder<Throwable> holder = new Holder<>(); + requestEx = new AbstractHttpServletRequestForTest(); + restInvocation = new AbstractRestInvocationForTest() { @Override public void sendFailResponse(Throwable throwable) { - throw (Error) throwable; + holder.value = throwable; } }; restInvocation.requestEx = requestEx; restInvocation.restOperationMeta = restOperation; - // will not throw exception restInvocation.scheduleInvocation(); + + Assert.assertSame(rejectedExecutionException, holder.value); } @Test diff --git a/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java b/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java index 2aabd84..3f0b232 100644 --- a/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java +++ b/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java @@ -298,10 +298,16 @@ public class SCBEngine { } private void validAllInvocationFinished() throws InterruptedException { + long start = System.currentTimeMillis(); while (true) { - if (invocationFinishedCounter.get() == invocationStartedCounter.get()) { + long remaining = invocationStartedCounter.get() - invocationFinishedCounter.get(); + if (remaining == 0) { return; } + + if (System.currentTimeMillis() - start > TimeUnit.SECONDS.toMillis(30)) { + LOGGER.error("wait for all requests timeout, abandon waiting, remaining requests: {}.", remaining); + } TimeUnit.SECONDS.sleep(1); } } diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java index 0c49723..727eaef 100644 --- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java +++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java @@ -18,6 +18,7 @@ package org.apache.servicecomb.transport.highway; import java.util.Map; +import java.util.concurrent.RejectedExecutionException; import javax.ws.rs.core.Response.Status; import javax.xml.ws.Holder; @@ -186,7 +187,10 @@ public class HighwayServerInvoke { } operationMeta.getExecutor().execute(this::runInExecutor); - } catch (IllegalStateException e) { + } catch (Throwable e) { + if (e instanceof RejectedExecutionException) { + LOGGER.error("failed to schedule invocation, message={}, executor={}.", e.getMessage(), e.getClass().getName()); + } sendResponse(header.getContext(), Response.providerFailResp(e)); } }