This is an automated email from the ASF dual-hosted git repository. wujimin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit 09350a6b4e58d00cb8eeb3ffe85cbf7468a5b639 Author: zhengyangyong <yangyong.zh...@huawei.com> AuthorDate: Tue May 15 19:45:26 2018 +0800 SCB-548 refactor for delete ShutdownHandler Signed-off-by: zhengyangyong <yangyong.zh...@huawei.com> --- .../servicecomb/core/CseApplicationListener.java | 149 +--------- .../org/apache/servicecomb/core/SCBEngine.java | 305 +++++++++++++++++++++ .../org/apache/servicecomb/core/SCBStatus.java | 32 +++ .../core/handler/AbstractHandlerManager.java | 2 - .../core/handler/ShutdownHookHandler.java | 99 ------- .../provider/consumer/ReferenceConfigUtils.java | 30 +- .../core/TestCseApplicationListener.java | 248 ----------------- .../core/handler/TestShutdownHookHandler.java | 141 ---------- .../core/provider/consumer/TestInvokerUtils.java | 22 +- .../consumer/TestReferenceConfigUtils.java | 9 +- demo/perf/src/main/resources/microservice.yaml | 4 +- .../servicecomb/foundation/vertx/VertxUtils.java | 39 ++- .../tests/SpringMvcSpringIntegrationTest.java | 11 +- .../tests/RawSpringMvcIntegrationTest.java | 11 +- ...SimplifiedMappingAnnotationIntegrationTest.java | 11 +- ...SimplifiedMappingAnnotationIntegrationTest.java | 11 +- .../apache/servicecomb/provider/pojo/Invoker.java | 5 +- .../servicecomb/provider/pojo/TestInvoker.java | 10 +- .../springmvc/reference/CseClientHttpRequest.java | 4 + .../reference/TestCseClientHttpRequest.java | 14 +- .../async/CseAsyncClientHttpRequestTest.java | 21 +- .../client/http/WebsocketUtils.java | 5 - 22 files changed, 451 insertions(+), 732 deletions(-) diff --git a/core/src/main/java/org/apache/servicecomb/core/CseApplicationListener.java b/core/src/main/java/org/apache/servicecomb/core/CseApplicationListener.java index 5d40780..3658687 100644 --- a/core/src/main/java/org/apache/servicecomb/core/CseApplicationListener.java +++ b/core/src/main/java/org/apache/servicecomb/core/CseApplicationListener.java @@ -17,61 +17,23 @@ package org.apache.servicecomb.core; -import java.util.Collection; - -import javax.inject.Inject; - -import org.apache.servicecomb.core.BootListener.BootEvent; -import org.apache.servicecomb.core.BootListener.EventType; import org.apache.servicecomb.core.definition.loader.SchemaListenerManager; -import org.apache.servicecomb.core.endpoint.AbstractEndpointsCache; -import org.apache.servicecomb.core.handler.HandlerConfigUtils; -import org.apache.servicecomb.core.handler.ShutdownHookHandler; import org.apache.servicecomb.core.provider.consumer.ConsumerProviderManager; -import org.apache.servicecomb.core.provider.consumer.ReferenceConfigUtils; import org.apache.servicecomb.core.provider.producer.ProducerProviderManager; import org.apache.servicecomb.core.transport.TransportManager; -import org.apache.servicecomb.foundation.common.event.EventManager; import org.apache.servicecomb.foundation.common.utils.BeanUtils; -import org.apache.servicecomb.foundation.common.utils.FortifyUtils; -import org.apache.servicecomb.foundation.vertx.VertxUtils; import org.apache.servicecomb.serviceregistry.RegistryUtils; -import org.apache.servicecomb.serviceregistry.task.MicroserviceInstanceRegisterTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationListener; -import org.springframework.context.event.ContextClosedEvent; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.core.Ordered; -import org.springframework.util.StringUtils; - -import com.google.common.eventbus.Subscribe; public class CseApplicationListener implements ApplicationListener<ApplicationEvent>, Ordered, ApplicationContextAware { - private static final Logger LOGGER = LoggerFactory.getLogger(CseApplicationListener.class); - - private static boolean isInit = false; - - @Inject - private ProducerProviderManager producerProviderManager; - - @Inject - private ConsumerProviderManager consumerProviderManager; - - @Inject - private TransportManager transportManager; - - @Inject - private SchemaListenerManager schemaListenerManager; - - private Collection<BootListener> bootListenerList; - private Class<?> initEventClass = ContextRefreshedEvent.class; private ApplicationContext applicationContext; @@ -93,113 +55,22 @@ public class CseApplicationListener return -1000; } - protected void triggerEvent(EventType eventType) { - BootEvent event = new BootEvent(); - event.setEventType(eventType); - - for (BootListener listener : bootListenerList) { - listener.onBootEvent(event); - } - } - @Override public void onApplicationEvent(ApplicationEvent event) { if (initEventClass.isInstance(event)) { - //TODO to load when webapplication context is used for discovery client, need to check if can use the order and undo this change with proper fix. - if (!isInit) { - try { - bootListenerList = applicationContext.getBeansOfType(BootListener.class).values(); - - AbstractEndpointsCache.init(RegistryUtils.getInstanceCacheManager(), transportManager); - - triggerEvent(EventType.BEFORE_HANDLER); - HandlerConfigUtils.init(); - triggerEvent(EventType.AFTER_HANDLER); - - triggerEvent(EventType.BEFORE_PRODUCER_PROVIDER); - producerProviderManager.init(); - triggerEvent(EventType.AFTER_PRODUCER_PROVIDER); - - triggerEvent(EventType.BEFORE_CONSUMER_PROVIDER); - consumerProviderManager.init(); - triggerEvent(EventType.AFTER_CONSUMER_PROVIDER); - - triggerEvent(EventType.BEFORE_TRANSPORT); - transportManager.init(); - triggerEvent(EventType.AFTER_TRANSPORT); - - schemaListenerManager.notifySchemaListener(); - - triggerEvent(EventType.BEFORE_REGISTRY); - - triggerAfterRegistryEvent(); - - RegistryUtils.run(); - - // 当程序退出时,进行相关清理,注意:kill -9 {pid}下无效 - if (applicationContext instanceof AbstractApplicationContext) { - ((AbstractApplicationContext) applicationContext).registerShutdownHook(); - } - isInit = true; - } catch (Exception e) { - LOGGER.error("cse init failed, {}", FortifyUtils.getErrorInfo(e)); - } + if (applicationContext instanceof AbstractApplicationContext) { + ((AbstractApplicationContext) applicationContext).registerShutdownHook(); } - } else if (event instanceof ContextClosedEvent) { - LOGGER.warn("cse is closing now..."); - gracefullyShutdown(); - isInit = false; - } - } - private void gracefullyShutdown() { - //Step 1: notify all component stop invoke via BEFORE_CLOSE Event - triggerEvent(EventType.BEFORE_CLOSE); - - //Step 2: Unregister microservice instance from Service Center and close vertx - RegistryUtils.destroy(); - VertxUtils.closeVertxByName("registry"); + if (SCBEngine.getInstance().getBootListenerList() == null) { + SCBEngine.getInstance().setProducerProviderManager(applicationContext.getBean(ProducerProviderManager.class)); + SCBEngine.getInstance().setConsumerProviderManager(applicationContext.getBean(ConsumerProviderManager.class)); + SCBEngine.getInstance().setTransportManager(applicationContext.getBean(TransportManager.class)); + SCBEngine.getInstance().setSchemaListenerManager(applicationContext.getBean(SchemaListenerManager.class)); + SCBEngine.getInstance().setBootListenerList(applicationContext.getBeansOfType(BootListener.class).values()); + } - //Step 3: wait all invocation finished - try { - ShutdownHookHandler.INSTANCE.ALL_INVOCATION_FINISHED.acquire(); - LOGGER.warn("all invocation finished"); - } catch (InterruptedException e) { - LOGGER.error("invocation finished semaphore interrupted", e); + SCBEngine.getInstance().init(); } - - //Step 4: Stop vertx to prevent blocking exit - VertxUtils.closeVertxByName("config-center"); - VertxUtils.closeVertxByName("transport"); - - //Step 5: notify all component do clean works via AFTER_CLOSE Event - triggerEvent(EventType.AFTER_CLOSE); - } - - - /** - * <p>As the process of instance registry is asynchronous, the {@code AFTER_REGISTRY} - * event should not be sent immediately after {@link RegistryUtils#run()} is invoked. - * When the instance registry succeeds, {@link MicroserviceInstanceRegisterTask} will be posted in {@link EventManager}, - * register a subscriber to watch this event and send {@code AFTER_REGISTRY}.</p> - * - * <p>This method should be called before {@link RegistryUtils#run()} to avoid that the registry process is too quick - * that the event is not watched by this subscriber.</p> - * - * <p>Check if {@code InstanceId} is null to judge whether the instance registry has succeeded.</p> - */ - private void triggerAfterRegistryEvent() { - EventManager.register(new Object() { - @Subscribe - public void afterRegistryInstance(MicroserviceInstanceRegisterTask microserviceInstanceRegisterTask) { - LOGGER.info("receive MicroserviceInstanceRegisterTask event, check instance Id..."); - if (!StringUtils.isEmpty(RegistryUtils.getMicroserviceInstance().getInstanceId())) { - LOGGER.info("instance registry succeeds for the first time, will send AFTER_REGISTRY event."); - ReferenceConfigUtils.setReady(true); - triggerEvent(EventType.AFTER_REGISTRY); - EventManager.unregister(this); - } - } - }); } } diff --git a/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java b/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java new file mode 100644 index 0000000..f1411e1 --- /dev/null +++ b/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.core; + +import java.util.Collection; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.servicecomb.core.BootListener.BootEvent; +import org.apache.servicecomb.core.BootListener.EventType; +import org.apache.servicecomb.core.definition.loader.SchemaListenerManager; +import org.apache.servicecomb.core.endpoint.AbstractEndpointsCache; +import org.apache.servicecomb.core.event.InvocationFinishEvent; +import org.apache.servicecomb.core.event.InvocationStartEvent; +import org.apache.servicecomb.core.handler.HandlerConfigUtils; +import org.apache.servicecomb.core.provider.consumer.ConsumerProviderManager; +import org.apache.servicecomb.core.provider.producer.ProducerProviderManager; +import org.apache.servicecomb.core.transport.TransportManager; +import org.apache.servicecomb.foundation.common.event.EventManager; +import org.apache.servicecomb.foundation.vertx.VertxUtils; +import org.apache.servicecomb.serviceregistry.RegistryUtils; +import org.apache.servicecomb.serviceregistry.task.MicroserviceInstanceRegisterTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; + +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; + +// TODO: should not depend on spring, that will make integration more flexible +public class SCBEngine { + private static final Logger LOGGER = LoggerFactory.getLogger(SCBEngine.class); + + private ProducerProviderManager producerProviderManager; + + private ConsumerProviderManager consumerProviderManager; + + private TransportManager transportManager; + + private SchemaListenerManager schemaListenerManager; + + private Collection<BootListener> bootListenerList; + + private final AtomicLong invocationStartedCounter = new AtomicLong(); + + private final AtomicLong invocationFinishedCounter = new AtomicLong(); + + private final Semaphore allInvocationFinished = new Semaphore(1); + + private volatile SCBStatus status = SCBStatus.DOWN; + + public void setStatus(SCBStatus status) { + this.status = status; + } + + private EventBus eventBus = EventManager.getEventBus(); + + private static final SCBEngine INSTANCE = new SCBEngine(); + + public static SCBEngine getInstance() { + return INSTANCE; + } + + public void setProducerProviderManager( + ProducerProviderManager producerProviderManager) { + this.producerProviderManager = producerProviderManager; + } + + public void setConsumerProviderManager( + ConsumerProviderManager consumerProviderManager) { + this.consumerProviderManager = consumerProviderManager; + } + + public TransportManager getTransportManager() { + return transportManager; + } + + public void setTransportManager(TransportManager transportManager) { + this.transportManager = transportManager; + } + + public void setSchemaListenerManager( + SchemaListenerManager schemaListenerManager) { + this.schemaListenerManager = schemaListenerManager; + } + + public Collection<BootListener> getBootListenerList() { + return bootListenerList; + } + + public void setBootListenerList(Collection<BootListener> bootListenerList) { + this.bootListenerList = bootListenerList; + } + + protected void triggerEvent(EventType eventType) { + BootEvent event = new BootEvent(); + event.setEventType(eventType); + + for (BootListener listener : bootListenerList) { + listener.onBootEvent(event); + } + } + + protected void safeTriggerEvent(EventType eventType) { + BootEvent event = new BootEvent(); + event.setEventType(eventType); + + for (BootListener listener : bootListenerList) { + try { + listener.onBootEvent(event); + LOGGER.error("BootListener {} succeed to process {}.", listener.getClass().getName(), eventType); + } catch (Throwable e) { + LOGGER.error("BootListener {} failed to process {}.", listener.getClass().getName(), eventType, e); + } + } + } + + /** + * <p>As the process of instance registry is asynchronous, the {@code AFTER_REGISTRY} + * event should not be sent immediately after {@link RegistryUtils#run()} is invoked. + * When the instance registry succeeds, {@link MicroserviceInstanceRegisterTask} will be posted in {@link EventManager}, + * register a subscriber to watch this event and send {@code AFTER_REGISTRY}.</p> + * + * <p>This method should be called before {@link RegistryUtils#run()} to avoid that the registry process is too quick + * that the event is not watched by this subscriber.</p> + * + * <p>Check if {@code InstanceId} is null to judge whether the instance registry has succeeded.</p> + */ + private void triggerAfterRegistryEvent() { + EventManager.register(new Object() { + @Subscribe + public void afterRegistryInstance(MicroserviceInstanceRegisterTask microserviceInstanceRegisterTask) { + LOGGER.info("receive MicroserviceInstanceRegisterTask event, check instance Id..."); + if (!StringUtils.isEmpty(RegistryUtils.getMicroserviceInstance().getInstanceId())) { + LOGGER.info("instance registry succeeds for the first time, will send AFTER_REGISTRY event."); + status = SCBStatus.UP; + triggerEvent(EventType.AFTER_REGISTRY); + EventManager.unregister(this); + } + } + }); + } + + @AllowConcurrentEvents + @Subscribe + public void onInvocationStart(InvocationStartEvent event) { + invocationStartedCounter.incrementAndGet(); + } + + @AllowConcurrentEvents + @Subscribe + public void onInvocationFinish(InvocationFinishEvent event) { + invocationFinishedCounter.incrementAndGet(); + if (validIsStopping()) { + validAllInvocationFinished(); + } + } + + public synchronized void init() { + if (validIsDown()) { + try { + doInit(); + status = SCBStatus.UP; + } catch (Exception e) { + uninit(); + status = SCBStatus.FAILED; + throw new IllegalStateException("ServiceComb init failed.", e); + } + } + } + + + private void doInit() throws Exception { + status = SCBStatus.STARTING; + + eventBus.register(this); + + allInvocationFinished.acquire(); + + AbstractEndpointsCache.init(RegistryUtils.getInstanceCacheManager(), transportManager); + + triggerEvent(EventType.BEFORE_HANDLER); + HandlerConfigUtils.init(); + triggerEvent(EventType.AFTER_HANDLER); + + triggerEvent(EventType.BEFORE_PRODUCER_PROVIDER); + producerProviderManager.init(); + triggerEvent(EventType.AFTER_PRODUCER_PROVIDER); + + triggerEvent(EventType.BEFORE_CONSUMER_PROVIDER); + consumerProviderManager.init(); + triggerEvent(EventType.AFTER_CONSUMER_PROVIDER); + + triggerEvent(EventType.BEFORE_TRANSPORT); + transportManager.init(); + triggerEvent(EventType.AFTER_TRANSPORT); + + schemaListenerManager.notifySchemaListener(); + + triggerEvent(EventType.BEFORE_REGISTRY); + + triggerAfterRegistryEvent(); + + RegistryUtils.run(); + + Runtime.getRuntime().addShutdownHook(new Thread(this::uninit)); + } + + /** + * not allow throw any exception + * even some step throw exception, must catch it and go on, otherwise shutdown process will be broken. + */ + public synchronized void uninit() { + if (validIsUp()) { + LOGGER.info("ServiceComb is closing now..."); + try { + doUninit(); + status = SCBStatus.DOWN; + } catch (Exception e) { + status = SCBStatus.FAILED; + LOGGER.info("ServiceComb failed shutdown", e); + } + } + } + + private void doUninit() throws Exception { + //Step 1: notify all component stop invoke via BEFORE_CLOSE Event + safeTriggerEvent(EventType.BEFORE_CLOSE); + + status = SCBStatus.STOPPING; + + //Step 2: Unregister microservice instance from Service Center and close vertx + // Forbidden other consumers find me + RegistryUtils.destroy(); + VertxUtils.blockCloseVertxByName("registry"); + + //Step 3: wait all invocation finished + // forbit create new consumer invocation + validAllInvocationFinished(); + allInvocationFinished.acquire(); + + //Step 4: Stop vertx to prevent blocking exit + VertxUtils.blockCloseVertxByName("config-center"); + VertxUtils.blockCloseVertxByName("transport"); + + //Step 5: notify all component do clean works via AFTER_CLOSE Event + safeTriggerEvent(EventType.AFTER_CLOSE); + + //Step 6: Clean flags for re-init + eventBus.unregister(this); + allInvocationFinished.release(); + } + + private void validAllInvocationFinished() { + LOGGER.info("valid for all invocation finished, request count: {}, response count: {}.", + invocationStartedCounter.get(), + invocationFinishedCounter.get()); + if (invocationFinishedCounter.get() == invocationStartedCounter.get()) { + allInvocationFinished.release(); + } + } + + public void assertIsStopping() { + if (validIsStopping()) { + throw new IllegalStateException( + "shutting down in progress, it's better to implement " + BootListener.class.getName() + + " and stop ... in EventType.BEFORE_CLOSE."); + } + } + + private boolean validIsStopping() { + return SCBStatus.STOPPING.equals(status); + } + + public void assertIsUp() { + if (!SCBStatus.UP.equals(status)) { + throw new IllegalStateException("System is not ready for remote calls. " + + "When beans are making remote calls in initialization, it's better to " + + "implement " + BootListener.class.getName() + " and do it after EventType.AFTER_REGISTRY."); + } + } + + private boolean validIsUp() { + return SCBStatus.UP.equals(status); + } + + private boolean validIsDown() { + return SCBStatus.DOWN.equals(status); + } +} diff --git a/core/src/main/java/org/apache/servicecomb/core/SCBStatus.java b/core/src/main/java/org/apache/servicecomb/core/SCBStatus.java new file mode 100644 index 0000000..62551eb --- /dev/null +++ b/core/src/main/java/org/apache/servicecomb/core/SCBStatus.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.core; + +//DOWN -> STARTING -> UP -> STOPPING -> DOWN +public enum SCBStatus { + //Chassis is Down + DOWN, + //Chassis is Starting (progressing) + STARTING, + //Chassis is Running + UP, + //Chassis is Stopping (progressing) + STOPPING, + //Chassis Init Failed + FAILED +} diff --git a/core/src/main/java/org/apache/servicecomb/core/handler/AbstractHandlerManager.java b/core/src/main/java/org/apache/servicecomb/core/handler/AbstractHandlerManager.java index 3b49bc1..20c72b4 100644 --- a/core/src/main/java/org/apache/servicecomb/core/handler/AbstractHandlerManager.java +++ b/core/src/main/java/org/apache/servicecomb/core/handler/AbstractHandlerManager.java @@ -86,8 +86,6 @@ public abstract class AbstractHandlerManager extends AbstractObjectManager<Strin List<Class<Handler>> chainClasses = convertToChainClass(chainDef); List<Handler> handlerList = new ArrayList<>(); - handlerList.add(ShutdownHookHandler.INSTANCE); - for (Class<Handler> cls : chainClasses) { try { handlerList.add(cls.newInstance()); diff --git a/core/src/main/java/org/apache/servicecomb/core/handler/ShutdownHookHandler.java b/core/src/main/java/org/apache/servicecomb/core/handler/ShutdownHookHandler.java deleted file mode 100644 index 4b284a4..0000000 --- a/core/src/main/java/org/apache/servicecomb/core/handler/ShutdownHookHandler.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.servicecomb.core.handler; - -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.servicecomb.core.Handler; -import org.apache.servicecomb.core.Invocation; -import org.apache.servicecomb.foundation.common.exceptions.ServiceCombException; -import org.apache.servicecomb.swagger.invocation.AsyncResponse; -import org.apache.servicecomb.swagger.invocation.Response; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * 实现调用链的优雅停止: 当调用链没有返回的时候,等待返回(由于Consumer存在超时,所以必定能够返回) - */ -public final class ShutdownHookHandler implements Handler, Runnable { - private static final Logger LOG = LoggerFactory.getLogger(ShutdownHookHandler.class); - - public static final ShutdownHookHandler INSTANCE = new ShutdownHookHandler(); - - private final AtomicLong requestCounter = new AtomicLong(0); - - private final AtomicLong responseCounter = new AtomicLong(0); - - private volatile boolean shuttingDown = false; - - public final Semaphore ALL_INVOCATION_FINISHED = new Semaphore(1); - - private ShutdownHookHandler() { - try { - ALL_INVOCATION_FINISHED.acquire(); - } catch (InterruptedException e) { - throw new ServiceCombException("init invocation finished semaphore failed", e); - } - Runtime.getRuntime().addShutdownHook(new Thread(this)); - } - - public long getActiveCount() { - return requestCounter.get() - responseCounter.get(); - } - - @Override - public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception { - if (shuttingDown) { - asyncResp.handle(Response.createFail(invocation.getInvocationType(), - "shutting down in progress")); - return; - } - - // TODO:统计功能应该独立出来,在链中统计,会有各种bug - // 下面的两次catch,可能会导致一次请求,对应2次应答 - requestCounter.incrementAndGet(); - try { - invocation.next(resp -> { - try { - asyncResp.handle(resp); - } finally { - responseCounter.incrementAndGet(); - validAllInvocationFinished(); - } - }); - } catch (Throwable e) { - responseCounter.incrementAndGet(); - validAllInvocationFinished(); - throw e; - } - } - - private synchronized void validAllInvocationFinished() { - if (shuttingDown && getActiveCount() <= 0) { - ALL_INVOCATION_FINISHED.release(); - } - } - - @Override - public void run() { - shuttingDown = true; - LOG.warn("handler chain is shutting down..."); - validAllInvocationFinished(); - } -} diff --git a/core/src/main/java/org/apache/servicecomb/core/provider/consumer/ReferenceConfigUtils.java b/core/src/main/java/org/apache/servicecomb/core/provider/consumer/ReferenceConfigUtils.java index fe8d254..b0bf2d2 100644 --- a/core/src/main/java/org/apache/servicecomb/core/provider/consumer/ReferenceConfigUtils.java +++ b/core/src/main/java/org/apache/servicecomb/core/provider/consumer/ReferenceConfigUtils.java @@ -17,38 +17,18 @@ package org.apache.servicecomb.core.provider.consumer; -import org.apache.servicecomb.core.BootListener; import org.apache.servicecomb.core.CseContext; -import org.springframework.stereotype.Component; +import org.apache.servicecomb.core.SCBEngine; -@Component public class ReferenceConfigUtils { - private static boolean ready; - - public static void setReady(boolean ready) { - ReferenceConfigUtils.ready = ready; - } - - private static void assertIsReady() { - if (!ready) { - throw new IllegalStateException("System is not ready for remote calls. " - + "When beans are making remote calls in initialization, it's better to " - + "implement " + BootListener.class.getName() + " and do it after EventType.AFTER_REGISTRY."); - } - } - public static ReferenceConfig getForInvoke(String microserviceName) { - assertIsReady(); - + SCBEngine.getInstance().assertIsUp(); return CseContext.getInstance().getConsumerProviderManager().getReferenceConfig(microserviceName); } - public static ReferenceConfig getForInvoke(String microserviceName, String microserviceVersion, - String transport) { - assertIsReady(); - + public static ReferenceConfig getForInvoke(String microserviceName, String microserviceVersion, String transport) { + SCBEngine.getInstance().assertIsUp(); return CseContext.getInstance().getConsumerProviderManager().createReferenceConfig(microserviceName, - microserviceVersion, - transport); + microserviceVersion, transport); } } diff --git a/core/src/test/java/org/apache/servicecomb/core/TestCseApplicationListener.java b/core/src/test/java/org/apache/servicecomb/core/TestCseApplicationListener.java deleted file mode 100644 index 43fd873..0000000 --- a/core/src/test/java/org/apache/servicecomb/core/TestCseApplicationListener.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.servicecomb.core; - -import static org.junit.Assert.fail; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import javax.xml.ws.Holder; - -import org.apache.commons.lang3.reflect.FieldUtils; -import org.apache.servicecomb.core.BootListener.BootEvent; -import org.apache.servicecomb.core.BootListener.EventType; -import org.apache.servicecomb.core.definition.loader.SchemaListenerManager; -import org.apache.servicecomb.core.endpoint.AbstractEndpointsCache; -import org.apache.servicecomb.core.handler.ShutdownHookHandler; -import org.apache.servicecomb.core.provider.consumer.ConsumerProviderManager; -import org.apache.servicecomb.core.provider.consumer.ReferenceConfigUtils; -import org.apache.servicecomb.core.provider.producer.ProducerProviderManager; -import org.apache.servicecomb.core.transport.TransportManager; -import org.apache.servicecomb.foundation.common.event.EventManager; -import org.apache.servicecomb.foundation.common.utils.BeanUtils; -import org.apache.servicecomb.foundation.common.utils.ReflectUtils; -import org.apache.servicecomb.serviceregistry.RegistryUtils; -import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance; -import org.apache.servicecomb.serviceregistry.task.MicroserviceInstanceRegisterTask; -import org.hamcrest.Matchers; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Mockito; -import org.springframework.context.ApplicationContext; -import org.springframework.context.event.ContextClosedEvent; -import org.springframework.context.event.ContextRefreshedEvent; -import org.springframework.context.support.AbstractApplicationContext; - -import mockit.Deencapsulation; -import mockit.Expectations; -import mockit.Injectable; -import mockit.Mock; -import mockit.MockUp; -import mockit.Mocked; - -public class TestCseApplicationListener { - static ApplicationContext orgContext; - - @BeforeClass - public static void classSetup() { - orgContext = BeanUtils.getContext(); - } - - @AfterClass - public static void teardown() { - AbstractEndpointsCache.init(null, null); - BeanUtils.setContext(orgContext); - } - - @After - public void cleanup() { - Deencapsulation.setField(ReferenceConfigUtils.class, "ready", false); - } - - @Test - public void testCseApplicationListenerNormal(@Injectable ContextRefreshedEvent event, - @Injectable AbstractApplicationContext context, - @Injectable ProducerProviderManager producerProviderManager, - @Injectable ConsumerProviderManager consumerProviderManager, - @Injectable TransportManager transportManager, - @Mocked RegistryUtils ru) { - Map<String, BootListener> listeners = new HashMap<>(); - BootListener listener = Mockito.mock(BootListener.class); - listeners.put("test", listener); - - SchemaListenerManager schemaListenerManager = Mockito.mock(SchemaListenerManager.class); - MicroserviceInstance microserviceInstance = Mockito.mock(MicroserviceInstance.class); - - new Expectations() { - { - context.getBeansOfType(BootListener.class); - result = listeners; - } - }; - new Expectations(RegistryUtils.class) { - { - RegistryUtils.init(); - RegistryUtils.getInstanceCacheManager(); - RegistryUtils.run(); - RegistryUtils.getMicroserviceInstance(); - result = microserviceInstance; - } - }; - Mockito.when(microserviceInstance.getInstanceId()).thenReturn("testInstanceId"); - - CseApplicationListener cal = new CseApplicationListener(); - Deencapsulation.setField(cal, "schemaListenerManager", schemaListenerManager); - cal.setInitEventClass(ContextRefreshedEvent.class); - cal.setApplicationContext(context); - ReflectUtils.setField(cal, "producerProviderManager", producerProviderManager); - ReflectUtils.setField(cal, "consumerProviderManager", consumerProviderManager); - ReflectUtils.setField(cal, "transportManager", transportManager); - - cal.onApplicationEvent(event); - - EventManager.post(Mockito.mock(MicroserviceInstanceRegisterTask.class)); - verify(schemaListenerManager).notifySchemaListener(); - verify(listener, times(10)).onBootEvent(Mockito.any(BootEvent.class)); - } - - @Test - public void testCseApplicationListenerThrowException(@Injectable ContextRefreshedEvent event, - @Injectable AbstractApplicationContext context, - @Injectable BootListener listener, - @Injectable ProducerProviderManager producerProviderManager, - @Mocked RegistryUtils ru) { - Map<String, BootListener> listeners = new HashMap<>(); - listeners.put("test", listener); - - CseApplicationListener cal = new CseApplicationListener(); - cal.setApplicationContext(context); - ReflectUtils.setField(cal, "producerProviderManager", producerProviderManager); - cal.onApplicationEvent(event); - } - - @Test - public void testCseApplicationListenerParentNotnull(@Injectable ContextRefreshedEvent event, - @Injectable AbstractApplicationContext context, - @Injectable AbstractApplicationContext pContext, - @Mocked RegistryUtils ru) { - - CseApplicationListener cal = new CseApplicationListener(); - cal.setApplicationContext(context); - cal.onApplicationEvent(event); - } - - @Test - public void testCseApplicationListenerShutdown(@Mocked ApplicationContext context) throws IllegalAccessException { - Holder<Boolean> destroyHolder = new Holder<>(); - new MockUp<RegistryUtils>() { - @Mock - void destroy() { - destroyHolder.value = true; - } - }; - CseApplicationListener cal = new CseApplicationListener(); - ContextClosedEvent event = new ContextClosedEvent(context); - - ShutdownHookHandler.INSTANCE.ALL_INVOCATION_FINISHED.release(); - - List<EventType> eventTypes = new ArrayList<>(); - BootListener bootListener = e -> { - eventTypes.add(e.getEventType()); - }; - FieldUtils.writeField(cal, "bootListenerList", Arrays.asList(bootListener), true); - cal.onApplicationEvent(event); - - Assert.assertTrue(destroyHolder.value); - Assert.assertThat(eventTypes, Matchers.contains(EventType.BEFORE_CLOSE, EventType.AFTER_CLOSE)); - } - - @Test - public void testTriggerAfterRegistryEvent() { - CseApplicationListener cal = new CseApplicationListener(); - - Collection<BootListener> listeners = new ArrayList<>(1); - BootListener listener = Mockito.mock(BootListener.class); - listeners.add(listener); - Deencapsulation.setField(cal, "bootListenerList", listeners); - - MicroserviceInstance microserviceInstance = Mockito.mock(MicroserviceInstance.class); - new Expectations(RegistryUtils.class) { - { - RegistryUtils.getMicroserviceInstance(); - result = microserviceInstance; - } - }; - Mockito.when(microserviceInstance.getInstanceId()).thenReturn("testInstanceId"); - - Deencapsulation.invoke(cal, "triggerAfterRegistryEvent"); - - EventManager.post(Mockito.mock(MicroserviceInstanceRegisterTask.class)); - - Deencapsulation.invoke(ReferenceConfigUtils.class, "assertIsReady"); - - // AFTER_REGISTRY event should only be sent at the first time of registry success. - EventManager.post(Mockito.mock(MicroserviceInstanceRegisterTask.class)); - verify(listener, times(1)).onBootEvent(Mockito.any(BootEvent.class)); - } - - @Test - public void testTriggerAfterRegistryEventOnInstanceIdIsNull() { - CseApplicationListener cal = new CseApplicationListener(); - - Collection<BootListener> listeners = new ArrayList<>(1); - BootListener listener = Mockito.mock(BootListener.class); - listeners.add(listener); - Deencapsulation.setField(cal, "bootListenerList", listeners); - - MicroserviceInstance microserviceInstance = Mockito.mock(MicroserviceInstance.class); - new Expectations(RegistryUtils.class) { - { - RegistryUtils.getMicroserviceInstance(); - result = microserviceInstance; - } - }; - Mockito.when(microserviceInstance.getInstanceId()).thenReturn(null).thenReturn("testInstanceId"); - - Deencapsulation.invoke(cal, "triggerAfterRegistryEvent"); - - EventManager.post(Mockito.mock(MicroserviceInstanceRegisterTask.class)); - - try { - Deencapsulation.invoke(ReferenceConfigUtils.class, "assertIsReady"); - fail("an exception is expected."); - } catch (Exception e) { - Assert.assertEquals(IllegalStateException.class, e.getClass()); - } - verify(listener, times(0)).onBootEvent(Mockito.any(BootEvent.class)); - - // AFTER_REGISTRY event should only be sent at the first time of registry success. - EventManager.post(Mockito.mock(MicroserviceInstanceRegisterTask.class)); - Deencapsulation.invoke(ReferenceConfigUtils.class, "assertIsReady"); - verify(listener, times(1)).onBootEvent(Mockito.any(BootEvent.class)); - } -} diff --git a/core/src/test/java/org/apache/servicecomb/core/handler/TestShutdownHookHandler.java b/core/src/test/java/org/apache/servicecomb/core/handler/TestShutdownHookHandler.java deleted file mode 100644 index 00b767a..0000000 --- a/core/src/test/java/org/apache/servicecomb/core/handler/TestShutdownHookHandler.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.servicecomb.core.handler; - -import java.lang.reflect.Field; -import java.util.concurrent.atomic.AtomicLong; - -import javax.xml.ws.Holder; - -import org.apache.servicecomb.core.Invocation; -import org.apache.servicecomb.swagger.invocation.AsyncResponse; -import org.apache.servicecomb.swagger.invocation.InvocationType; -import org.apache.servicecomb.swagger.invocation.Response; -import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData; -import org.apache.servicecomb.swagger.invocation.exception.InvocationException; -import org.junit.Assert; -import org.junit.Test; -import org.springframework.util.ReflectionUtils; - -import mockit.Deencapsulation; -import mockit.Mock; -import mockit.MockUp; -import mockit.Mocked; - -public class TestShutdownHookHandler { - static Field requestCountField = ReflectionUtils.findField(ShutdownHookHandler.class, "requestCounter"); - - static { - requestCountField.setAccessible(true); - } - - static AtomicLong requestCounter = - (AtomicLong) ReflectionUtils.getField(requestCountField, ShutdownHookHandler.INSTANCE); - - @Test - public void testShutdownHookHandlerCount(@Mocked Response response) throws Exception { - Deencapsulation.setField(ShutdownHookHandler.INSTANCE, "shuttingDown", false); - - ShutdownHookHandler handler = ShutdownHookHandler.INSTANCE; - Assert.assertEquals(0, handler.getActiveCount()); - - // no reply - Invocation invocation = new MockUp<Invocation>() { - @Mock - public void next(AsyncResponse asyncResp) throws Exception { - } - }.getMockInstance(); - handler.handle(invocation, asyncResp -> { - }); - Assert.assertEquals(1, requestCounter.get()); - Assert.assertEquals(1, handler.getActiveCount()); - - // normal - invocation = new MockUp<Invocation>() { - @Mock - public void next(AsyncResponse asyncResp) throws Exception { - asyncResp.handle(response); - } - }.getMockInstance(); - handler.handle(invocation, asyncResp -> { - }); - Assert.assertEquals(2, requestCounter.get()); - Assert.assertEquals(1, handler.getActiveCount()); - - // next exception - invocation = new MockUp<Invocation>() { - @Mock - public void next(AsyncResponse asyncResp) throws Exception { - throw new Error(); - } - }.getMockInstance(); - try { - handler.handle(invocation, asyncResp -> { - }); - Assert.assertFalse(true); - } catch (Throwable e) { - Assert.assertEquals(3, requestCounter.get()); - Assert.assertEquals(1, handler.getActiveCount()); - } - - AtomicLong responseCounter = Deencapsulation.getField(ShutdownHookHandler.INSTANCE, "responseCounter"); - responseCounter.incrementAndGet(); - Assert.assertEquals(0, handler.getActiveCount()); - - // reply exception - // TODO: should be fixed - // try { - // handler.handle(invocation, asyncResp -> { - // throw new Error(); - // }); - // - // Assert.assertFalse(true); - // } catch (Throwable e) { - // Assert.assertEquals(3, requestCounter.get()); - // Assert.assertEquals(1, handler.getActiveCount()); - // } - } - - @Test - public void testShutdownHookHandlerReject() throws Exception { - Deencapsulation.setField(ShutdownHookHandler.INSTANCE, "shuttingDown", true); - Holder<InvocationType> typeHolder = new Holder<>(InvocationType.PRODUCER); - Invocation invocation = new MockUp<Invocation>() { - @Mock - public InvocationType getInvocationType() { - return typeHolder.value; - } - }.getMockInstance(); - - ShutdownHookHandler handler = ShutdownHookHandler.INSTANCE; - handler.handle(invocation, asyncResp -> { - InvocationException e = asyncResp.getResult(); - Assert.assertEquals(((CommonExceptionData) e.getErrorData()).getMessage(), - "shutting down in progress"); - Assert.assertEquals(e.getStatusCode(), 590); - }); - - typeHolder.value = InvocationType.CONSUMER; - handler.handle(invocation, asyncResp -> { - InvocationException e = asyncResp.getResult(); - Assert.assertEquals(((CommonExceptionData) e.getErrorData()).getMessage(), - "shutting down in progress"); - Assert.assertEquals(e.getStatusCode(), 490); - }); - } -} diff --git a/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestInvokerUtils.java b/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestInvokerUtils.java index d975e65..16adde7 100644 --- a/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestInvokerUtils.java +++ b/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestInvokerUtils.java @@ -22,6 +22,8 @@ import javax.xml.ws.Holder; import org.apache.servicecomb.core.BootListener; import org.apache.servicecomb.core.CseContext; import org.apache.servicecomb.core.Invocation; +import org.apache.servicecomb.core.SCBEngine; +import org.apache.servicecomb.core.SCBStatus; import org.apache.servicecomb.core.definition.OperationMeta; import org.apache.servicecomb.core.definition.SchemaMeta; import org.apache.servicecomb.core.invocation.InvocationFactory; @@ -43,13 +45,13 @@ import mockit.Mocked; public class TestInvokerUtils { @Test - public void testSyncInvokeInvocationWithException() throws InterruptedException { + public void testSyncInvokeInvocationWithException() { Invocation invocation = Mockito.mock(Invocation.class); Response response = Mockito.mock(Response.class); new MockUp<SyncResponseExecutor>() { @Mock - public Response waitResponse() throws InterruptedException { + public Response waitResponse() { return Mockito.mock(Response.class); } }; @@ -80,7 +82,6 @@ public class TestInvokerUtils { } }; - Holder<InvocationContext> holder = new Holder<>(); InvokerUtils.reactiveInvoke(invocation, ar -> { holder.value = ContextUtils.getInvocationContext(); @@ -105,15 +106,16 @@ public class TestInvokerUtils { @Test public void tetSyncInvokeNotReady() { - ReferenceConfigUtils.setReady(false); + + SCBEngine.getInstance().setStatus(SCBStatus.DOWN); try { InvokerUtils.syncInvoke("ms", "schemaId", "opName", null); Assert.fail("must throw exception"); } catch (IllegalStateException e) { Assert.assertEquals("System is not ready for remote calls. " - + "When beans are making remote calls in initialization, it's better to " - + "implement " + BootListener.class.getName() + " and do it after EventType.AFTER_REGISTRY.", + + "When beans are making remote calls in initialization, it's better to " + + "implement " + BootListener.class.getName() + " and do it after EventType.AFTER_REGISTRY.", e.getMessage()); } @@ -122,8 +124,8 @@ public class TestInvokerUtils { Assert.fail("must throw exception"); } catch (IllegalStateException e) { Assert.assertEquals("System is not ready for remote calls. " - + "When beans are making remote calls in initialization, it's better to " - + "implement " + BootListener.class.getName() + " and do it after EventType.AFTER_REGISTRY.", + + "When beans are making remote calls in initialization, it's better to " + + "implement " + BootListener.class.getName() + " and do it after EventType.AFTER_REGISTRY.", e.getMessage()); } } @@ -131,7 +133,9 @@ public class TestInvokerUtils { @Test public void tetSyncInvokeReady(@Injectable ConsumerProviderManager consumerProviderManager, @Injectable Invocation invocation) { - ReferenceConfigUtils.setReady(true); + + SCBEngine.getInstance().setStatus(SCBStatus.UP); + CseContext.getInstance().setConsumerProviderManager(consumerProviderManager); new Expectations(InvocationFactory.class) { diff --git a/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestReferenceConfigUtils.java b/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestReferenceConfigUtils.java index fbe664c..b31ee58 100644 --- a/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestReferenceConfigUtils.java +++ b/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestReferenceConfigUtils.java @@ -18,6 +18,8 @@ package org.apache.servicecomb.core.provider.consumer; import org.apache.servicecomb.core.BootListener; import org.apache.servicecomb.core.CseContext; +import org.apache.servicecomb.core.SCBEngine; +import org.apache.servicecomb.core.SCBStatus; import org.junit.Assert; import org.junit.Test; @@ -30,7 +32,8 @@ public class TestReferenceConfigUtils { + "When beans are making remote calls in initialization, it's better to " + "implement " + BootListener.class.getName() + " and do it after EventType.AFTER_REGISTRY."; - ReferenceConfigUtils.setReady(false); + SCBEngine.getInstance().setStatus(SCBStatus.DOWN); + try { ReferenceConfigUtils.getForInvoke("abc"); Assert.fail("must throw exception"); @@ -48,7 +51,9 @@ public class TestReferenceConfigUtils { @Test public void testReady(@Injectable ConsumerProviderManager consumerProviderManager) { - ReferenceConfigUtils.setReady(true); + + SCBEngine.getInstance().setStatus(SCBStatus.UP); + CseContext.getInstance().setConsumerProviderManager(consumerProviderManager); Assert.assertNotNull(ReferenceConfigUtils.getForInvoke("abc")); diff --git a/demo/perf/src/main/resources/microservice.yaml b/demo/perf/src/main/resources/microservice.yaml index e597667..254a85b 100644 --- a/demo/perf/src/main/resources/microservice.yaml +++ b/demo/perf/src/main/resources/microservice.yaml @@ -39,10 +39,10 @@ servicecomb: thread-count: 8 references: transport: rest -servicecomb: metrics: window_time: 1000 - +servicecomb.metrics.publisher.defaultLog.enabled: true + sync-count: 10 async-count: 20 sync: false diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/VertxUtils.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/VertxUtils.java index d5e55f4..5d4709f 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/VertxUtils.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/VertxUtils.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.lang.management.ManagementFactory; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import javax.xml.ws.Holder; @@ -157,13 +158,6 @@ public final class VertxUtils { return vertxMap.get(name); } - public static void closeVertxByName(String name) { - Vertx vertx = vertxMap.remove(name); - if (vertx != null) { - vertx.close(); - } - } - public static <T> void runInContext(Context context, AsyncResultCallback<T> callback, T result, Throwable e) { if (context == Vertx.currentContext()) { complete(callback, result, e); @@ -205,4 +199,35 @@ public final class VertxUtils { byteBuf.getBytes(0, arr); return arr; } + + public static CompletableFuture<Void> closeVertxByName(String name) { + LOGGER.info("Closing vertx {}.", name); + CompletableFuture<Void> future = new CompletableFuture<>(); + Vertx vertx = vertxMap.remove(name); + if (vertx == null) { + LOGGER.info("Vertx {} not exist.", name); + future.complete(null); + return future; + } + + vertx.close(ar -> { + if (ar.succeeded()) { + LOGGER.info("Success to close vertx {}.", name); + future.complete(null); + return; + } + + future.completeExceptionally(ar.cause()); + }); + return future; + } + + public static void blockCloseVertxByName(String name) { + CompletableFuture<Void> future = closeVertxByName(name); + try { + future.get(); + } catch (Throwable e) { + LOGGER.error("Failed to close vertx {}.", name, e); + } + } } diff --git a/integration-tests/springmvc-tests/springmvc-tests-general-with-springboot/src/test/java/org/apache/servicecomb/demo/springmvc/tests/SpringMvcSpringIntegrationTest.java b/integration-tests/springmvc-tests/springmvc-tests-general-with-springboot/src/test/java/org/apache/servicecomb/demo/springmvc/tests/SpringMvcSpringIntegrationTest.java index c0caef6..fbbf181 100644 --- a/integration-tests/springmvc-tests/springmvc-tests-general-with-springboot/src/test/java/org/apache/servicecomb/demo/springmvc/tests/SpringMvcSpringIntegrationTest.java +++ b/integration-tests/springmvc-tests/springmvc-tests-general-with-springboot/src/test/java/org/apache/servicecomb/demo/springmvc/tests/SpringMvcSpringIntegrationTest.java @@ -17,29 +17,24 @@ package org.apache.servicecomb.demo.springmvc.tests; -import org.apache.servicecomb.core.handler.ShutdownHookHandler; +import org.apache.servicecomb.core.SCBEngine; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.runner.RunWith; import org.springframework.boot.SpringApplication; -import org.springframework.context.ConfigurableApplicationContext; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) public class SpringMvcSpringIntegrationTest extends SpringMvcIntegrationTestBase { - private static ConfigurableApplicationContext context; - @BeforeClass public static void init() { System.setProperty("cse.uploads.directory", "/tmp"); setUpLocalRegistry(); - context = SpringApplication.run(SpringMvcSpringMain.class); + SpringApplication.run(SpringMvcSpringMain.class); } @AfterClass public static void shutdown() { - //sim system.exit(0) - ShutdownHookHandler.INSTANCE.ALL_INVOCATION_FINISHED.release(); - context.close(); + SCBEngine.getInstance().uninit(); } } diff --git a/integration-tests/springmvc-tests/springmvc-tests-general/src/test/java/org/apache/servicecomb/demo/springmvc/tests/RawSpringMvcIntegrationTest.java b/integration-tests/springmvc-tests/springmvc-tests-general/src/test/java/org/apache/servicecomb/demo/springmvc/tests/RawSpringMvcIntegrationTest.java index c0fc7e3..7a49b20 100644 --- a/integration-tests/springmvc-tests/springmvc-tests-general/src/test/java/org/apache/servicecomb/demo/springmvc/tests/RawSpringMvcIntegrationTest.java +++ b/integration-tests/springmvc-tests/springmvc-tests-general/src/test/java/org/apache/servicecomb/demo/springmvc/tests/RawSpringMvcIntegrationTest.java @@ -17,12 +17,9 @@ package org.apache.servicecomb.demo.springmvc.tests; -import org.apache.servicecomb.core.CseApplicationListener; -import org.apache.servicecomb.core.handler.ShutdownHookHandler; -import org.apache.servicecomb.foundation.common.utils.BeanUtils; +import org.apache.servicecomb.core.SCBEngine; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.springframework.context.event.ContextClosedEvent; public class RawSpringMvcIntegrationTest extends SpringMvcIntegrationTestBase { @@ -35,10 +32,6 @@ public class RawSpringMvcIntegrationTest extends SpringMvcIntegrationTestBase { @AfterClass public static void shutdown() { - //sim system.exit(0) - ShutdownHookHandler.INSTANCE.ALL_INVOCATION_FINISHED.release(); - CseApplicationListener cal = BeanUtils.getBean("org.apache.servicecomb.core.CseApplicationListener"); - ContextClosedEvent event = new ContextClosedEvent(BeanUtils.getContext()); - cal.onApplicationEvent(event); + SCBEngine.getInstance().uninit(); } } diff --git a/integration-tests/springmvc-tests/springmvc-tests-simplified-mapping-with-springboot/src/test/java/org/apache/servicecomb/demo/springmvc/tests/SpringMvcSpringSimplifiedMappingAnnotationIntegrationTest.java b/integration-tests/springmvc-tests/springmvc-tests-simplified-mapping-with-springboot/src/test/java/org/apache/servicecomb/demo/springmvc/tests/SpringMvcSpringSimplifiedMappingAnnotationIntegrationTest.java index dfc0ca8..292f926 100644 --- a/integration-tests/springmvc-tests/springmvc-tests-simplified-mapping-with-springboot/src/test/java/org/apache/servicecomb/demo/springmvc/tests/SpringMvcSpringSimplifiedMappingAnnotationIntegrationTest.java +++ b/integration-tests/springmvc-tests/springmvc-tests-simplified-mapping-with-springboot/src/test/java/org/apache/servicecomb/demo/springmvc/tests/SpringMvcSpringSimplifiedMappingAnnotationIntegrationTest.java @@ -17,30 +17,25 @@ package org.apache.servicecomb.demo.springmvc.tests; -import org.apache.servicecomb.core.handler.ShutdownHookHandler; +import org.apache.servicecomb.core.SCBEngine; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.runner.RunWith; import org.springframework.boot.SpringApplication; -import org.springframework.context.ConfigurableApplicationContext; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) public class SpringMvcSpringSimplifiedMappingAnnotationIntegrationTest extends SpringMvcIntegrationTestBase { - private static ConfigurableApplicationContext context; - @BeforeClass public static void init() { System.setProperty("spring.profiles.active", "SimplifiedMapping"); System.setProperty("cse.uploads.directory", "/tmp"); setUpLocalRegistry(); - context = SpringApplication.run(SpringMvcSpringMain.class); + SpringApplication.run(SpringMvcSpringMain.class); } @AfterClass public static void shutdown() { - //sim system.exit(0) - ShutdownHookHandler.INSTANCE.ALL_INVOCATION_FINISHED.release(); - context.close(); + SCBEngine.getInstance().uninit(); } } diff --git a/integration-tests/springmvc-tests/springmvc-tests-simplified-mapping/src/test/java/org/apache/servicecomb/demo/springmvc/tests/RawSpringMvcSimplifiedMappingAnnotationIntegrationTest.java b/integration-tests/springmvc-tests/springmvc-tests-simplified-mapping/src/test/java/org/apache/servicecomb/demo/springmvc/tests/RawSpringMvcSimplifiedMappingAnnotationIntegrationTest.java index d6d2cfc..35cdbd8 100644 --- a/integration-tests/springmvc-tests/springmvc-tests-simplified-mapping/src/test/java/org/apache/servicecomb/demo/springmvc/tests/RawSpringMvcSimplifiedMappingAnnotationIntegrationTest.java +++ b/integration-tests/springmvc-tests/springmvc-tests-simplified-mapping/src/test/java/org/apache/servicecomb/demo/springmvc/tests/RawSpringMvcSimplifiedMappingAnnotationIntegrationTest.java @@ -17,12 +17,9 @@ package org.apache.servicecomb.demo.springmvc.tests; -import org.apache.servicecomb.core.CseApplicationListener; -import org.apache.servicecomb.core.handler.ShutdownHookHandler; -import org.apache.servicecomb.foundation.common.utils.BeanUtils; +import org.apache.servicecomb.core.SCBEngine; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.springframework.context.event.ContextClosedEvent; public class RawSpringMvcSimplifiedMappingAnnotationIntegrationTest extends SpringMvcIntegrationTestBase { @@ -36,10 +33,6 @@ public class RawSpringMvcSimplifiedMappingAnnotationIntegrationTest extends Spri @AfterClass public static void shutdown() { - //sim system.exit(0) - ShutdownHookHandler.INSTANCE.ALL_INVOCATION_FINISHED.release(); - CseApplicationListener cal = BeanUtils.getBean("org.apache.servicecomb.core.CseApplicationListener"); - ContextClosedEvent event = new ContextClosedEvent(BeanUtils.getContext()); - cal.onApplicationEvent(event); + SCBEngine.getInstance().uninit(); } } diff --git a/providers/provider-pojo/src/main/java/org/apache/servicecomb/provider/pojo/Invoker.java b/providers/provider-pojo/src/main/java/org/apache/servicecomb/provider/pojo/Invoker.java index 905d938..1dd25e2 100644 --- a/providers/provider-pojo/src/main/java/org/apache/servicecomb/provider/pojo/Invoker.java +++ b/providers/provider-pojo/src/main/java/org/apache/servicecomb/provider/pojo/Invoker.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.servicecomb.core.CseContext; import org.apache.servicecomb.core.Invocation; +import org.apache.servicecomb.core.SCBEngine; import org.apache.servicecomb.core.definition.MicroserviceMeta; import org.apache.servicecomb.core.definition.SchemaMeta; import org.apache.servicecomb.core.invocation.InvocationFactory; @@ -84,7 +85,7 @@ public class Invoker implements InvocationHandler { } @Override - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + public Object invoke(Object proxy, Method method, Object[] args) { if (swaggerConsumer == null) { synchronized (this) { if (swaggerConsumer == null) { @@ -93,6 +94,8 @@ public class Invoker implements InvocationHandler { } } + SCBEngine.getInstance().assertIsStopping(); + SwaggerConsumerOperation consumerOperation = swaggerConsumer.findOperation(method.getName()); Invocation invocation = InvocationFactory diff --git a/providers/provider-pojo/src/test/java/org/apache/servicecomb/provider/pojo/TestInvoker.java b/providers/provider-pojo/src/test/java/org/apache/servicecomb/provider/pojo/TestInvoker.java index 3264e66..715edcc 100644 --- a/providers/provider-pojo/src/test/java/org/apache/servicecomb/provider/pojo/TestInvoker.java +++ b/providers/provider-pojo/src/test/java/org/apache/servicecomb/provider/pojo/TestInvoker.java @@ -22,12 +22,13 @@ import java.util.concurrent.CompletableFuture; import org.apache.servicecomb.core.BootListener; import org.apache.servicecomb.core.CseContext; import org.apache.servicecomb.core.Invocation; +import org.apache.servicecomb.core.SCBEngine; +import org.apache.servicecomb.core.SCBStatus; import org.apache.servicecomb.core.definition.MicroserviceMeta; import org.apache.servicecomb.core.definition.schema.ConsumerSchemaFactory; import org.apache.servicecomb.core.provider.consumer.ConsumerProviderManager; import org.apache.servicecomb.core.provider.consumer.InvokerUtils; import org.apache.servicecomb.core.provider.consumer.ReferenceConfig; -import org.apache.servicecomb.core.provider.consumer.ReferenceConfigUtils; import org.apache.servicecomb.swagger.engine.SwaggerConsumer; import org.apache.servicecomb.swagger.engine.SwaggerConsumerOperation; import org.apache.servicecomb.swagger.engine.bootstrap.BootstrapNormal; @@ -56,12 +57,12 @@ public class TestInvoker { @Before public void setup() { - ReferenceConfigUtils.setReady(true); + SCBEngine.getInstance().setStatus(SCBStatus.UP); } @After public void teardown() { - ReferenceConfigUtils.setReady(false); + SCBEngine.getInstance().setStatus(SCBStatus.DOWN); } @Test @@ -70,7 +71,8 @@ public class TestInvoker { + "When beans are making remote calls in initialization, it's better to " + "implement " + BootListener.class.getName() + " and do it after EventType.AFTER_REGISTRY."; - ReferenceConfigUtils.setReady(false); + SCBEngine.getInstance().setStatus(SCBStatus.DOWN); + Invoker invoker = new Invoker("test", "schemaId", IPerson.class); try { diff --git a/providers/provider-springmvc/src/main/java/org/apache/servicecomb/provider/springmvc/reference/CseClientHttpRequest.java b/providers/provider-springmvc/src/main/java/org/apache/servicecomb/provider/springmvc/reference/CseClientHttpRequest.java index 947469d..a64d794 100644 --- a/providers/provider-springmvc/src/main/java/org/apache/servicecomb/provider/springmvc/reference/CseClientHttpRequest.java +++ b/providers/provider-springmvc/src/main/java/org/apache/servicecomb/provider/springmvc/reference/CseClientHttpRequest.java @@ -30,6 +30,7 @@ import org.apache.servicecomb.common.rest.definition.RestOperationMeta; import org.apache.servicecomb.common.rest.locator.OperationLocator; import org.apache.servicecomb.common.rest.locator.ServicePathManager; import org.apache.servicecomb.core.Invocation; +import org.apache.servicecomb.core.SCBEngine; import org.apache.servicecomb.core.definition.MicroserviceMeta; import org.apache.servicecomb.core.invocation.InvocationFactory; import org.apache.servicecomb.core.provider.consumer.InvokerUtils; @@ -148,6 +149,9 @@ public class CseClientHttpRequest implements ClientHttpRequest { protected RequestMeta createRequestMeta(String httpMetod, URI uri) { String microserviceName = uri.getAuthority(); + + SCBEngine.getInstance().assertIsStopping(); + ReferenceConfig referenceConfig = ReferenceConfigUtils.getForInvoke(microserviceName); MicroserviceMeta microserviceMeta = referenceConfig.getMicroserviceMeta(); diff --git a/providers/provider-springmvc/src/test/java/org/apache/servicecomb/provider/springmvc/reference/TestCseClientHttpRequest.java b/providers/provider-springmvc/src/test/java/org/apache/servicecomb/provider/springmvc/reference/TestCseClientHttpRequest.java index a1d7271..fe32d99 100644 --- a/providers/provider-springmvc/src/test/java/org/apache/servicecomb/provider/springmvc/reference/TestCseClientHttpRequest.java +++ b/providers/provider-springmvc/src/test/java/org/apache/servicecomb/provider/springmvc/reference/TestCseClientHttpRequest.java @@ -26,8 +26,9 @@ import org.apache.servicecomb.common.rest.RestEngineSchemaListener; import org.apache.servicecomb.core.BootListener; import org.apache.servicecomb.core.CseContext; import org.apache.servicecomb.core.Invocation; +import org.apache.servicecomb.core.SCBEngine; +import org.apache.servicecomb.core.SCBStatus; import org.apache.servicecomb.core.definition.SchemaMeta; -import org.apache.servicecomb.core.provider.consumer.ReferenceConfigUtils; import org.apache.servicecomb.core.unittest.UnitTestMeta; import org.apache.servicecomb.serviceregistry.RegistryUtils; import org.apache.servicecomb.serviceregistry.ServiceRegistry; @@ -45,12 +46,12 @@ import org.springframework.web.bind.annotation.RequestMethod; public class TestCseClientHttpRequest { @Before public void setup() { - ReferenceConfigUtils.setReady(true); + SCBEngine.getInstance().setStatus(SCBStatus.UP); } @After public void teardown() { - ReferenceConfigUtils.setReady(false); + SCBEngine.getInstance().setStatus(SCBStatus.DOWN); } @RequestMapping(path = "SpringmvcImpl") @@ -63,12 +64,13 @@ public class TestCseClientHttpRequest { } @Test - public void testNotReady() throws IOException { + public void testNotReady() { String exceptionMessage = "System is not ready for remote calls. " + "When beans are making remote calls in initialization, it's better to " + "implement " + BootListener.class.getName() + " and do it after EventType.AFTER_REGISTRY."; - ReferenceConfigUtils.setReady(false); + SCBEngine.getInstance().setStatus(SCBStatus.DOWN); + CseClientHttpRequest client = new CseClientHttpRequest(URI.create("cse://app:test/"), HttpMethod.POST); @@ -81,7 +83,7 @@ public class TestCseClientHttpRequest { } @Test - public void testNormal() throws IOException { + public void testNormal() { ServiceRegistry serviceRegistry = ServiceRegistryFactory.createLocal(); serviceRegistry.init(); RegistryUtils.setServiceRegistry(serviceRegistry); diff --git a/providers/provider-springmvc/src/test/java/org/apache/servicecomb/provider/springmvc/reference/async/CseAsyncClientHttpRequestTest.java b/providers/provider-springmvc/src/test/java/org/apache/servicecomb/provider/springmvc/reference/async/CseAsyncClientHttpRequestTest.java index 35388cc..d24a5ea 100644 --- a/providers/provider-springmvc/src/test/java/org/apache/servicecomb/provider/springmvc/reference/async/CseAsyncClientHttpRequestTest.java +++ b/providers/provider-springmvc/src/test/java/org/apache/servicecomb/provider/springmvc/reference/async/CseAsyncClientHttpRequestTest.java @@ -27,8 +27,9 @@ import org.apache.servicecomb.common.rest.RestEngineSchemaListener; import org.apache.servicecomb.core.BootListener; import org.apache.servicecomb.core.CseContext; import org.apache.servicecomb.core.Invocation; +import org.apache.servicecomb.core.SCBEngine; +import org.apache.servicecomb.core.SCBStatus; import org.apache.servicecomb.core.definition.SchemaMeta; -import org.apache.servicecomb.core.provider.consumer.ReferenceConfigUtils; import org.apache.servicecomb.core.unittest.UnitTestMeta; import org.apache.servicecomb.provider.springmvc.reference.CseClientHttpResponse; import org.apache.servicecomb.serviceregistry.RegistryUtils; @@ -51,12 +52,12 @@ public class CseAsyncClientHttpRequestTest { @Before public void setup() { - ReferenceConfigUtils.setReady(true); + SCBEngine.getInstance().setStatus(SCBStatus.UP); } @After public void teardown() { - ReferenceConfigUtils.setReady(false); + SCBEngine.getInstance().setStatus(SCBStatus.DOWN); } @RequestMapping(path = "CseAsyncClientHttpRequestTestSchema") @@ -73,7 +74,7 @@ public class CseAsyncClientHttpRequestTest { String exceptionMessage = "System is not ready for remote calls. " + "When beans are making remote calls in initialization, it's better to " + "implement " + BootListener.class.getName() + " and do it after EventType.AFTER_REGISTRY."; - ReferenceConfigUtils.setReady(false); + SCBEngine.getInstance().setStatus(SCBStatus.DOWN); CseAsyncClientHttpRequest clientHttpRequest = new CseAsyncClientHttpRequest(URI.create("cse://app:test/"), HttpMethod.POST); try { @@ -94,13 +95,15 @@ public class CseAsyncClientHttpRequestTest { .getSchemaListenerManager() .setSchemaListenerList(Arrays.asList(new RestEngineSchemaListener())); - SchemaMeta schemaMeta = meta.getOrCreateSchemaMeta(CseAsyncClientHttpRequestTest.CseAsyncClientHttpRequestTestSchema.class); + SchemaMeta schemaMeta = meta + .getOrCreateSchemaMeta(CseAsyncClientHttpRequestTest.CseAsyncClientHttpRequestTestSchema.class); CseContext.getInstance().getSchemaListenerManager().notifySchemaListener(schemaMeta); Holder<Invocation> holder = new Holder<>(); CseAsyncClientHttpRequest client = new CseAsyncClientHttpRequest(URI.create( - "cse://app:test/" + CseAsyncClientHttpRequestTest.CseAsyncClientHttpRequestTestSchema.class.getSimpleName() + "/testbytes"), + "cse://app:test/" + CseAsyncClientHttpRequestTest.CseAsyncClientHttpRequestTestSchema.class.getSimpleName() + + "/testbytes"), HttpMethod.POST) { @Override protected CompletableFuture<ClientHttpResponse> doAsyncInvoke(Invocation invocation) { @@ -125,7 +128,8 @@ public class CseAsyncClientHttpRequestTest { CseContext.getInstance() .getSchemaListenerManager() .setSchemaListenerList(Arrays.asList(new RestEngineSchemaListener())); - SchemaMeta schemaMeta = meta.getOrCreateSchemaMeta(CseAsyncClientHttpRequestTest.CseAsyncClientHttpRequestTestSchema.class); + SchemaMeta schemaMeta = meta + .getOrCreateSchemaMeta(CseAsyncClientHttpRequestTest.CseAsyncClientHttpRequestTestSchema.class); CseContext.getInstance().getSchemaListenerManager().notifySchemaListener(schemaMeta); Throwable error = new Error("failed"); @@ -133,7 +137,8 @@ public class CseAsyncClientHttpRequestTest { CseAsyncClientHttpRequest client = new CseAsyncClientHttpRequest(URI.create( - "cse://app:test/" + CseAsyncClientHttpRequestTest.CseAsyncClientHttpRequestTestSchema.class.getSimpleName() + "/testbytes"), + "cse://app:test/" + CseAsyncClientHttpRequestTest.CseAsyncClientHttpRequestTestSchema.class.getSimpleName() + + "/testbytes"), HttpMethod.POST) { @Override protected CompletableFuture<ClientHttpResponse> doAsyncInvoke(Invocation invocation) { diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/WebsocketUtils.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/WebsocketUtils.java index e262d8c..70625da 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/WebsocketUtils.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/WebsocketUtils.java @@ -59,11 +59,6 @@ public final class WebsocketUtils { }); ws.closeHandler(v -> { onClose.handle(v); - try { - ws.close(); - } catch (Exception err) { - LOGGER.error("ws close error.", err); - } }); ws.handler(onMessage); }, -- To stop receiving notification emails like this one, please contact wuji...@apache.org.