Copilot commented on code in PR #3193:
URL: https://github.com/apache/dubbo-go/pull/3193#discussion_r2766668591
##########
filter/hystrix/filter.go:
##########
@@ -87,111 +86,31 @@ func NewHystrixFilterError(err error, failByHystrix bool)
error {
}
// Filter for Hystrix
-/**
- * You should add hystrix related configuration in provider or consumer config
or both, according to which side you are to apply Filter.
- * For example:
- * filter_conf:
- * hystrix:
- * configs:
- * # =========== Define config here ============
- * "Default":
- * timeout : 1000
- * max_concurrent_requests : 25
- * sleep_window : 5000
- * error_percent_threshold : 50
- * request_volume_threshold: 20
- * "userp":
- * timeout: 2000
- * max_concurrent_requests: 512
- * sleep_window: 4000
- * error_percent_threshold: 35
- * request_volume_threshold: 6
- * "userp_m":
- * timeout : 1200
- * max_concurrent_requests : 512
- * sleep_window : 6000
- * error_percent_threshold : 60
- * request_volume_threshold: 16
- * # =========== Define error whitelist which will be ignored by Hystrix
counter ============
- * error_whitelist: [".*exception.*"]
- *
- * # =========== Apply default config here ===========
- * default: "Default"
- *
- * services:
- * "com.ikurento.user.UserProvider":
- * # =========== Apply service level config ===========
- * service_config: "userp"
- * # =========== Apply method level config ===========
- * methods:
- * "GetUser": "userp_m"
- * "GetUser1": "userp_m"
- */
type Filter struct {
- COrP bool // true for consumer
- res map[string][]*regexp.Regexp
- ifNewMap sync.Map
+ COrP bool // true for consumer, false for provider
}
// Invoke is an implementation of filter, provides Hystrix pattern latency and
fault tolerance
func (f *Filter) Invoke(ctx context.Context, invoker base.Invoker, invocation
base.Invocation) result.Result {
- cmdName := fmt.Sprintf("%s&method=%s", invoker.GetURL().Key(),
invocation.MethodName())
+ cmdName := getResourceName(invoker, invocation, f.COrP)
Review Comment:
The resource name format has changed from the old implementation.
Previously, the command name was built as "<URL.Key()>&method=<MethodName>".
The new format "dubbo:consumer:InterfaceName:group:version:Method(paramTypes)"
is consistent with the Sentinel filter, but it is a breaking change: users who
configured hystrix commands based on the old format will need to update their
configuration. Consider documenting this breaking change in the PR description
or in a migration guide.
##########
filter/hystrix/filter.go:
##########
@@ -87,111 +86,31 @@ func NewHystrixFilterError(err error, failByHystrix bool)
error {
}
// Filter for Hystrix
-/**
- * You should add hystrix related configuration in provider or consumer config
or both, according to which side you are to apply Filter.
- * For example:
- * filter_conf:
- * hystrix:
- * configs:
- * # =========== Define config here ============
- * "Default":
- * timeout : 1000
- * max_concurrent_requests : 25
- * sleep_window : 5000
- * error_percent_threshold : 50
- * request_volume_threshold: 20
- * "userp":
- * timeout: 2000
- * max_concurrent_requests: 512
- * sleep_window: 4000
- * error_percent_threshold: 35
- * request_volume_threshold: 6
- * "userp_m":
- * timeout : 1200
- * max_concurrent_requests : 512
- * sleep_window : 6000
- * error_percent_threshold : 60
- * request_volume_threshold: 16
- * # =========== Define error whitelist which will be ignored by Hystrix
counter ============
- * error_whitelist: [".*exception.*"]
- *
- * # =========== Apply default config here ===========
- * default: "Default"
- *
- * services:
- * "com.ikurento.user.UserProvider":
- * # =========== Apply service level config ===========
- * service_config: "userp"
- * # =========== Apply method level config ===========
- * methods:
- * "GetUser": "userp_m"
- * "GetUser1": "userp_m"
- */
type Filter struct {
- COrP bool // true for consumer
- res map[string][]*regexp.Regexp
- ifNewMap sync.Map
+ COrP bool // true for consumer, false for provider
}
// Invoke is an implementation of filter, provides Hystrix pattern latency and
fault tolerance
func (f *Filter) Invoke(ctx context.Context, invoker base.Invoker, invocation
base.Invocation) result.Result {
- cmdName := fmt.Sprintf("%s&method=%s", invoker.GetURL().Key(),
invocation.MethodName())
+ cmdName := getResourceName(invoker, invocation, f.COrP)
- // Do the configuration if the circuit breaker is created for the first
time
- if _, load := f.ifNewMap.LoadOrStore(cmdName, true); !load {
- configLoadMutex.Lock()
- filterConf := getConfig(invoker.GetURL().Service(),
invocation.MethodName(), f.COrP)
- for _, ptn := range filterConf.Error {
- reg, err := regexp.Compile(ptn)
- if err != nil {
- logger.Warnf("[Hystrix Filter]Errors occurred
parsing error omit regexp: %s, %v", ptn, err)
- } else {
- if f.res == nil {
- f.res =
make(map[string][]*regexp.Regexp)
- }
- f.res[invocation.MethodName()] =
append(f.res[invocation.MethodName()], reg)
- }
- }
- hystrix.ConfigureCommand(cmdName, hystrix.CommandConfig{
- Timeout: filterConf.Timeout,
- MaxConcurrentRequests:
filterConf.MaxConcurrentRequests,
- SleepWindow: filterConf.SleepWindow,
- ErrorPercentThreshold:
filterConf.ErrorPercentThreshold,
- RequestVolumeThreshold:
filterConf.RequestVolumeThreshold,
- })
- configLoadMutex.Unlock()
- }
- configLoadMutex.RLock()
- _, _, err := hystrix.GetCircuit(cmdName)
- configLoadMutex.RUnlock()
- if err != nil {
- logger.Errorf("[Hystrix Filter]Errors occurred getting circuit
for %s , will invoke without hystrix, error is: %+v", cmdName, err)
- return invoker.Invoke(ctx, invocation)
- }
- logger.Infof("[Hystrix Filter]Using hystrix filter: %s", cmdName)
var res result.Result
- _ = hystrix.Do(cmdName, func() error {
+ err := hystrix.Do(cmdName, func() error {
res = invoker.Invoke(ctx, invocation)
- err := res.Error()
- if err != nil {
- res.SetError(NewHystrixFilterError(err, false))
- for _, reg := range f.res[invocation.MethodName()] {
- if reg.MatchString(err.Error()) {
- logger.Debugf("[Hystrix Filter]Error in
invocation but omitted in circuit breaker: %v; %s", err, cmdName)
- return nil
- }
- }
- }
- return err
+ return res.Error()
Review Comment:
The error whitelist feature has been removed. Previously, users could
configure error patterns to exclude from circuit breaker metrics. While this
simplifies the implementation and aligns with the Sentinel filter approach,
it's a breaking change for users who relied on this functionality. Consider
documenting this removal and providing guidance on alternative approaches, such
as implementing custom error filtering logic before the hystrix filter.
##########
filter/hystrix/filter_test.go:
##########
@@ -19,228 +19,70 @@ package hystrix
import (
"context"
- "fmt"
- "regexp"
+ "errors"
"testing"
)
import (
"github.com/afex/hystrix-go/hystrix"
- "github.com/pkg/errors"
-
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
- "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol/base"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
- "dubbo.apache.org/dubbo-go/v3/protocol/result"
)
-func init() {
- mockInitHystrixConfig()
-}
-
func TestNewHystrixFilterError(t *testing.T) {
get := NewHystrixFilterError(errors.New("test"), true)
assert.True(t, get.(*FilterError).FailByHystrix())
assert.Equal(t, "test", get.Error())
}
-func mockInitHystrixConfig() {
- // Mock config
- confConsumer = &FilterConfig{
- make(map[string]*CommandConfigWithError),
- "Default",
- make(map[string]ServiceHystrixConfig),
- }
- confConsumer.Configs["Default"] = &CommandConfigWithError{
- Timeout: 1000,
- MaxConcurrentRequests: 600,
- RequestVolumeThreshold: 5,
- SleepWindow: 5000,
- ErrorPercentThreshold: 5,
- Error: nil,
- }
- confConsumer.Configs["userp"] = &CommandConfigWithError{
- Timeout: 2000,
- MaxConcurrentRequests: 64,
- RequestVolumeThreshold: 15,
- SleepWindow: 4000,
- ErrorPercentThreshold: 45,
- Error: nil,
- }
- confConsumer.Configs["userp_m"] = &CommandConfigWithError{
- Timeout: 1200,
- MaxConcurrentRequests: 64,
- RequestVolumeThreshold: 5,
- SleepWindow: 6000,
- ErrorPercentThreshold: 60,
- Error: []string{
- "exception",
- },
- }
- confConsumer.Services["com.ikurento.user.UserProvider"] =
ServiceHystrixConfig{
- "userp",
- map[string]string{
- "GetUser": "userp_m",
- },
- }
-}
-
func TestGetHystrixFilter(t *testing.T) {
filterGot := newFilterConsumer()
assert.NotNil(t, filterGot)
-}
-
-func TestGetConfig1(t *testing.T) {
- mockInitHystrixConfig()
- configGot := getConfig("com.ikurento.user.UserProvider", "GetUser",
true)
- assert.NotNil(t, configGot)
- assert.Equal(t, 1200, configGot.Timeout)
- assert.Equal(t, 64, configGot.MaxConcurrentRequests)
- assert.Equal(t, 6000, configGot.SleepWindow)
- assert.Equal(t, 60, configGot.ErrorPercentThreshold)
- assert.Equal(t, 5, configGot.RequestVolumeThreshold)
-}
-
-func TestGetConfig2(t *testing.T) {
- mockInitHystrixConfig()
- configGot := getConfig("com.ikurento.user.UserProvider", "GetUser0",
true)
- assert.NotNil(t, configGot)
- assert.Equal(t, 2000, configGot.Timeout)
- assert.Equal(t, 64, configGot.MaxConcurrentRequests)
- assert.Equal(t, 4000, configGot.SleepWindow)
- assert.Equal(t, 45, configGot.ErrorPercentThreshold)
- assert.Equal(t, 15, configGot.RequestVolumeThreshold)
-}
-
-func TestGetConfig3(t *testing.T) {
- mockInitHystrixConfig()
- // This should use default
- configGot := getConfig("Mock.Service", "GetMock", true)
- assert.NotNil(t, configGot)
- assert.Equal(t, 1000, configGot.Timeout)
- assert.Equal(t, 600, configGot.MaxConcurrentRequests)
- assert.Equal(t, 5000, configGot.SleepWindow)
- assert.Equal(t, 5, configGot.ErrorPercentThreshold)
- assert.Equal(t, 5, configGot.RequestVolumeThreshold)
-}
-
-type testMockSuccessInvoker struct {
- base.BaseInvoker
-}
-
-func (iv *testMockSuccessInvoker) Invoke(_ context.Context, _ base.Invocation)
result.Result {
- return &result.RPCResult{
- Rest: "Success",
- Err: nil,
- }
-}
-
-type testMockFailInvoker struct {
- base.BaseInvoker
-}
-func (iv *testMockFailInvoker) Invoke(_ context.Context, _ base.Invocation)
result.Result {
- return &result.RPCResult{
- Err: errors.Errorf("exception"),
- }
+ filterGot = newFilterProvider()
+ assert.NotNil(t, filterGot)
}
-func TestHystrixFilterInvokeSuccess(t *testing.T) {
- hf := &Filter{}
- testUrl, err := common.NewURL(
- fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LocalHostValue, constant.DefaultPort))
- require.NoError(t, err)
- testInvoker := testMockSuccessInvoker{*base.NewBaseInvoker(testUrl)}
- invokeResult := hf.Invoke(context.Background(), &testInvoker,
&invocation.RPCInvocation{})
- assert.NotNil(t, invokeResult)
- require.NoError(t, invokeResult.Error())
- assert.NotNil(t, invokeResult.Result())
-}
+func TestHystrixFilter_Invoke(t *testing.T) {
+ // Configure hystrix command for testing
+ // Resource name format:
dubbo:consumer:InterfaceName:group:version:Method(paramTypes)
+ cmdName :=
"dubbo:consumer:com.ikurento.user.UserProvider:::TestMethod()"
+ hystrix.ConfigureCommand(cmdName, hystrix.CommandConfig{
+ Timeout: 1000,
+ MaxConcurrentRequests: 10,
+ RequestVolumeThreshold: 5,
+ SleepWindow: 1000,
+ ErrorPercentThreshold: 50,
+ })
-func TestHystrixFilterInvokeFail(t *testing.T) {
- hf := &Filter{}
- testUrl, err := common.NewURL(
- fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LocalHostValue, constant.DefaultPort))
+ url, err :=
common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider")
require.NoError(t, err)
- testInvoker := testMockFailInvoker{*base.NewBaseInvoker(testUrl)}
- invokeResult := hf.Invoke(context.Background(), &testInvoker,
&invocation.RPCInvocation{})
- assert.NotNil(t, invokeResult)
- require.Error(t, invokeResult.Error())
-}
-
-func TestHystrixFilterInvokeCircuitBreak(t *testing.T) {
- mockInitHystrixConfig()
- hystrix.Flush()
- hf := &Filter{COrP: true}
- resChan := make(chan result.Result, 50)
- configLoadMutex.Lock()
- defer configLoadMutex.Unlock()
- for i := 0; i < 50; i++ {
- go func() {
- testUrl, err := common.NewURL(
-
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LocalHostValue, constant.DefaultPort))
- assert.NoError(t, err)
- testInvoker :=
testMockSuccessInvoker{*base.NewBaseInvoker(testUrl)}
- invokeResult := hf.Invoke(context.Background(),
&testInvoker, &invocation.RPCInvocation{})
- resChan <- invokeResult
- }()
- }
- // This can not always pass the test when on travis due to concurrency,
you can uncomment the code below and test it locally
+ mockInvoker := base.NewBaseInvoker(url)
- //var lastRest bool
- //for i := 0; i < 50; i++ {
- // lastRest = (<-resChan).Error().(*FilterError).FailByHystrix()
- //}
- //Normally the last result should be true, which means the circuit has
been opened
- //
- //assert.True(t, lastRest)
-}
-
-func TestHystrixFilterInvokeCircuitBreakOmitException(t *testing.T) {
- mockInitHystrixConfig()
- hystrix.Flush()
- reg, _ := regexp.Compile(".*exception.*")
- regs := []*regexp.Regexp{reg}
- hf := &Filter{res: map[string][]*regexp.Regexp{"": regs}, COrP: true}
- resChan := make(chan result.Result, 50)
- configLoadMutex.Lock()
- defer configLoadMutex.Unlock()
- for i := 0; i < 50; i++ {
- go func() {
- testUrl, err := common.NewURL(
-
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LocalHostValue, constant.DefaultPort))
- assert.NoError(t, err)
- testInvoker :=
testMockSuccessInvoker{*base.NewBaseInvoker(testUrl)}
- invokeResult := hf.Invoke(context.Background(),
&testInvoker, &invocation.RPCInvocation{})
- resChan <- invokeResult
- }()
- }
- // This can not always pass the test when on travis due to concurrency,
you can uncomment the code below and test it locally
+ filter := &Filter{COrP: true}
+ mockInvocation := invocation.NewRPCInvocation("TestMethod",
[]any{"OK"}, make(map[string]any))
- //time.Sleep(time.Second * 6)
- //var lastRest bool
- //for i := 0; i < 50; i++ {
- // lastRest = (<-resChan).Error().(*FilterError).FailByHystrix()
- //}
- //
- //assert.False(t, lastRest)
+ ctx := context.Background()
+ result := filter.Invoke(ctx, mockInvoker, mockInvocation)
+ assert.NotNil(t, result)
+ assert.NoError(t, result.Error())
}
Review Comment:
The test coverage is minimal compared to the old implementation. The removed
tests covered important scenarios like circuit breaking behavior, error
handling, and concurrent requests. Consider adding tests for: 1) invocation
errors being properly handled, 2) circuit breaker opening after threshold is
reached, 3) fallback behavior when circuit is open. The Sentinel filter has
similar comprehensive tests (filter/sentinel/filter_test.go:45-92, 118-160)
that could serve as a reference.
##########
filter/hystrix/filter.go:
##########
@@ -16,30 +16,37 @@
*/
// Package hystrix provides hystrix filter.
+// To use hystrix, you need to configure commands using hystrix-go API:
+//
+// import "github.com/afex/hystrix-go/hystrix"
+//
+// // Resource name format:
dubbo:consumer:InterfaceName:group:version:Method(param1,param2)
+// // Example:
dubbo:consumer:com.example.GreetService:::Greet(string,string)
+//
hystrix.ConfigureCommand("dubbo:consumer:com.example.GreetService:::Greet(string,string)",
hystrix.CommandConfig{
+// Timeout: 1000,
+// MaxConcurrentRequests: 20,
+// RequestVolumeThreshold: 20,
+// SleepWindow: 5000,
+// ErrorPercentThreshold: 50,
+// })
Review Comment:
The documentation example shows parameter types as "string,string" but the
test configures the command as "TestMethod()", i.e. with an empty
parameter-type list, even though the test invocation passes one argument
("OK"). The documentation should clarify that parameter types are included
only when they exist, and show a clearer example. As written, the
"TestMethod()" name used in the test doesn't match the documentation example
format.
##########
filter/hystrix/filter.go:
##########
@@ -87,111 +86,31 @@ func NewHystrixFilterError(err error, failByHystrix bool)
error {
}
// Filter for Hystrix
-/**
- * You should add hystrix related configuration in provider or consumer config
or both, according to which side you are to apply Filter.
- * For example:
- * filter_conf:
- * hystrix:
- * configs:
- * # =========== Define config here ============
- * "Default":
- * timeout : 1000
- * max_concurrent_requests : 25
- * sleep_window : 5000
- * error_percent_threshold : 50
- * request_volume_threshold: 20
- * "userp":
- * timeout: 2000
- * max_concurrent_requests: 512
- * sleep_window: 4000
- * error_percent_threshold: 35
- * request_volume_threshold: 6
- * "userp_m":
- * timeout : 1200
- * max_concurrent_requests : 512
- * sleep_window : 6000
- * error_percent_threshold : 60
- * request_volume_threshold: 16
- * # =========== Define error whitelist which will be ignored by Hystrix
counter ============
- * error_whitelist: [".*exception.*"]
- *
- * # =========== Apply default config here ===========
- * default: "Default"
- *
- * services:
- * "com.ikurento.user.UserProvider":
- * # =========== Apply service level config ===========
- * service_config: "userp"
- * # =========== Apply method level config ===========
- * methods:
- * "GetUser": "userp_m"
- * "GetUser1": "userp_m"
- */
type Filter struct {
- COrP bool // true for consumer
- res map[string][]*regexp.Regexp
- ifNewMap sync.Map
+ COrP bool // true for consumer, false for provider
}
// Invoke is an implementation of filter, provides Hystrix pattern latency and
fault tolerance
func (f *Filter) Invoke(ctx context.Context, invoker base.Invoker, invocation
base.Invocation) result.Result {
- cmdName := fmt.Sprintf("%s&method=%s", invoker.GetURL().Key(),
invocation.MethodName())
+ cmdName := getResourceName(invoker, invocation, f.COrP)
- // Do the configuration if the circuit breaker is created for the first
time
- if _, load := f.ifNewMap.LoadOrStore(cmdName, true); !load {
- configLoadMutex.Lock()
- filterConf := getConfig(invoker.GetURL().Service(),
invocation.MethodName(), f.COrP)
- for _, ptn := range filterConf.Error {
- reg, err := regexp.Compile(ptn)
- if err != nil {
- logger.Warnf("[Hystrix Filter]Errors occurred
parsing error omit regexp: %s, %v", ptn, err)
- } else {
- if f.res == nil {
- f.res =
make(map[string][]*regexp.Regexp)
- }
- f.res[invocation.MethodName()] =
append(f.res[invocation.MethodName()], reg)
- }
- }
- hystrix.ConfigureCommand(cmdName, hystrix.CommandConfig{
- Timeout: filterConf.Timeout,
- MaxConcurrentRequests:
filterConf.MaxConcurrentRequests,
- SleepWindow: filterConf.SleepWindow,
- ErrorPercentThreshold:
filterConf.ErrorPercentThreshold,
- RequestVolumeThreshold:
filterConf.RequestVolumeThreshold,
- })
- configLoadMutex.Unlock()
- }
- configLoadMutex.RLock()
- _, _, err := hystrix.GetCircuit(cmdName)
- configLoadMutex.RUnlock()
- if err != nil {
- logger.Errorf("[Hystrix Filter]Errors occurred getting circuit
for %s , will invoke without hystrix, error is: %+v", cmdName, err)
- return invoker.Invoke(ctx, invocation)
- }
- logger.Infof("[Hystrix Filter]Using hystrix filter: %s", cmdName)
var res result.Result
- _ = hystrix.Do(cmdName, func() error {
+ err := hystrix.Do(cmdName, func() error {
res = invoker.Invoke(ctx, invocation)
- err := res.Error()
- if err != nil {
- res.SetError(NewHystrixFilterError(err, false))
- for _, reg := range f.res[invocation.MethodName()] {
- if reg.MatchString(err.Error()) {
- logger.Debugf("[Hystrix Filter]Error in
invocation but omitted in circuit breaker: %v; %s", err, cmdName)
- return nil
- }
- }
- }
- return err
+ return res.Error()
}, func(err error) error {
- // Return error and if it is caused by hystrix logic, so that
it can be handled by previous filters.
+ // Circuit is open, return fallback error
_, ok := err.(hystrix.CircuitError)
- logger.Debugf("[Hystrix Filter]Hystrix health check counted,
error is: %v, failed by hystrix: %v; %s", err, ok, cmdName)
+ logger.Debugf("[Hystrix Filter] Circuit opened for %s, failed
by hystrix: %v", cmdName, ok)
res = &result.RPCResult{}
res.SetResult(nil)
res.SetError(NewHystrixFilterError(err, ok))
return err
})
Review Comment:
The fallback function creates a new RPCResult and discards the original
invocation result. Because hystrix.Do calls the fallback whenever the run
function returns an error, this happens for ordinary invocation errors as well
as for circuit breaks. In the former case the original result object may
contain useful data beyond just the error. The old implementation preserved the
original result and only wrapped the error. Consider modifying the fallback to
preserve the original result when it exists, or at least copy relevant fields
from the original result before replacing it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]