This is an automated email from the ASF dual-hosted git repository. liubao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/master by this push: new d6fc0a6bf [SCB-2650][SCB-2290]support instance isolation and instance bulkhead (#3242) d6fc0a6bf is described below commit d6fc0a6bf28a28234fb6338ceafbd816c4c383b6 Author: liubao68 <bi...@qq.com> AuthorDate: Sun Jul 31 10:43:18 2022 +0800 [SCB-2650][SCB-2290]support instance isolation and instance bulkhead (#3242) --- .../src/main/resources/application.yml | 2 +- .../src/main/resources/application.yml | 2 +- .../main/resources/config/base/log4j.properties | 2 +- .../governance/GovernanceConfiguration.java | 12 +++ .../handler/InstanceBulkheadHandler.java | 88 +++++++++++++++ .../handler/ext/AbstractFailurePredictor.java | 7 +- .../governance/policy/CircuitBreakerPolicy.java | 18 +++- .../servicecomb/governance/policy/RetryPolicy.java | 21 ++-- .../properties/InstanceBulkheadProperties.java | 19 ++-- .../governance/AbstractFailurePredictorTest.java | 64 +++++++++++ .../governance/GovernancePropertiesTest.java | 2 +- .../governance/InstanceBulkheadHandlerTest.java | 119 +++++++++++++++++++++ governance/src/test/resources/application.yaml | 4 + ...r.java => ConsumerInstanceBulkheadHandler.java} | 68 +++--------- ....java => ConsumerInstanceIsolationHandler.java} | 72 +++---------- .../governance/ProviderGovernanceHandler.java | 11 +- .../src/main/resources/config/cse.handler.xml | 6 +- 17 files changed, 371 insertions(+), 146 deletions(-) diff --git a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml index 1eb345f43..c663b708b 100644 --- a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml +++ b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml @@ -37,7 +37,7 @@ servicecomb: handler: chain: Consumer: - default: governance-consumer,loadbalance + default: loadbalance Provider: default: governance-provider diff --git a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml index 7d4118f29..972f3efb2 100644 --- a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml +++ b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml @@ -33,6 +33,6 @@ servicecomb: handler: chain: Consumer: - default: governance-consumer,loadbalance + default: loadbalance Provider: default: governance-provider \ No newline at end of file diff --git a/foundations/foundation-common/src/main/resources/config/base/log4j.properties b/foundations/foundation-common/src/main/resources/config/base/log4j.properties index 83de4e475..0cdb3bfbd 100644 --- a/foundations/foundation-common/src/main/resources/config/base/log4j.properties +++ b/foundations/foundation-common/src/main/resources/config/base/log4j.properties @@ -25,7 +25,7 @@ log4j.logger.runLogger=INFO log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d [%p] %m %l%n +log4j.appender.stdout.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss,SSS/zzz}][%t][%p]%m %l%n log4j.appender.paas=org.apache.servicecomb.foundation.common.utils.RollingFileAppenderExt log4j.appender.paas.file=${paas.logs.dir}${paas.logs.file} diff --git a/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java b/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java index 03eb52156..9307da436 100644 --- a/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java +++ b/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.servicecomb.governance.handler.BulkheadHandler; import org.apache.servicecomb.governance.handler.CircuitBreakerHandler; import org.apache.servicecomb.governance.handler.FaultInjectionHandler; +import org.apache.servicecomb.governance.handler.InstanceBulkheadHandler; import org.apache.servicecomb.governance.handler.InstanceIsolationHandler; import org.apache.servicecomb.governance.handler.RateLimitingHandler; import org.apache.servicecomb.governance.handler.RetryHandler; @@ -38,6 +39,7 @@ import org.apache.servicecomb.governance.marker.operator.SuffixOperator; import org.apache.servicecomb.governance.properties.BulkheadProperties; import org.apache.servicecomb.governance.properties.CircuitBreakerProperties; import org.apache.servicecomb.governance.properties.FaultInjectionProperties; +import org.apache.servicecomb.governance.properties.InstanceBulkheadProperties; import org.apache.servicecomb.governance.properties.InstanceIsolationProperties; import org.apache.servicecomb.governance.properties.MatchProperties; import org.apache.servicecomb.governance.properties.RateLimitProperties; @@ -58,6 +60,11 @@ public class GovernanceConfiguration { return new BulkheadProperties(); } + @Bean + public InstanceBulkheadProperties instanceBulkheadProperties() { + return new InstanceBulkheadProperties(); + } + @Bean public CircuitBreakerProperties circuitBreakerProperties() { return new CircuitBreakerProperties(); @@ -94,6 +101,11 @@ public class GovernanceConfiguration { return new BulkheadHandler(bulkheadProperties); } + @Bean + public InstanceBulkheadHandler instanceBulkheadHandler(InstanceBulkheadProperties instanceBulkheadProperties) { + return new InstanceBulkheadHandler(instanceBulkheadProperties); + } + @Bean public CircuitBreakerHandler circuitBreakerHandler(CircuitBreakerProperties circuitBreakerProperties, AbstractCircuitBreakerExtension circuitBreakerExtension, diff --git a/governance/src/main/java/org/apache/servicecomb/governance/handler/InstanceBulkheadHandler.java b/governance/src/main/java/org/apache/servicecomb/governance/handler/InstanceBulkheadHandler.java new file mode 100644 index 000000000..bdfe7d0b6 --- /dev/null +++ b/governance/src/main/java/org/apache/servicecomb/governance/handler/InstanceBulkheadHandler.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.governance.handler; + +import java.time.Duration; + +import org.apache.commons.lang3.StringUtils; +import org.apache.servicecomb.governance.marker.GovernanceRequest; +import org.apache.servicecomb.governance.policy.BulkheadPolicy; +import org.apache.servicecomb.governance.properties.InstanceBulkheadProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.github.resilience4j.bulkhead.Bulkhead; +import io.github.resilience4j.bulkhead.BulkheadConfig; +import io.github.resilience4j.bulkhead.BulkheadRegistry; + +public class InstanceBulkheadHandler extends AbstractGovernanceHandler<Bulkhead, BulkheadPolicy> { + private static final Logger LOGGER = LoggerFactory.getLogger(InstanceBulkheadHandler.class); + + private final InstanceBulkheadProperties bulkheadProperties; + + public InstanceBulkheadHandler(InstanceBulkheadProperties bulkheadProperties) { + this.bulkheadProperties = bulkheadProperties; + } + + @Override + protected String createKey(GovernanceRequest governanceRequest, BulkheadPolicy policy) { + return InstanceBulkheadProperties.MATCH_INSTANCE_BULKHEAD_KEY + + "." + governanceRequest.getServiceName() + + "." + policy.getName() + + "." + governanceRequest.getInstanceId(); + } + + @Override + protected void onConfigurationChanged(String key) { + if (key.startsWith(InstanceBulkheadProperties.MATCH_INSTANCE_BULKHEAD_KEY)) { + for (String processorKey : processors.keySet()) { + if (processorKey.startsWith(key)) { + processors.remove(processorKey); + } + } + } + } + + @Override + public BulkheadPolicy matchPolicy(GovernanceRequest governanceRequest) { + if (StringUtils.isEmpty(governanceRequest.getServiceName()) || StringUtils.isEmpty( + governanceRequest.getInstanceId())) { + LOGGER.info("Instance bulkhead is not properly configured, service id or instance id is empty."); + return null; + } + return matchersManager.match(governanceRequest, bulkheadProperties.getParsedEntity()); + } + + @Override + public Bulkhead createProcessor(String key, GovernanceRequest governanceRequest, BulkheadPolicy policy) { + return getBulkhead(key, policy); + } + + private Bulkhead getBulkhead(String key, BulkheadPolicy policy) { + LOGGER.info("applying new policy {} for {}", key, policy.toString()); + + BulkheadConfig config = BulkheadConfig.custom() + .maxConcurrentCalls(policy.getMaxConcurrentCalls()) + .maxWaitDuration(Duration.parse(policy.getMaxWaitDuration())) + .build(); + + BulkheadRegistry registry = BulkheadRegistry.of(config); + + return registry.bulkhead(key, config); + } +} diff --git a/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java b/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java index 8af8ae68f..e7fea24f9 100644 --- a/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java +++ b/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java @@ -38,12 +38,15 @@ public abstract class AbstractFailurePredictor implements FailurePredictor { } private static boolean statusCodeMatch(String status, String responseStatus) { - if (3 != status.length()) { + if (status == null) { + return false; + } + if (responseStatus.length() != status.length()) { return false; } char[] statusChar = status.toCharArray(); char[] responseChar = responseStatus.toCharArray(); - return IntStream.range(0, 3).noneMatch(i -> + return IntStream.range(0, statusChar.length).noneMatch(i -> statusChar[i] != responseChar[i] && statusChar[i] != 'x'); } } diff --git a/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java b/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java index 9996d1b72..1d1bdfeec 100644 --- a/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java +++ b/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java @@ -17,8 +17,9 @@ package org.apache.servicecomb.governance.policy; import java.time.Duration; -import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.servicecomb.governance.utils.GovernanceUtils; @@ -48,6 +49,9 @@ public class CircuitBreakerPolicy extends AbstractPolicy { public static final String DEFAULT_FAILURE_RESPONSE_STATUS_503 = "503"; + public static final List<String> DEFAULT_STATUS_LIST = Arrays.asList(DEFAULT_FAILURE_RESPONSE_STATUS_502, + DEFAULT_FAILURE_RESPONSE_STATUS_503); + private float failureRateThreshold = DEFAULT_FAILURE_RATE_THRESHOLD; private float slowCallRateThreshold = DEFAULT_SLOW_CALL_RATE_THRESHOLD; @@ -65,7 +69,7 @@ public class CircuitBreakerPolicy extends AbstractPolicy { private String slidingWindowSize = DEFAULT_SLIDING_WINDOW_SIZE; //status code that need record as a failure - private List<String> recordFailureStatus = new ArrayList<>(); + private List<String> recordFailureStatus = DEFAULT_STATUS_LIST; //force close this circuit breaker. This parameter is not used by circuit breaker directly private boolean forceClosed = false; @@ -193,14 +197,17 @@ public class CircuitBreakerPolicy extends AbstractPolicy { public List<String> getRecordFailureStatus() { if (CollectionUtils.isEmpty(this.recordFailureStatus)) { - this.recordFailureStatus.add(DEFAULT_FAILURE_RESPONSE_STATUS_502); - this.recordFailureStatus.add(DEFAULT_FAILURE_RESPONSE_STATUS_503); + return DEFAULT_STATUS_LIST; } return this.recordFailureStatus; } public void setRecordFailureStatus(List<String> recordFailureStatus) { - this.recordFailureStatus = recordFailureStatus; + if (recordFailureStatus == null) { + return; + } + this.recordFailureStatus = recordFailureStatus.stream().filter(e -> !StringUtils.isEmpty(e)) + .collect(Collectors.toList()); } public boolean isForceClosed() { @@ -230,6 +237,7 @@ public class CircuitBreakerPolicy extends AbstractPolicy { ", minimumNumberOfCalls=" + minimumNumberOfCalls + ", slidingWindowType='" + slidingWindowType + '\'' + ", slidingWindowSize=" + slidingWindowSize + + ", recordFailureStatus=" + recordFailureStatus + '}'; } } diff --git a/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java b/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java index 95be9194f..726121a2c 100644 --- a/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java +++ b/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java @@ -17,8 +17,9 @@ package org.apache.servicecomb.governance.policy; import java.time.Duration; -import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.springframework.util.CollectionUtils; @@ -27,12 +28,15 @@ public class RetryPolicy extends AbstractPolicy { public static final int DEFAULT_MAX_ATTEMPTS = 3; - public static final Duration DEFAULT_WAIT_DURATION = Duration.ofMillis(10); + public static final Duration DEFAULT_WAIT_DURATION = Duration.ofMillis(1); public static final String DEFAULT_RETRY_ON_RESPONSE_STATUS_502 = "502"; public static final String DEFAULT_RETRY_ON_RESPONSE_STATUS_503 = "503"; + public static final List<String> DEFAULT_STATUS_LIST = Arrays.asList(DEFAULT_RETRY_ON_RESPONSE_STATUS_502, + DEFAULT_RETRY_ON_RESPONSE_STATUS_503); + private static final Duration INITIAL_INTERVAL = Duration.ofMillis(1000); private static final float MULTIPLIER = 2; @@ -48,7 +52,7 @@ public class RetryPolicy extends AbstractPolicy { private String waitDuration = DEFAULT_WAIT_DURATION.toString(); //status code that need retry - private List<String> retryOnResponseStatus = new ArrayList<>(); + private List<String> retryOnResponseStatus = DEFAULT_STATUS_LIST; //retry strategy private String retryStrategy = DEFAULT_RETRY_STRATEGY; @@ -71,14 +75,17 @@ public class RetryPolicy extends AbstractPolicy { public List<String> getRetryOnResponseStatus() { if (CollectionUtils.isEmpty(retryOnResponseStatus)) { - this.retryOnResponseStatus.add(DEFAULT_RETRY_ON_RESPONSE_STATUS_502); - this.retryOnResponseStatus.add(DEFAULT_RETRY_ON_RESPONSE_STATUS_503); + return DEFAULT_STATUS_LIST; } return retryOnResponseStatus; } public void setRetryOnResponseStatus(List<String> retryOnResponseStatus) { - this.retryOnResponseStatus = retryOnResponseStatus; + if (retryOnResponseStatus == null) { + return; + } + this.retryOnResponseStatus = retryOnResponseStatus.stream().filter(e -> !StringUtils.isEmpty(e)) + .collect(Collectors.toList()); } public int getMaxAttempts() { @@ -90,7 +97,7 @@ public class RetryPolicy extends AbstractPolicy { } public String getWaitDuration() { - return Duration.parse(waitDuration).toMillis() < 10 ? DEFAULT_WAIT_DURATION.toString() : waitDuration; + return Duration.parse(waitDuration).toMillis() < 1 ? DEFAULT_WAIT_DURATION.toString() : waitDuration; } public void setWaitDuration(String waitDuration) { diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java b/governance/src/main/java/org/apache/servicecomb/governance/properties/InstanceBulkheadProperties.java similarity index 64% rename from handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java rename to governance/src/main/java/org/apache/servicecomb/governance/properties/InstanceBulkheadProperties.java index f5cfbf72a..edd4b6045 100644 --- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java +++ b/governance/src/main/java/org/apache/servicecomb/governance/properties/InstanceBulkheadProperties.java @@ -15,16 +15,19 @@ * limitations under the License. */ -package org.apache.servicecomb.handler.governance; +package org.apache.servicecomb.governance.properties; -import org.apache.servicecomb.core.Handler; -import org.apache.servicecomb.core.Invocation; -import org.apache.servicecomb.swagger.invocation.AsyncResponse; +import org.apache.servicecomb.governance.policy.BulkheadPolicy; + +public class InstanceBulkheadProperties extends PolicyProperties<BulkheadPolicy> { + public static final String MATCH_INSTANCE_BULKHEAD_KEY = "servicecomb.instanceBulkhead"; + + public InstanceBulkheadProperties() { + super(MATCH_INSTANCE_BULKHEAD_KEY); + } -public class ConsumerGovernanceHandler implements Handler { - // an empty implementation, will add possible features in future. @Override - public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception { - invocation.next(asyncResp); + public Class<BulkheadPolicy> getEntityClass() { + return BulkheadPolicy.class; } } diff --git a/governance/src/test/java/org/apache/servicecomb/governance/AbstractFailurePredictorTest.java b/governance/src/test/java/org/apache/servicecomb/governance/AbstractFailurePredictorTest.java new file mode 100644 index 000000000..9c84287ac --- /dev/null +++ b/governance/src/test/java/org/apache/servicecomb/governance/AbstractFailurePredictorTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.governance; + +import java.util.Arrays; +import java.util.List; + +import org.apache.servicecomb.governance.handler.ext.AbstractFailurePredictor; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class AbstractFailurePredictorTest { + class MyAbstractFailurePredictor extends AbstractFailurePredictor { + MyAbstractFailurePredictor() { + } + + @Override + protected String extractStatusCode(Object result) { + return (String) result; + } + + @Override + public boolean isFailedResult(Throwable e) { + return super.isFailedResult(e); + } + } + + @Test + public void testCodeMatch() { + AbstractFailurePredictor predictor = new MyAbstractFailurePredictor(); + List<String> statusList = Arrays.asList("500"); + Assertions.assertTrue(predictor.isFailedResult(statusList, "500")); + Assertions.assertFalse(predictor.isFailedResult(statusList, "502")); + Assertions.assertFalse(predictor.isFailedResult(statusList, "400")); + Assertions.assertFalse(predictor.isFailedResult(statusList, "444")); + + statusList = Arrays.asList("5x0"); + Assertions.assertTrue(predictor.isFailedResult(statusList, "500")); + Assertions.assertFalse(predictor.isFailedResult(statusList, "502")); + Assertions.assertFalse(predictor.isFailedResult(statusList, "400")); + Assertions.assertFalse(predictor.isFailedResult(statusList, "444")); + + statusList = Arrays.asList(null, "xx", "5x0"); + Assertions.assertTrue(predictor.isFailedResult(statusList, "500")); + Assertions.assertFalse(predictor.isFailedResult(statusList, "502")); + Assertions.assertFalse(predictor.isFailedResult(statusList, "400")); + Assertions.assertFalse(predictor.isFailedResult(statusList, "444")); + } +} diff --git a/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java b/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java index bc943a070..e0c55964c 100644 --- a/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java +++ b/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java @@ -164,7 +164,7 @@ public class GovernancePropertiesTest { @Test public void test_all_bean_is_loaded() { - Assertions.assertEquals(6, propertiesList.size()); + Assertions.assertEquals(7, propertiesList.size()); } @Test diff --git a/governance/src/test/java/org/apache/servicecomb/governance/InstanceBulkheadHandlerTest.java b/governance/src/test/java/org/apache/servicecomb/governance/InstanceBulkheadHandlerTest.java new file mode 100644 index 000000000..2377aabc3 --- /dev/null +++ b/governance/src/test/java/org/apache/servicecomb/governance/InstanceBulkheadHandlerTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.governance; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.servicecomb.governance.handler.InstanceBulkheadHandler; +import org.apache.servicecomb.governance.marker.GovernanceRequest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ContextConfiguration; + +import io.github.resilience4j.bulkhead.Bulkhead; +import io.github.resilience4j.bulkhead.BulkheadFullException; +import io.github.resilience4j.decorators.Decorators; +import io.github.resilience4j.decorators.Decorators.DecorateCheckedSupplier; + +@SpringBootTest +@ContextConfiguration(classes = {GovernanceConfiguration.class, MockConfiguration.class}) +public class InstanceBulkheadHandlerTest { + private InstanceBulkheadHandler instanceBulkheadHandler; + + @Autowired + public void setInstanceBulkheadHandler(InstanceBulkheadHandler instanceBulkheadHandler) { + this.instanceBulkheadHandler = instanceBulkheadHandler; + } + + @Test + public void test_instance_bulkhead_work() throws Throwable { + + // instance1 + DecorateCheckedSupplier<String> dsInstance1 = Decorators.ofCheckedSupplier(() -> "wake"); + + GovernanceRequest requestInstance1 = new GovernanceRequest(); + requestInstance1.setInstanceId("instance01"); + requestInstance1.setServiceName("service01"); + requestInstance1.setUri("/test"); + + Bulkhead bulkheadInstance1 = instanceBulkheadHandler.getActuator(requestInstance1); + dsInstance1.withBulkhead(bulkheadInstance1); + + // instance2 + DecorateCheckedSupplier<String> dsInstance2 = Decorators.ofCheckedSupplier(() -> { + Thread.sleep(1000); + return "sleep"; + }); + + GovernanceRequest requestInstance2 = new GovernanceRequest(); + requestInstance2.setInstanceId("instance02"); + requestInstance2.setServiceName("service01"); + requestInstance2.setUri("/test"); + + Bulkhead bulkheadInstance2 = instanceBulkheadHandler.getActuator(requestInstance2); + dsInstance2.withBulkhead(bulkheadInstance2); + + Executor executor = Executors.newFixedThreadPool(4); + AtomicInteger wakeCount = new AtomicInteger(0); + AtomicInteger sleepCount = new AtomicInteger(0); + AtomicInteger errorCount = new AtomicInteger(0); + AtomicInteger rejectCount = new AtomicInteger(0); + CountDownLatch countDownLatch = new CountDownLatch(100); + for (int i = 0; i < 100; i++) { + final int num = i; + executor.execute(() -> { + // 50% for each server + if (num % 2 == 0) { + runCommand(dsInstance1, wakeCount, sleepCount, errorCount, rejectCount, countDownLatch); + } else { + runCommand(dsInstance2, wakeCount, sleepCount, errorCount, rejectCount, countDownLatch); + } + }); + } + countDownLatch.await(100, TimeUnit.SECONDS); + Assertions.assertEquals(50, wakeCount.get()); + Assertions.assertEquals(2, sleepCount.get()); + Assertions.assertEquals(0, errorCount.get()); + Assertions.assertEquals(48, rejectCount.get()); + } + + private void runCommand(DecorateCheckedSupplier<String> ds, AtomicInteger wakeCount, AtomicInteger sleepCount, + AtomicInteger errorCount, AtomicInteger rejectCount, CountDownLatch countDownLatch) { + try { + String result = ds.get(); + if ("wake".equals(result)) { + wakeCount.incrementAndGet(); + } else if ("sleep".equals(result)) { + sleepCount.incrementAndGet(); + } else { + errorCount.incrementAndGet(); + } + } catch (BulkheadFullException e) { + rejectCount.incrementAndGet(); + } catch (Throwable e) { + errorCount.incrementAndGet(); + } + countDownLatch.countDown(); + } +} diff --git a/governance/src/test/resources/application.yaml b/governance/src/test/resources/application.yaml index e5c0842e6..5a98ad620 100644 --- a/governance/src/test/resources/application.yaml +++ b/governance/src/test/resources/application.yaml @@ -122,6 +122,10 @@ servicecomb: slidingWindowSize: 2 slidingWindowType: COUNT_BASED waitDurationInOpenState: 1000 + instanceBulkhead: + demo-allOperation: | + maxConcurrentCalls: 2 + maxWaitDuration: 10 faultInjection: demo-fallback-ThrowException: | type: abort diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceBulkheadHandler.java similarity index 53% copy from handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java copy to handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceBulkheadHandler.java index 8ae3df80a..65e43465c 100644 --- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java +++ b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceBulkheadHandler.java @@ -25,9 +25,7 @@ import org.apache.servicecomb.core.Handler; import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.core.governance.MatchType; import org.apache.servicecomb.foundation.common.utils.BeanUtils; -import org.apache.servicecomb.governance.handler.BulkheadHandler; -import org.apache.servicecomb.governance.handler.CircuitBreakerHandler; -import org.apache.servicecomb.governance.handler.RateLimitingHandler; +import org.apache.servicecomb.governance.handler.InstanceBulkheadHandler; import org.apache.servicecomb.governance.marker.GovernanceRequest; import org.apache.servicecomb.swagger.invocation.AsyncResponse; import org.apache.servicecomb.swagger.invocation.Response; @@ -38,31 +36,26 @@ import org.slf4j.LoggerFactory; import io.github.resilience4j.bulkhead.Bulkhead; import io.github.resilience4j.bulkhead.BulkheadFullException; -import io.github.resilience4j.circuitbreaker.CallNotPermittedException; -import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.decorators.Decorators; import io.github.resilience4j.decorators.Decorators.DecorateCompletionStage; -import io.github.resilience4j.ratelimiter.RateLimiter; -import io.github.resilience4j.ratelimiter.RequestNotPermitted; -public class ProviderGovernanceHandler implements Handler { - private static final Logger LOGGER = LoggerFactory.getLogger(ProviderGovernanceHandler.class); +public class ConsumerInstanceBulkheadHandler implements Handler { + private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerInstanceBulkheadHandler.class); - private final RateLimitingHandler rateLimitingHandler = BeanUtils.getBean(RateLimitingHandler.class); - - private final CircuitBreakerHandler circuitBreakerHandler = BeanUtils.getBean(CircuitBreakerHandler.class); - - private final BulkheadHandler bulkheadHandler = BeanUtils.getBean(BulkheadHandler.class); + private final InstanceBulkheadHandler instanceBulkheadHandler = BeanUtils.getBean(InstanceBulkheadHandler.class); @Override public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception { - + if (invocation.getEndpoint() == null) { + invocation.next(asyncResp); + return; + } Supplier<CompletionStage<Response>> next = createBusinessCompletionStageSupplier(invocation); DecorateCompletionStage<Response> dcs = Decorators.ofCompletionStage(next); GovernanceRequest request = MatchType.createGovHttpRequest(invocation); + request.setServiceName(invocation.getMicroserviceName()); + request.setInstanceId(invocation.getEndpoint().getMicroserviceInstance().getInstanceId()); - addRateLimiting(dcs, request); - addCircuitBreaker(dcs, request); addBulkhead(dcs, request); dcs.get().whenComplete((r, e) -> { @@ -71,18 +64,10 @@ public class ProviderGovernanceHandler implements Handler { return; } - if (e instanceof RequestNotPermitted) { - asyncResp.complete( - Response.failResp(new InvocationException(429, "rate limited.", new CommonExceptionData("rate limited.")))); - LOGGER.warn("the request is rate limit by policy : {}", e.getMessage()); - } else if (e instanceof CallNotPermittedException) { - asyncResp.complete( - Response.failResp(new InvocationException(429, "circuitBreaker is open.", - new CommonExceptionData("circuitBreaker is open.")))); - LOGGER.warn("circuitBreaker is open by policy : {}", e.getMessage()); - } else if (e instanceof BulkheadFullException) { + if (e instanceof BulkheadFullException) { + // return 503 so that consumer can retry asyncResp.complete( - Response.failResp(new InvocationException(429, "bulkhead is full and does not permit further calls.", + Response.failResp(new InvocationException(503, "bulkhead is full and does not permit further calls.", new CommonExceptionData("bulkhead is full and does not permit further calls.")))); LOGGER.warn("bulkhead is full and does not permit further calls by policy : {}", e.getMessage()); } else { @@ -92,40 +77,17 @@ public class ProviderGovernanceHandler implements Handler { } private void addBulkhead(DecorateCompletionStage<Response> dcs, GovernanceRequest request) { - Bulkhead bulkhead = bulkheadHandler.getActuator(request); + Bulkhead bulkhead = instanceBulkheadHandler.getActuator(request); if (bulkhead != null) { dcs.withBulkhead(bulkhead); } } - private void addCircuitBreaker(DecorateCompletionStage<Response> dcs, GovernanceRequest request) { - CircuitBreaker circuitBreaker = circuitBreakerHandler.getActuator(request); - if (circuitBreaker != null) { - dcs.withCircuitBreaker(circuitBreaker); - } - } - - private void addRateLimiting(DecorateCompletionStage<Response> dcs, GovernanceRequest request) { - RateLimiter rateLimiter = rateLimitingHandler.getActuator(request); - if (rateLimiter != null) { - dcs.withRateLimiter(rateLimiter); - } - } - private Supplier<CompletionStage<Response>> createBusinessCompletionStageSupplier(Invocation invocation) { return () -> { CompletableFuture<Response> result = new CompletableFuture<>(); try { - invocation.next(response -> { - if (response.isFailed()) { - // For failed response, create a fail to make circuit breaker work. - // Users application maybe much complicated than this simple logic, - // while they need to customize which error will cause circuit breaker. - result.completeExceptionally(response.getResult()); - } else { - result.complete(response); - } - }); + invocation.next(result::complete); } catch (Exception e) { result.completeExceptionally(e); } diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java similarity index 50% copy from handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java copy to handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java index 8ae3df80a..73d3dbb1d 100644 --- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java +++ b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java @@ -25,9 +25,7 @@ import org.apache.servicecomb.core.Handler; import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.core.governance.MatchType; import org.apache.servicecomb.foundation.common.utils.BeanUtils; -import org.apache.servicecomb.governance.handler.BulkheadHandler; -import org.apache.servicecomb.governance.handler.CircuitBreakerHandler; -import org.apache.servicecomb.governance.handler.RateLimitingHandler; +import org.apache.servicecomb.governance.handler.InstanceIsolationHandler; import org.apache.servicecomb.governance.marker.GovernanceRequest; import org.apache.servicecomb.swagger.invocation.AsyncResponse; import org.apache.servicecomb.swagger.invocation.Response; @@ -36,34 +34,29 @@ import org.apache.servicecomb.swagger.invocation.exception.InvocationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.github.resilience4j.bulkhead.Bulkhead; -import io.github.resilience4j.bulkhead.BulkheadFullException; import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.decorators.Decorators; import io.github.resilience4j.decorators.Decorators.DecorateCompletionStage; -import io.github.resilience4j.ratelimiter.RateLimiter; -import io.github.resilience4j.ratelimiter.RequestNotPermitted; -public class ProviderGovernanceHandler implements Handler { - private static final Logger LOGGER = LoggerFactory.getLogger(ProviderGovernanceHandler.class); +public class ConsumerInstanceIsolationHandler implements Handler { + private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerInstanceIsolationHandler.class); - private final RateLimitingHandler rateLimitingHandler = BeanUtils.getBean(RateLimitingHandler.class); - - private final CircuitBreakerHandler circuitBreakerHandler = BeanUtils.getBean(CircuitBreakerHandler.class); - - private final BulkheadHandler bulkheadHandler = BeanUtils.getBean(BulkheadHandler.class); + private final InstanceIsolationHandler instanceIsolationHandler = BeanUtils.getBean(InstanceIsolationHandler.class); @Override public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception { - + if (invocation.getEndpoint() == null) { + invocation.next(asyncResp); + return; + } Supplier<CompletionStage<Response>> next = createBusinessCompletionStageSupplier(invocation); DecorateCompletionStage<Response> dcs = Decorators.ofCompletionStage(next); GovernanceRequest request = MatchType.createGovHttpRequest(invocation); + request.setServiceName(invocation.getMicroserviceName()); + request.setInstanceId(invocation.getEndpoint().getMicroserviceInstance().getInstanceId()); - addRateLimiting(dcs, request); addCircuitBreaker(dcs, request); - addBulkhead(dcs, request); dcs.get().whenComplete((r, e) -> { if (e == null) { @@ -71,61 +64,30 @@ public class ProviderGovernanceHandler implements Handler { return; } - if (e instanceof RequestNotPermitted) { - asyncResp.complete( - Response.failResp(new InvocationException(429, "rate limited.", new CommonExceptionData("rate limited.")))); - LOGGER.warn("the request is rate limit by policy : {}", e.getMessage()); - } else if (e instanceof CallNotPermittedException) { - asyncResp.complete( - Response.failResp(new InvocationException(429, "circuitBreaker is open.", - new CommonExceptionData("circuitBreaker is open.")))); - LOGGER.warn("circuitBreaker is open by policy : {}", e.getMessage()); - } else if (e instanceof BulkheadFullException) { + if (e instanceof CallNotPermittedException) { + // return 503 so that consumer can retry asyncResp.complete( - Response.failResp(new InvocationException(429, "bulkhead is full and does not permit further calls.", - new CommonExceptionData("bulkhead is full and does not permit further calls.")))); - LOGGER.warn("bulkhead is full and does not permit further calls by policy : {}", e.getMessage()); + Response.failResp(new InvocationException(503, "instance isolation circuitBreaker is open.", + new CommonExceptionData("instance isolation circuitBreaker is open.")))); + LOGGER.warn("instance isolation circuitBreaker is open by policy : {}", e.getMessage()); } else { asyncResp.complete(Response.createProducerFail(e)); } }); } - private void addBulkhead(DecorateCompletionStage<Response> dcs, GovernanceRequest request) { - Bulkhead bulkhead = bulkheadHandler.getActuator(request); - if (bulkhead != null) { - dcs.withBulkhead(bulkhead); - } - } - private void addCircuitBreaker(DecorateCompletionStage<Response> dcs, GovernanceRequest request) { - CircuitBreaker circuitBreaker = circuitBreakerHandler.getActuator(request); + CircuitBreaker circuitBreaker = instanceIsolationHandler.getActuator(request); if (circuitBreaker != null) { dcs.withCircuitBreaker(circuitBreaker); } } - private void addRateLimiting(DecorateCompletionStage<Response> dcs, GovernanceRequest request) { - RateLimiter rateLimiter = rateLimitingHandler.getActuator(request); - if (rateLimiter != null) { - dcs.withRateLimiter(rateLimiter); - } - } - private Supplier<CompletionStage<Response>> createBusinessCompletionStageSupplier(Invocation invocation) { return () -> { CompletableFuture<Response> result = new CompletableFuture<>(); try { - invocation.next(response -> { - if (response.isFailed()) { - // For failed response, create a fail to make circuit breaker work. - // Users application maybe much complicated than this simple logic, - // while they need to customize which error will cause circuit breaker. - result.completeExceptionally(response.getResult()); - } else { - result.complete(response); - } - }); + invocation.next(result::complete); } catch (Exception e) { result.completeExceptionally(e); } diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java index 8ae3df80a..f3142ab77 100644 --- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java +++ b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java @@ -116,16 +116,7 @@ public class ProviderGovernanceHandler implements Handler { return () -> { CompletableFuture<Response> result = new CompletableFuture<>(); try { - invocation.next(response -> { - if (response.isFailed()) { - // For failed response, create a fail to make circuit breaker work. - // Users application maybe much complicated than this simple logic, - // while they need to customize which error will cause circuit breaker. - result.completeExceptionally(response.getResult()); - } else { - result.complete(response); - } - }); + invocation.next(result::complete); } catch (Exception e) { result.completeExceptionally(e); } diff --git a/handlers/handler-governance/src/main/resources/config/cse.handler.xml b/handlers/handler-governance/src/main/resources/config/cse.handler.xml index c7383f0cc..ab5a82c6f 100644 --- a/handlers/handler-governance/src/main/resources/config/cse.handler.xml +++ b/handlers/handler-governance/src/main/resources/config/cse.handler.xml @@ -18,6 +18,8 @@ <config> <handler id="governance-provider" class="org.apache.servicecomb.handler.governance.ProviderGovernanceHandler"/> - <handler id="governance-consumer" - class="org.apache.servicecomb.handler.governance.ConsumerGovernanceHandler"/> + <handler id="instance-bulkhead-consumer" + class="org.apache.servicecomb.handler.governance.ConsumerInstanceBulkheadHandler"/> + <handler id="instance-isolation-consumer" + class="org.apache.servicecomb.handler.governance.ConsumerInstanceIsolationHandler"/> </config>