This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/master by this push:
     new d6fc0a6bf [SCB-2650][SCB-2290]support instance isolation and instance 
bulkhead (#3242)
d6fc0a6bf is described below

commit d6fc0a6bf28a28234fb6338ceafbd816c4c383b6
Author: liubao68 <bi...@qq.com>
AuthorDate: Sun Jul 31 10:43:18 2022 +0800

    [SCB-2650][SCB-2290]support instance isolation and instance bulkhead (#3242)
---
 .../src/main/resources/application.yml             |   2 +-
 .../src/main/resources/application.yml             |   2 +-
 .../main/resources/config/base/log4j.properties    |   2 +-
 .../governance/GovernanceConfiguration.java        |  12 +++
 .../handler/InstanceBulkheadHandler.java           |  88 +++++++++++++++
 .../handler/ext/AbstractFailurePredictor.java      |   7 +-
 .../governance/policy/CircuitBreakerPolicy.java    |  18 +++-
 .../servicecomb/governance/policy/RetryPolicy.java |  21 ++--
 .../properties/InstanceBulkheadProperties.java     |  19 ++--
 .../governance/AbstractFailurePredictorTest.java   |  64 +++++++++++
 .../governance/GovernancePropertiesTest.java       |   2 +-
 .../governance/InstanceBulkheadHandlerTest.java    | 119 +++++++++++++++++++++
 governance/src/test/resources/application.yaml     |   4 +
 ...r.java => ConsumerInstanceBulkheadHandler.java} |  68 +++---------
 ....java => ConsumerInstanceIsolationHandler.java} |  72 +++----------
 .../governance/ProviderGovernanceHandler.java      |  11 +-
 .../src/main/resources/config/cse.handler.xml      |   6 +-
 17 files changed, 371 insertions(+), 146 deletions(-)

diff --git 
a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml
 
b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml
index 1eb345f43..c663b708b 100644
--- 
a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml
+++ 
b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml
@@ -37,7 +37,7 @@ servicecomb:
   handler:
     chain:
       Consumer:
-        default: governance-consumer,loadbalance
+        default: loadbalance
       Provider:
         default: governance-provider
 
diff --git 
a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
 
b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
index 7d4118f29..972f3efb2 100644
--- 
a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
+++ 
b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
@@ -33,6 +33,6 @@ servicecomb:
   handler:
     chain:
       Consumer:
-        default: governance-consumer,loadbalance
+        default: loadbalance
       Provider:
         default: governance-provider
\ No newline at end of file
diff --git 
a/foundations/foundation-common/src/main/resources/config/base/log4j.properties 
b/foundations/foundation-common/src/main/resources/config/base/log4j.properties
index 83de4e475..0cdb3bfbd 100644
--- 
a/foundations/foundation-common/src/main/resources/config/base/log4j.properties
+++ 
b/foundations/foundation-common/src/main/resources/config/base/log4j.properties
@@ -25,7 +25,7 @@ log4j.logger.runLogger=INFO
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d [%p] %m %l%n
+log4j.appender.stdout.layout.ConversionPattern=[%d{yyyy-MM-dd 
HH:mm:ss,SSS/zzz}][%t][%p]%m %l%n
 
 
log4j.appender.paas=org.apache.servicecomb.foundation.common.utils.RollingFileAppenderExt
 log4j.appender.paas.file=${paas.logs.dir}${paas.logs.file}
diff --git 
a/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java
 
b/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java
index 03eb52156..9307da436 100644
--- 
a/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java
+++ 
b/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.servicecomb.governance.handler.BulkheadHandler;
 import org.apache.servicecomb.governance.handler.CircuitBreakerHandler;
 import org.apache.servicecomb.governance.handler.FaultInjectionHandler;
+import org.apache.servicecomb.governance.handler.InstanceBulkheadHandler;
 import org.apache.servicecomb.governance.handler.InstanceIsolationHandler;
 import org.apache.servicecomb.governance.handler.RateLimitingHandler;
 import org.apache.servicecomb.governance.handler.RetryHandler;
@@ -38,6 +39,7 @@ import 
org.apache.servicecomb.governance.marker.operator.SuffixOperator;
 import org.apache.servicecomb.governance.properties.BulkheadProperties;
 import org.apache.servicecomb.governance.properties.CircuitBreakerProperties;
 import org.apache.servicecomb.governance.properties.FaultInjectionProperties;
+import org.apache.servicecomb.governance.properties.InstanceBulkheadProperties;
 import 
org.apache.servicecomb.governance.properties.InstanceIsolationProperties;
 import org.apache.servicecomb.governance.properties.MatchProperties;
 import org.apache.servicecomb.governance.properties.RateLimitProperties;
@@ -58,6 +60,11 @@ public class GovernanceConfiguration {
     return new BulkheadProperties();
   }
 
+  @Bean
+  public InstanceBulkheadProperties instanceBulkheadProperties() {
+    return new InstanceBulkheadProperties();
+  }
+
   @Bean
   public CircuitBreakerProperties circuitBreakerProperties() {
     return new CircuitBreakerProperties();
@@ -94,6 +101,11 @@ public class GovernanceConfiguration {
     return new BulkheadHandler(bulkheadProperties);
   }
 
+  @Bean
+  public InstanceBulkheadHandler 
instanceBulkheadHandler(InstanceBulkheadProperties instanceBulkheadProperties) {
+    return new InstanceBulkheadHandler(instanceBulkheadProperties);
+  }
+
   @Bean
   public CircuitBreakerHandler circuitBreakerHandler(CircuitBreakerProperties 
circuitBreakerProperties,
       AbstractCircuitBreakerExtension circuitBreakerExtension,
diff --git 
a/governance/src/main/java/org/apache/servicecomb/governance/handler/InstanceBulkheadHandler.java
 
b/governance/src/main/java/org/apache/servicecomb/governance/handler/InstanceBulkheadHandler.java
new file mode 100644
index 000000000..bdfe7d0b6
--- /dev/null
+++ 
b/governance/src/main/java/org/apache/servicecomb/governance/handler/InstanceBulkheadHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.governance.handler;
+
+import java.time.Duration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.servicecomb.governance.marker.GovernanceRequest;
+import org.apache.servicecomb.governance.policy.BulkheadPolicy;
+import org.apache.servicecomb.governance.properties.InstanceBulkheadProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.github.resilience4j.bulkhead.Bulkhead;
+import io.github.resilience4j.bulkhead.BulkheadConfig;
+import io.github.resilience4j.bulkhead.BulkheadRegistry;
+
+public class InstanceBulkheadHandler extends 
AbstractGovernanceHandler<Bulkhead, BulkheadPolicy> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InstanceBulkheadHandler.class);
+
+  private final InstanceBulkheadProperties bulkheadProperties;
+
+  public InstanceBulkheadHandler(InstanceBulkheadProperties 
bulkheadProperties) {
+    this.bulkheadProperties = bulkheadProperties;
+  }
+
+  @Override
+  protected String createKey(GovernanceRequest governanceRequest, 
BulkheadPolicy policy) {
+    return InstanceBulkheadProperties.MATCH_INSTANCE_BULKHEAD_KEY
+        + "." + governanceRequest.getServiceName()
+        + "." + policy.getName()
+        + "." + governanceRequest.getInstanceId();
+  }
+
+  @Override
+  protected void onConfigurationChanged(String key) {
+    if 
(key.startsWith(InstanceBulkheadProperties.MATCH_INSTANCE_BULKHEAD_KEY)) {
+      for (String processorKey : processors.keySet()) {
+        if (processorKey.startsWith(key)) {
+          processors.remove(processorKey);
+        }
+      }
+    }
+  }
+
+  @Override
+  public BulkheadPolicy matchPolicy(GovernanceRequest governanceRequest) {
+    if (StringUtils.isEmpty(governanceRequest.getServiceName()) || 
StringUtils.isEmpty(
+        governanceRequest.getInstanceId())) {
+      LOGGER.info("Instance bulkhead is not properly configured, service id or 
instance id is empty.");
+      return null;
+    }
+    return matchersManager.match(governanceRequest, 
bulkheadProperties.getParsedEntity());
+  }
+
+  @Override
+  public Bulkhead createProcessor(String key, GovernanceRequest 
governanceRequest, BulkheadPolicy policy) {
+    return getBulkhead(key, policy);
+  }
+
+  private Bulkhead getBulkhead(String key, BulkheadPolicy policy) {
+    LOGGER.info("applying new policy {} for {}", key, policy.toString());
+
+    BulkheadConfig config = BulkheadConfig.custom()
+        .maxConcurrentCalls(policy.getMaxConcurrentCalls())
+        .maxWaitDuration(Duration.parse(policy.getMaxWaitDuration()))
+        .build();
+
+    BulkheadRegistry registry = BulkheadRegistry.of(config);
+
+    return registry.bulkhead(key, config);
+  }
+}
diff --git 
a/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java
 
b/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java
index 8af8ae68f..e7fea24f9 100644
--- 
a/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java
+++ 
b/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java
@@ -38,12 +38,15 @@ public abstract class AbstractFailurePredictor implements 
FailurePredictor {
   }
 
   private static boolean statusCodeMatch(String status, String responseStatus) 
{
-    if (3 != status.length()) {
+    if (status == null) {
+      return false;
+    }
+    if (responseStatus.length() != status.length()) {
       return false;
     }
     char[] statusChar = status.toCharArray();
     char[] responseChar = responseStatus.toCharArray();
-    return IntStream.range(0, 3).noneMatch(i ->
+    return IntStream.range(0, statusChar.length).noneMatch(i ->
         statusChar[i] != responseChar[i] && statusChar[i] != 'x');
   }
 }
diff --git 
a/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java
 
b/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java
index 9996d1b72..1d1bdfeec 100644
--- 
a/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java
+++ 
b/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java
@@ -17,8 +17,9 @@
 package org.apache.servicecomb.governance.policy;
 
 import java.time.Duration;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.servicecomb.governance.utils.GovernanceUtils;
@@ -48,6 +49,9 @@ public class CircuitBreakerPolicy extends AbstractPolicy {
 
   public static final String DEFAULT_FAILURE_RESPONSE_STATUS_503 = "503";
 
+  public static final List<String> DEFAULT_STATUS_LIST = 
Arrays.asList(DEFAULT_FAILURE_RESPONSE_STATUS_502,
+      DEFAULT_FAILURE_RESPONSE_STATUS_503);
+
   private float failureRateThreshold = DEFAULT_FAILURE_RATE_THRESHOLD;
 
   private float slowCallRateThreshold = DEFAULT_SLOW_CALL_RATE_THRESHOLD;
@@ -65,7 +69,7 @@ public class CircuitBreakerPolicy extends AbstractPolicy {
   private String slidingWindowSize = DEFAULT_SLIDING_WINDOW_SIZE;
 
   //status code that need record as a failure
-  private List<String> recordFailureStatus = new ArrayList<>();
+  private List<String> recordFailureStatus = DEFAULT_STATUS_LIST;
 
   //force close this circuit breaker. This parameter is not used by circuit 
breaker directly
   private boolean forceClosed = false;
@@ -193,14 +197,17 @@ public class CircuitBreakerPolicy extends AbstractPolicy {
 
   public List<String> getRecordFailureStatus() {
     if (CollectionUtils.isEmpty(this.recordFailureStatus)) {
-      this.recordFailureStatus.add(DEFAULT_FAILURE_RESPONSE_STATUS_502);
-      this.recordFailureStatus.add(DEFAULT_FAILURE_RESPONSE_STATUS_503);
+      return DEFAULT_STATUS_LIST;
     }
     return this.recordFailureStatus;
   }
 
   public void setRecordFailureStatus(List<String> recordFailureStatus) {
-    this.recordFailureStatus = recordFailureStatus;
+    if (recordFailureStatus == null) {
+      return;
+    }
+    this.recordFailureStatus = recordFailureStatus.stream().filter(e -> 
!StringUtils.isEmpty(e))
+        .collect(Collectors.toList());
   }
 
   public boolean isForceClosed() {
@@ -230,6 +237,7 @@ public class CircuitBreakerPolicy extends AbstractPolicy {
         ", minimumNumberOfCalls=" + minimumNumberOfCalls +
         ", slidingWindowType='" + slidingWindowType + '\'' +
         ", slidingWindowSize=" + slidingWindowSize +
+        ", recordFailureStatus=" + recordFailureStatus +
         '}';
   }
 }
diff --git 
a/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java
 
b/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java
index 95be9194f..726121a2c 100644
--- 
a/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java
+++ 
b/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java
@@ -17,8 +17,9 @@
 package org.apache.servicecomb.governance.policy;
 
 import java.time.Duration;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.util.CollectionUtils;
@@ -27,12 +28,15 @@ public class RetryPolicy extends AbstractPolicy {
 
   public static final int DEFAULT_MAX_ATTEMPTS = 3;
 
-  public static final Duration DEFAULT_WAIT_DURATION = Duration.ofMillis(10);
+  public static final Duration DEFAULT_WAIT_DURATION = Duration.ofMillis(1);
 
   public static final String DEFAULT_RETRY_ON_RESPONSE_STATUS_502 = "502";
 
   public static final String DEFAULT_RETRY_ON_RESPONSE_STATUS_503 = "503";
 
+  public static final List<String> DEFAULT_STATUS_LIST = 
Arrays.asList(DEFAULT_RETRY_ON_RESPONSE_STATUS_502,
+      DEFAULT_RETRY_ON_RESPONSE_STATUS_503);
+
   private static final Duration INITIAL_INTERVAL = Duration.ofMillis(1000);
 
   private static final float MULTIPLIER = 2;
@@ -48,7 +52,7 @@ public class RetryPolicy extends AbstractPolicy {
   private String waitDuration = DEFAULT_WAIT_DURATION.toString();
 
   //status code that need retry
-  private List<String> retryOnResponseStatus = new ArrayList<>();
+  private List<String> retryOnResponseStatus = DEFAULT_STATUS_LIST;
 
   //retry strategy
   private String retryStrategy = DEFAULT_RETRY_STRATEGY;
@@ -71,14 +75,17 @@ public class RetryPolicy extends AbstractPolicy {
 
   public List<String> getRetryOnResponseStatus() {
     if (CollectionUtils.isEmpty(retryOnResponseStatus)) {
-      this.retryOnResponseStatus.add(DEFAULT_RETRY_ON_RESPONSE_STATUS_502);
-      this.retryOnResponseStatus.add(DEFAULT_RETRY_ON_RESPONSE_STATUS_503);
+      return DEFAULT_STATUS_LIST;
     }
     return retryOnResponseStatus;
   }
 
   public void setRetryOnResponseStatus(List<String> retryOnResponseStatus) {
-    this.retryOnResponseStatus = retryOnResponseStatus;
+    if (retryOnResponseStatus == null) {
+      return;
+    }
+    this.retryOnResponseStatus = retryOnResponseStatus.stream().filter(e -> 
!StringUtils.isEmpty(e))
+        .collect(Collectors.toList());
   }
 
   public int getMaxAttempts() {
@@ -90,7 +97,7 @@ public class RetryPolicy extends AbstractPolicy {
   }
 
   public String getWaitDuration() {
-    return Duration.parse(waitDuration).toMillis() < 10 ? 
DEFAULT_WAIT_DURATION.toString() : waitDuration;
+    return Duration.parse(waitDuration).toMillis() < 1 ? 
DEFAULT_WAIT_DURATION.toString() : waitDuration;
   }
 
   public void setWaitDuration(String waitDuration) {
diff --git 
a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java
 
b/governance/src/main/java/org/apache/servicecomb/governance/properties/InstanceBulkheadProperties.java
similarity index 64%
rename from 
handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java
rename to 
governance/src/main/java/org/apache/servicecomb/governance/properties/InstanceBulkheadProperties.java
index f5cfbf72a..edd4b6045 100644
--- 
a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java
+++ 
b/governance/src/main/java/org/apache/servicecomb/governance/properties/InstanceBulkheadProperties.java
@@ -15,16 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.handler.governance;
+package org.apache.servicecomb.governance.properties;
 
-import org.apache.servicecomb.core.Handler;
-import org.apache.servicecomb.core.Invocation;
-import org.apache.servicecomb.swagger.invocation.AsyncResponse;
+import org.apache.servicecomb.governance.policy.BulkheadPolicy;
+
+public class InstanceBulkheadProperties extends 
PolicyProperties<BulkheadPolicy> {
+  public static final String MATCH_INSTANCE_BULKHEAD_KEY = 
"servicecomb.instanceBulkhead";
+
+  public InstanceBulkheadProperties() {
+    super(MATCH_INSTANCE_BULKHEAD_KEY);
+  }
 
-public class ConsumerGovernanceHandler implements Handler {
-  // an empty implementation, will add possible features in future.
   @Override
-  public void handle(Invocation invocation, AsyncResponse asyncResp) throws 
Exception {
-    invocation.next(asyncResp);
+  public Class<BulkheadPolicy> getEntityClass() {
+    return BulkheadPolicy.class;
   }
 }
diff --git 
a/governance/src/test/java/org/apache/servicecomb/governance/AbstractFailurePredictorTest.java
 
b/governance/src/test/java/org/apache/servicecomb/governance/AbstractFailurePredictorTest.java
new file mode 100644
index 000000000..9c84287ac
--- /dev/null
+++ 
b/governance/src/test/java/org/apache/servicecomb/governance/AbstractFailurePredictorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.governance;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.servicecomb.governance.handler.ext.AbstractFailurePredictor;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class AbstractFailurePredictorTest {
+  class MyAbstractFailurePredictor extends AbstractFailurePredictor {
+    MyAbstractFailurePredictor() {
+    }
+
+    @Override
+    protected String extractStatusCode(Object result) {
+      return (String) result;
+    }
+
+    @Override
+    public boolean isFailedResult(Throwable e) {
+      return super.isFailedResult(e);
+    }
+  }
+
+  @Test
+  public void testCodeMatch() {
+    AbstractFailurePredictor predictor = new MyAbstractFailurePredictor();
+    List<String> statusList = Arrays.asList("500");
+    Assertions.assertTrue(predictor.isFailedResult(statusList, "500"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "502"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "400"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "444"));
+
+    statusList = Arrays.asList("5x0");
+    Assertions.assertTrue(predictor.isFailedResult(statusList, "500"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "502"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "400"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "444"));
+
+    statusList = Arrays.asList(null, "xx", "5x0");
+    Assertions.assertTrue(predictor.isFailedResult(statusList, "500"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "502"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "400"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "444"));
+  }
+}
diff --git 
a/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java
 
b/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java
index bc943a070..e0c55964c 100644
--- 
a/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java
+++ 
b/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java
@@ -164,7 +164,7 @@ public class GovernancePropertiesTest {
 
   @Test
   public void test_all_bean_is_loaded() {
-    Assertions.assertEquals(6, propertiesList.size());
+    Assertions.assertEquals(7, propertiesList.size());
   }
 
   @Test
diff --git 
a/governance/src/test/java/org/apache/servicecomb/governance/InstanceBulkheadHandlerTest.java
 
b/governance/src/test/java/org/apache/servicecomb/governance/InstanceBulkheadHandlerTest.java
new file mode 100644
index 000000000..2377aabc3
--- /dev/null
+++ 
b/governance/src/test/java/org/apache/servicecomb/governance/InstanceBulkheadHandlerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.governance;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.servicecomb.governance.handler.InstanceBulkheadHandler;
+import org.apache.servicecomb.governance.marker.GovernanceRequest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ContextConfiguration;
+
+import io.github.resilience4j.bulkhead.Bulkhead;
+import io.github.resilience4j.bulkhead.BulkheadFullException;
+import io.github.resilience4j.decorators.Decorators;
+import io.github.resilience4j.decorators.Decorators.DecorateCheckedSupplier;
+
+@SpringBootTest
+@ContextConfiguration(classes = {GovernanceConfiguration.class, 
MockConfiguration.class})
+public class InstanceBulkheadHandlerTest {
+  private InstanceBulkheadHandler instanceBulkheadHandler;
+
+  @Autowired
+  public void setInstanceBulkheadHandler(InstanceBulkheadHandler 
instanceBulkheadHandler) {
+    this.instanceBulkheadHandler = instanceBulkheadHandler;
+  }
+
+  @Test
+  public void test_instance_bulkhead_work() throws Throwable {
+
+    // instance1
+    DecorateCheckedSupplier<String> dsInstance1 = 
Decorators.ofCheckedSupplier(() -> "wake");
+
+    GovernanceRequest requestInstance1 = new GovernanceRequest();
+    requestInstance1.setInstanceId("instance01");
+    requestInstance1.setServiceName("service01");
+    requestInstance1.setUri("/test");
+
+    Bulkhead bulkheadInstance1 = 
instanceBulkheadHandler.getActuator(requestInstance1);
+    dsInstance1.withBulkhead(bulkheadInstance1);
+
+    // instance2
+    DecorateCheckedSupplier<String> dsInstance2 = 
Decorators.ofCheckedSupplier(() -> {
+      Thread.sleep(1000);
+      return "sleep";
+    });
+
+    GovernanceRequest requestInstance2 = new GovernanceRequest();
+    requestInstance2.setInstanceId("instance02");
+    requestInstance2.setServiceName("service01");
+    requestInstance2.setUri("/test");
+
+    Bulkhead bulkheadInstance2 = 
instanceBulkheadHandler.getActuator(requestInstance2);
+    dsInstance2.withBulkhead(bulkheadInstance2);
+
+    Executor executor = Executors.newFixedThreadPool(4);
+    AtomicInteger wakeCount = new AtomicInteger(0);
+    AtomicInteger sleepCount = new AtomicInteger(0);
+    AtomicInteger errorCount = new AtomicInteger(0);
+    AtomicInteger rejectCount = new AtomicInteger(0);
+    CountDownLatch countDownLatch = new CountDownLatch(100);
+    for (int i = 0; i < 100; i++) {
+      final int num = i;
+      executor.execute(() -> {
+        // 50% for each server
+        if (num % 2 == 0) {
+          runCommand(dsInstance1, wakeCount, sleepCount, errorCount, 
rejectCount, countDownLatch);
+        } else {
+          runCommand(dsInstance2, wakeCount, sleepCount, errorCount, 
rejectCount, countDownLatch);
+        }
+      });
+    }
+    countDownLatch.await(100, TimeUnit.SECONDS);
+    Assertions.assertEquals(50, wakeCount.get());
+    Assertions.assertEquals(2, sleepCount.get());
+    Assertions.assertEquals(0, errorCount.get());
+    Assertions.assertEquals(48, rejectCount.get());
+  }
+
+  private void runCommand(DecorateCheckedSupplier<String> ds, AtomicInteger 
wakeCount, AtomicInteger sleepCount,
+      AtomicInteger errorCount, AtomicInteger rejectCount, CountDownLatch 
countDownLatch) {
+    try {
+      String result = ds.get();
+      if ("wake".equals(result)) {
+        wakeCount.incrementAndGet();
+      } else if ("sleep".equals(result)) {
+        sleepCount.incrementAndGet();
+      } else {
+        errorCount.incrementAndGet();
+      }
+    } catch (BulkheadFullException e) {
+      rejectCount.incrementAndGet();
+    } catch (Throwable e) {
+      errorCount.incrementAndGet();
+    }
+    countDownLatch.countDown();
+  }
+}
diff --git a/governance/src/test/resources/application.yaml 
b/governance/src/test/resources/application.yaml
index e5c0842e6..5a98ad620 100644
--- a/governance/src/test/resources/application.yaml
+++ b/governance/src/test/resources/application.yaml
@@ -122,6 +122,10 @@ servicecomb:
       slidingWindowSize: 2
       slidingWindowType: COUNT_BASED
       waitDurationInOpenState: 1000
+  instanceBulkhead:
+    demo-allOperation: |
+      maxConcurrentCalls: 2
+      maxWaitDuration: 10
   faultInjection:
     demo-fallback-ThrowException: |
       type: abort
diff --git 
a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
 
b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceBulkheadHandler.java
similarity index 53%
copy from 
handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
copy to 
handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceBulkheadHandler.java
index 8ae3df80a..65e43465c 100644
--- 
a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
+++ 
b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceBulkheadHandler.java
@@ -25,9 +25,7 @@ import org.apache.servicecomb.core.Handler;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.governance.MatchType;
 import org.apache.servicecomb.foundation.common.utils.BeanUtils;
-import org.apache.servicecomb.governance.handler.BulkheadHandler;
-import org.apache.servicecomb.governance.handler.CircuitBreakerHandler;
-import org.apache.servicecomb.governance.handler.RateLimitingHandler;
+import org.apache.servicecomb.governance.handler.InstanceBulkheadHandler;
 import org.apache.servicecomb.governance.marker.GovernanceRequest;
 import org.apache.servicecomb.swagger.invocation.AsyncResponse;
 import org.apache.servicecomb.swagger.invocation.Response;
@@ -38,31 +36,26 @@ import org.slf4j.LoggerFactory;
 
 import io.github.resilience4j.bulkhead.Bulkhead;
 import io.github.resilience4j.bulkhead.BulkheadFullException;
-import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
-import io.github.resilience4j.circuitbreaker.CircuitBreaker;
 import io.github.resilience4j.decorators.Decorators;
 import io.github.resilience4j.decorators.Decorators.DecorateCompletionStage;
-import io.github.resilience4j.ratelimiter.RateLimiter;
-import io.github.resilience4j.ratelimiter.RequestNotPermitted;
 
-public class ProviderGovernanceHandler implements Handler {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ProviderGovernanceHandler.class);
+public class ConsumerInstanceBulkheadHandler implements Handler {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsumerInstanceBulkheadHandler.class);
 
-  private final RateLimitingHandler rateLimitingHandler = 
BeanUtils.getBean(RateLimitingHandler.class);
-
-  private final CircuitBreakerHandler circuitBreakerHandler = 
BeanUtils.getBean(CircuitBreakerHandler.class);
-
-  private final BulkheadHandler bulkheadHandler = 
BeanUtils.getBean(BulkheadHandler.class);
+  private final InstanceBulkheadHandler instanceBulkheadHandler = 
BeanUtils.getBean(InstanceBulkheadHandler.class);
 
   @Override
   public void handle(Invocation invocation, AsyncResponse asyncResp) throws 
Exception {
-
+    if (invocation.getEndpoint() == null) {
+      invocation.next(asyncResp);
+      return;
+    }
     Supplier<CompletionStage<Response>> next = 
createBusinessCompletionStageSupplier(invocation);
     DecorateCompletionStage<Response> dcs = Decorators.ofCompletionStage(next);
     GovernanceRequest request = MatchType.createGovHttpRequest(invocation);
+    request.setServiceName(invocation.getMicroserviceName());
+    
request.setInstanceId(invocation.getEndpoint().getMicroserviceInstance().getInstanceId());
 
-    addRateLimiting(dcs, request);
-    addCircuitBreaker(dcs, request);
     addBulkhead(dcs, request);
 
     dcs.get().whenComplete((r, e) -> {
@@ -71,18 +64,10 @@ public class ProviderGovernanceHandler implements Handler {
         return;
       }
 
-      if (e instanceof RequestNotPermitted) {
-        asyncResp.complete(
-            Response.failResp(new InvocationException(429, "rate limited.", 
new CommonExceptionData("rate limited."))));
-        LOGGER.warn("the request is rate limit by policy : {}", 
e.getMessage());
-      } else if (e instanceof CallNotPermittedException) {
-        asyncResp.complete(
-            Response.failResp(new InvocationException(429, "circuitBreaker is 
open.",
-                new CommonExceptionData("circuitBreaker is open."))));
-        LOGGER.warn("circuitBreaker is open by policy : {}", e.getMessage());
-      } else if (e instanceof BulkheadFullException) {
+      if (e instanceof BulkheadFullException) {
+        // return 503 so that consumer can retry
         asyncResp.complete(
-            Response.failResp(new InvocationException(429, "bulkhead is full 
and does not permit further calls.",
+            Response.failResp(new InvocationException(503, "bulkhead is full 
and does not permit further calls.",
                 new CommonExceptionData("bulkhead is full and does not permit 
further calls."))));
         LOGGER.warn("bulkhead is full and does not permit further calls by 
policy : {}", e.getMessage());
       } else {
@@ -92,40 +77,17 @@ public class ProviderGovernanceHandler implements Handler {
   }
 
   private void addBulkhead(DecorateCompletionStage<Response> dcs, 
GovernanceRequest request) {
-    Bulkhead bulkhead = bulkheadHandler.getActuator(request);
+    Bulkhead bulkhead = instanceBulkheadHandler.getActuator(request);
     if (bulkhead != null) {
       dcs.withBulkhead(bulkhead);
     }
   }
 
-  private void addCircuitBreaker(DecorateCompletionStage<Response> dcs, 
GovernanceRequest request) {
-    CircuitBreaker circuitBreaker = circuitBreakerHandler.getActuator(request);
-    if (circuitBreaker != null) {
-      dcs.withCircuitBreaker(circuitBreaker);
-    }
-  }
-
-  private void addRateLimiting(DecorateCompletionStage<Response> dcs, 
GovernanceRequest request) {
-    RateLimiter rateLimiter = rateLimitingHandler.getActuator(request);
-    if (rateLimiter != null) {
-      dcs.withRateLimiter(rateLimiter);
-    }
-  }
-
   private Supplier<CompletionStage<Response>> 
createBusinessCompletionStageSupplier(Invocation invocation) {
     return () -> {
       CompletableFuture<Response> result = new CompletableFuture<>();
       try {
-        invocation.next(response -> {
-          if (response.isFailed()) {
-            // For failed response, create a fail to make circuit breaker work.
-            // Users application maybe much complicated than this simple logic,
-            // while they need to customize which error will cause circuit 
breaker.
-            result.completeExceptionally(response.getResult());
-          } else {
-            result.complete(response);
-          }
-        });
+        invocation.next(result::complete);
       } catch (Exception e) {
         result.completeExceptionally(e);
       }
diff --git 
a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
 
b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
similarity index 50%
copy from 
handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
copy to 
handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
index 8ae3df80a..73d3dbb1d 100644
--- 
a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
+++ 
b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
@@ -25,9 +25,7 @@ import org.apache.servicecomb.core.Handler;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.governance.MatchType;
 import org.apache.servicecomb.foundation.common.utils.BeanUtils;
-import org.apache.servicecomb.governance.handler.BulkheadHandler;
-import org.apache.servicecomb.governance.handler.CircuitBreakerHandler;
-import org.apache.servicecomb.governance.handler.RateLimitingHandler;
+import org.apache.servicecomb.governance.handler.InstanceIsolationHandler;
 import org.apache.servicecomb.governance.marker.GovernanceRequest;
 import org.apache.servicecomb.swagger.invocation.AsyncResponse;
 import org.apache.servicecomb.swagger.invocation.Response;
@@ -36,34 +34,29 @@ import 
org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.github.resilience4j.bulkhead.Bulkhead;
-import io.github.resilience4j.bulkhead.BulkheadFullException;
 import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
 import io.github.resilience4j.circuitbreaker.CircuitBreaker;
 import io.github.resilience4j.decorators.Decorators;
 import io.github.resilience4j.decorators.Decorators.DecorateCompletionStage;
-import io.github.resilience4j.ratelimiter.RateLimiter;
-import io.github.resilience4j.ratelimiter.RequestNotPermitted;
 
-public class ProviderGovernanceHandler implements Handler {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ProviderGovernanceHandler.class);
+public class ConsumerInstanceIsolationHandler implements Handler {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsumerInstanceIsolationHandler.class);
 
-  private final RateLimitingHandler rateLimitingHandler = 
BeanUtils.getBean(RateLimitingHandler.class);
-
-  private final CircuitBreakerHandler circuitBreakerHandler = 
BeanUtils.getBean(CircuitBreakerHandler.class);
-
-  private final BulkheadHandler bulkheadHandler = 
BeanUtils.getBean(BulkheadHandler.class);
+  private final InstanceIsolationHandler instanceIsolationHandler = 
BeanUtils.getBean(InstanceIsolationHandler.class);
 
   @Override
   public void handle(Invocation invocation, AsyncResponse asyncResp) throws 
Exception {
-
+    if (invocation.getEndpoint() == null) {
+      invocation.next(asyncResp);
+      return;
+    }
     Supplier<CompletionStage<Response>> next = 
createBusinessCompletionStageSupplier(invocation);
     DecorateCompletionStage<Response> dcs = Decorators.ofCompletionStage(next);
     GovernanceRequest request = MatchType.createGovHttpRequest(invocation);
+    request.setServiceName(invocation.getMicroserviceName());
+    
request.setInstanceId(invocation.getEndpoint().getMicroserviceInstance().getInstanceId());
 
-    addRateLimiting(dcs, request);
     addCircuitBreaker(dcs, request);
-    addBulkhead(dcs, request);
 
     dcs.get().whenComplete((r, e) -> {
       if (e == null) {
@@ -71,61 +64,30 @@ public class ProviderGovernanceHandler implements Handler {
         return;
       }
 
-      if (e instanceof RequestNotPermitted) {
-        asyncResp.complete(
-            Response.failResp(new InvocationException(429, "rate limited.", 
new CommonExceptionData("rate limited."))));
-        LOGGER.warn("the request is rate limit by policy : {}", 
e.getMessage());
-      } else if (e instanceof CallNotPermittedException) {
-        asyncResp.complete(
-            Response.failResp(new InvocationException(429, "circuitBreaker is 
open.",
-                new CommonExceptionData("circuitBreaker is open."))));
-        LOGGER.warn("circuitBreaker is open by policy : {}", e.getMessage());
-      } else if (e instanceof BulkheadFullException) {
+      if (e instanceof CallNotPermittedException) {
+        // return 503 so that consumer can retry
         asyncResp.complete(
-            Response.failResp(new InvocationException(429, "bulkhead is full 
and does not permit further calls.",
-                new CommonExceptionData("bulkhead is full and does not permit 
further calls."))));
-        LOGGER.warn("bulkhead is full and does not permit further calls by 
policy : {}", e.getMessage());
+            Response.failResp(new InvocationException(503, "instance isolation 
circuitBreaker is open.",
+                new CommonExceptionData("instance isolation circuitBreaker is 
open."))));
+        LOGGER.warn("instance isolation circuitBreaker is open by policy : 
{}", e.getMessage());
       } else {
         asyncResp.complete(Response.createProducerFail(e));
       }
     });
   }
 
-  private void addBulkhead(DecorateCompletionStage<Response> dcs, 
GovernanceRequest request) {
-    Bulkhead bulkhead = bulkheadHandler.getActuator(request);
-    if (bulkhead != null) {
-      dcs.withBulkhead(bulkhead);
-    }
-  }
-
   private void addCircuitBreaker(DecorateCompletionStage<Response> dcs, 
GovernanceRequest request) {
-    CircuitBreaker circuitBreaker = circuitBreakerHandler.getActuator(request);
+    CircuitBreaker circuitBreaker = 
instanceIsolationHandler.getActuator(request);
     if (circuitBreaker != null) {
       dcs.withCircuitBreaker(circuitBreaker);
     }
   }
 
-  private void addRateLimiting(DecorateCompletionStage<Response> dcs, 
GovernanceRequest request) {
-    RateLimiter rateLimiter = rateLimitingHandler.getActuator(request);
-    if (rateLimiter != null) {
-      dcs.withRateLimiter(rateLimiter);
-    }
-  }
-
   private Supplier<CompletionStage<Response>> 
createBusinessCompletionStageSupplier(Invocation invocation) {
     return () -> {
       CompletableFuture<Response> result = new CompletableFuture<>();
       try {
-        invocation.next(response -> {
-          if (response.isFailed()) {
-            // For failed response, create a fail to make circuit breaker work.
-            // Users application maybe much complicated than this simple logic,
-            // while they need to customize which error will cause circuit 
breaker.
-            result.completeExceptionally(response.getResult());
-          } else {
-            result.complete(response);
-          }
-        });
+        invocation.next(result::complete);
       } catch (Exception e) {
         result.completeExceptionally(e);
       }
diff --git 
a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
 
b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
index 8ae3df80a..f3142ab77 100644
--- 
a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
+++ 
b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
@@ -116,16 +116,7 @@ public class ProviderGovernanceHandler implements Handler {
     return () -> {
       CompletableFuture<Response> result = new CompletableFuture<>();
       try {
-        invocation.next(response -> {
-          if (response.isFailed()) {
-            // For failed response, create a fail to make circuit breaker work.
-            // Users application maybe much complicated than this simple logic,
-            // while they need to customize which error will cause circuit 
breaker.
-            result.completeExceptionally(response.getResult());
-          } else {
-            result.complete(response);
-          }
-        });
+        invocation.next(result::complete);
       } catch (Exception e) {
         result.completeExceptionally(e);
       }
diff --git 
a/handlers/handler-governance/src/main/resources/config/cse.handler.xml 
b/handlers/handler-governance/src/main/resources/config/cse.handler.xml
index c7383f0cc..ab5a82c6f 100644
--- a/handlers/handler-governance/src/main/resources/config/cse.handler.xml
+++ b/handlers/handler-governance/src/main/resources/config/cse.handler.xml
@@ -18,6 +18,8 @@
 <config>
   <handler id="governance-provider"
     
class="org.apache.servicecomb.handler.governance.ProviderGovernanceHandler"/>
-  <handler id="governance-consumer"
-    
class="org.apache.servicecomb.handler.governance.ConsumerGovernanceHandler"/>
+  <handler id="instance-bulkhead-consumer"
+    
class="org.apache.servicecomb.handler.governance.ConsumerInstanceBulkheadHandler"/>
+  <handler id="instance-isolation-consumer"
+    
class="org.apache.servicecomb.handler.governance.ConsumerInstanceIsolationHandler"/>
 </config>

Reply via email to