This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/main by this push: new b16c55794 add: implement dynamic routing (#2303) b16c55794 is described below commit b16c5579403722e46bd810a31c0e213ef44e0bf7 Author: finalt <finalt1...@163.com> AuthorDate: Tue May 30 20:33:01 2023 +0800 add: implement dynamic routing (#2303) --- cluster/router/condition/dynamic_router.go | 205 ++++++++++++ cluster/router/condition/factory.go | 56 ++++ cluster/router/condition/matcher/attachment.go | 70 +++++ cluster/router/condition/matcher/base.go | 4 +- cluster/router/condition/matcher/factory.go | 27 +- .../condition/matcher/pattern_value/scope.go | 126 ++++++++ .../condition/matcher/pattern_value/wildcard.go | 5 +- cluster/router/condition/route.go | 32 +- cluster/router/condition/router_test.go | 348 ++++++++++++++++++++- common/constant/key.go | 3 + common/extension/router_condition_matcher.go | 37 +++ common/extension/router_condition_pattern_value.go | 37 +++ imports/imports.go | 1 + 13 files changed, 919 insertions(+), 32 deletions(-) diff --git a/cluster/router/condition/dynamic_router.go b/cluster/router/condition/dynamic_router.go new file mode 100644 index 000000000..9d613dfe4 --- /dev/null +++ b/cluster/router/condition/dynamic_router.go @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package condition + +import ( + "strconv" + "strings" + "sync" +) + +import ( + "github.com/dubbogo/gost/log/logger" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common" + conf "dubbo.apache.org/dubbo-go/v3/common/config" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/config" + "dubbo.apache.org/dubbo-go/v3/config_center" + "dubbo.apache.org/dubbo-go/v3/protocol" + "dubbo.apache.org/dubbo-go/v3/remoting" +) + +type DynamicRouter struct { + conditionRouters []*StateRouter + routerConfig *config.RouterConfig +} + +func (d *DynamicRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + if len(invokers) == 0 || len(d.conditionRouters) == 0 { + return invokers + } + + for _, router := range d.conditionRouters { + invokers = router.Route(invokers, url, invocation) + } + return invokers +} + +func (d *DynamicRouter) URL() *common.URL { + return nil +} + +func (d *DynamicRouter) Process(event *config_center.ConfigChangeEvent) { + if event.ConfigType == remoting.EventTypeDel { + d.routerConfig = nil + d.conditionRouters = make([]*StateRouter, 0) + } else { + routerConfig, err := parseRoute(event.Value.(string)) + if err != nil { + logger.Warnf("[condition router]Build a new condition route config error, %+v and we will use the original condition rule configuration.", err) + return + } + d.routerConfig = routerConfig + conditions, err := generateConditions(d.routerConfig) + if err != nil { + logger.Warnf("[condition router]Build a new condition route config error, %+v and we will use the original condition rule configuration.", err) + return + } + d.conditionRouters = conditions + } +} + +func generateConditions(routerConfig *config.RouterConfig) ([]*StateRouter, error) { + if routerConfig == nil { + return make([]*StateRouter, 0), nil + } + conditionRouters := make([]*StateRouter, 0, len(routerConfig.Conditions)) + for _, conditionRule := range routerConfig.Conditions { + url, err := common.NewURL("condition://") + if err != nil { + return nil, err + } + url.AddParam(constant.RuleKey, conditionRule) + url.AddParam(constant.ForceKey, strconv.FormatBool(routerConfig.Force)) + url.AddParam(constant.EnabledKey, strconv.FormatBool(routerConfig.Enabled)) + conditionRoute, err := NewConditionStateRouter(url) + if err != nil { + return nil, err + } + conditionRouters = append(conditionRouters, conditionRoute) + } + return conditionRouters, nil +} + +// ServiceRouter is Service level router +type ServiceRouter struct { + DynamicRouter +} + +func NewServiceRouter() *ServiceRouter { + return &ServiceRouter{} +} + +func (s *ServiceRouter) Priority() int64 { + return 140 +} + +func (s *ServiceRouter) Notify(invokers []protocol.Invoker) { + if len(invokers) == 0 { + return + } + url := invokers[0].GetURL() + if url == nil { + logger.Error("Failed to notify a dynamically condition rule, because url is empty") + return + } + + dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration() + if dynamicConfiguration == nil { + logger.Warnf("config center does not start, please check if the configuration center has been properly configured in dubbogo.yml") + return + } + key := strings.Join([]string{strings.Join([]string{url.Service(), url.GetParam(constant.VersionKey, ""), url.GetParam(constant.GroupKey, "")}, ":"), + constant.ConditionRouterRuleSuffix}, "") + dynamicConfiguration.AddListener(key, s) + value, err := dynamicConfiguration.GetRule(key) + if err != nil { + logger.Errorf("Failed to query condition rule, key=%s, err=%v", key, err) + return + } + s.Process(&config_center.ConfigChangeEvent{Key: key, Value: value, ConfigType: remoting.EventTypeAdd}) +} + +// ApplicationRouter is Application level router +type ApplicationRouter struct { + DynamicRouter + application string + currentApplication string + mu sync.Mutex +} + +func NewApplicationRouter() *ApplicationRouter { + applicationName := config.GetApplicationConfig().Name + a := &ApplicationRouter{ + currentApplication: applicationName, + } + + dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration() + if dynamicConfiguration != nil { + dynamicConfiguration.AddListener(strings.Join([]string{applicationName, constant.ConditionRouterRuleSuffix}, ""), a) + } + return a +} + +func (a *ApplicationRouter) Priority() int64 { + return 145 +} + +func (a *ApplicationRouter) Notify(invokers []protocol.Invoker) { + if len(invokers) == 0 { + return + } + url := invokers[0].GetURL() + if url == nil { + logger.Error("Failed to notify a dynamically condition rule, because url is empty") + return + } + providerApplicaton := url.GetParam("application", "") + if providerApplicaton == "" || providerApplicaton == a.currentApplication { + logger.Warn("condition router get providerApplication is empty, will not subscribe to provider app rules.") + return + } + + a.mu.Lock() + defer a.mu.Unlock() + + if providerApplicaton != a.application { + dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration() + if dynamicConfiguration == nil { + logger.Warnf("config center does not start, please check if the configuration center has been properly configured in dubbogo.yml") + return + } + + if a.application != "" { + dynamicConfiguration.RemoveListener(strings.Join([]string{a.application, constant.ConditionRouterRuleSuffix}, ""), a) + } + + key := strings.Join([]string{providerApplicaton, constant.ConditionRouterRuleSuffix}, "") + dynamicConfiguration.AddListener(key, a) + a.application = providerApplicaton + value, err := dynamicConfiguration.GetRule(key) + if err != nil { + logger.Errorf("Failed to query condition rule, key=%s, err=%v", key, err) + return + } + a.Process(&config_center.ConfigChangeEvent{Key: key, Value: value, ConfigType: remoting.EventTypeUpdate}) + } +} diff --git a/cluster/router/condition/factory.go b/cluster/router/condition/factory.go new file mode 100644 index 000000000..ecd7b5a35 --- /dev/null +++ b/cluster/router/condition/factory.go @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package condition + +import ( + "dubbo.apache.org/dubbo-go/v3/cluster/router" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" +) + +func init() { + extension.SetRouterFactory(constant.ConditionServiceRouterFactoryKey, NewServiceConditionRouterFactory) + extension.SetRouterFactory(constant.ConditionAppRouterFactoryKey, NewAppConditionRouterFactory) +} + +// ServiceRouteFactory router factory +type ServiceRouteFactory struct{} + +// NewServiceConditionRouterFactory constructs a new PriorityRouterFactory +func NewServiceConditionRouterFactory() router.PriorityRouterFactory { + return &ServiceRouteFactory{} +} + +// NewPriorityRouter constructs a new ServiceRouter +func (s *ServiceRouteFactory) NewPriorityRouter() (router.PriorityRouter, error) { + return NewServiceRouter(), nil +} + +// AppConditionRouterFactory router factory +type AppConditionRouterFactory struct { +} + +// NewAppConditionRouterFactory router factory +func NewAppConditionRouterFactory() router.PriorityRouterFactory { + return &AppConditionRouterFactory{} +} + +// NewPriorityRouter constructs a new ApplicationRouter +func (a *AppConditionRouterFactory) NewPriorityRouter() (router.PriorityRouter, error) { + return NewApplicationRouter(), nil +} diff --git a/cluster/router/condition/matcher/attachment.go b/cluster/router/condition/matcher/attachment.go new file mode 100644 index 000000000..338fed29e --- /dev/null +++ b/cluster/router/condition/matcher/attachment.go @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package matcher + +import ( + "regexp" + "strings" +) + +import ( + "github.com/dubbogo/gost/log/logger" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/protocol" +) + +var ( + attachmentPattern = regexp.MustCompile("attachments\\[([a-zA-Z0-9_]+)\\]") + notFoundAttachmentValue = "dubbo internal not found attachment condition value" +) + +// AttachmentConditionMatcher analysis the attachments in the rule. +// Examples would be like this: +// "attachments[version]=1.0.0&attachments[timeout]=1000", whenCondition is that the version is equal to '1.0.0' and the timeout is equal to '1000'. +type AttachmentConditionMatcher struct { + BaseConditionMatcher +} + +func NewAttachmentConditionMatcher(key string) *AttachmentConditionMatcher { + return &AttachmentConditionMatcher{ + *NewBaseConditionMatcher(key), + } +} + +func (a *AttachmentConditionMatcher) GetValue(sample map[string]string, url *common.URL, invocation protocol.Invocation) string { + // split the rule + expressArray := strings.Split(a.key, "\\.") + attachmentExpress := expressArray[0] + matcher := attachmentPattern.FindStringSubmatch(attachmentExpress) + if len(matcher) == 0 { + logger.Warn(notFoundAttachmentValue) + return "" + } + // extract the attachment key + attachmentKey := matcher[1] + if attachmentKey == "" { + logger.Warn(notFoundAttachmentValue) + return "" + } + // extract the attachment value + attachment, _ := invocation.GetAttachment(attachmentKey) + return attachment +} diff --git a/cluster/router/condition/matcher/base.go b/cluster/router/condition/matcher/base.go index f3858070b..b8df5ef51 100644 --- a/cluster/router/condition/matcher/base.go +++ b/cluster/router/condition/matcher/base.go @@ -77,7 +77,7 @@ func (b *BaseConditionMatcher) IsMatch(value string, param *common.URL, invocati if len(b.matches) != 0 && len(b.misMatches) != 0 { // when both mismatches and matches contain the same value, then using mismatches first - return b.patternMatches(value, param, invocation, isWhenCondition) && b.patternMisMatches(value, param, invocation, isWhenCondition) + return b.patternMisMatches(value, param, invocation, isWhenCondition) && b.patternMatches(value, param, invocation, isWhenCondition) } return false } @@ -120,7 +120,7 @@ func doPatternMatch(pattern string, value string, url *common.URL, invocation pr // If no value matcher is available, will force to use wildcard value matcher logger.Error("Executing condition rule value match expression error, will force to use wildcard value matcher") - valuePattern := pattern_value.GetValuePattern("wildcard") + valuePattern := pattern_value.GetValuePattern(constant.Wildcard) return valuePattern.Match(pattern, value, url, invocation, isWhenCondition) } diff --git a/cluster/router/condition/matcher/factory.go b/cluster/router/condition/matcher/factory.go index f5cc89476..6b6711e53 100644 --- a/cluster/router/condition/matcher/factory.go +++ b/cluster/router/condition/matcher/factory.go @@ -27,8 +27,9 @@ import ( ) func init() { - SetMatcherFactory("argument", NewArgumentMatcherFactory) - SetMatcherFactory("param", NewParamMatcherFactory) + SetMatcherFactory(constant.Arguments, NewArgumentMatcherFactory) + SetMatcherFactory(constant.Attachments, NewAttachmentMatcherFactory) + SetMatcherFactory(constant.Param, NewParamMatcherFactory) } // ArgumentMatcherFactory matcher factory @@ -53,6 +54,28 @@ func (a *ArgumentMatcherFactory) Priority() int64 { return 300 } +// AttachmentMatcherFactory matcher factory +type AttachmentMatcherFactory struct { +} + +// NewAttachmentMatcherFactory constructs a new attachment.AttachmentMatcherFactory +func NewAttachmentMatcherFactory() ConditionMatcherFactory { + return &AttachmentMatcherFactory{} +} + +func (a *AttachmentMatcherFactory) ShouldMatch(key string) bool { + return strings.HasPrefix(key, constant.Attachments) +} + +// NewMatcher constructs a new matcher +func (a *AttachmentMatcherFactory) NewMatcher(key string) Matcher { + return NewAttachmentConditionMatcher(key) +} + +func (a *AttachmentMatcherFactory) Priority() int64 { + return 200 +} + // ParamMatcherFactory matcher factory type ParamMatcherFactory struct { } diff --git a/cluster/router/condition/matcher/pattern_value/scope.go b/cluster/router/condition/matcher/pattern_value/scope.go new file mode 100644 index 000000000..ca445811d --- /dev/null +++ b/cluster/router/condition/matcher/pattern_value/scope.go @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pattern_value + +import ( + "strconv" + "strings" +) + +import ( + "github.com/dubbogo/gost/log/logger" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/protocol" +) + +// ScopeValuePattern matches with patterns like 'key=1~100', 'key=~100' or 'key=1~' +type ScopeValuePattern struct { +} + +func init() { + SetValuePattern(constant.Scope, NewScopeValuePattern) +} + +func NewScopeValuePattern() ValuePattern { + return &ScopeValuePattern{} +} + +func (s *ScopeValuePattern) Priority() int64 { + return 100 +} + +func (s *ScopeValuePattern) ShouldMatch(pattern string) bool { + return strings.Contains(pattern, "~") +} + +func (s *ScopeValuePattern) Match(pattern string, value string, url *common.URL, invocation protocol.Invocation, isWhenCondition bool) bool { + defaultValue := !isWhenCondition + intValue, err := strconv.Atoi(value) + if err != nil { + logError(pattern, value) + return defaultValue + } + + arr := strings.Split(pattern, "~") + if len(arr) < 2 { + logError(pattern, value) + return defaultValue + } + + rawStart := arr[0] + rawEnd := arr[1] + + if rawStart == "" && rawEnd == "" { + return defaultValue + } + + return s.matchRange(intValue, rawStart, rawEnd, defaultValue, pattern, value) +} + +func (s *ScopeValuePattern) matchRange(intValue int, rawStart, rawEnd string, defaultValue bool, pattern, value string) bool { + if rawStart == "" { + end, err := strconv.Atoi(rawEnd) + if err != nil { + logError(pattern, value) + return defaultValue + } + if intValue > end { + return false + } + } else if rawEnd == "" { + start, err := strconv.Atoi(rawStart) + if err != nil { + logError(pattern, value) + return defaultValue + } + if intValue < start { + return false + } + } else { + start, end, err := convertToIntRange(rawStart, rawEnd) + if err != nil { + logError(pattern, value) + return defaultValue + } + if intValue < start || intValue > end { + return false + } + } + + return true +} + +func convertToIntRange(rawStart, rawEnd string) (int, int, error) { + start, err := strconv.Atoi(rawStart) + if err != nil { + return 0, 0, err + } + end, err := strconv.Atoi(rawEnd) + if err != nil { + return 0, 0, err + } + return start, end, nil +} + +func logError(pattern, value string) { + logger.Errorf("Parse integer error, Invalid condition rule '%s' or value '%s', will ignore.", pattern, value) +} diff --git a/cluster/router/condition/matcher/pattern_value/wildcard.go b/cluster/router/condition/matcher/pattern_value/wildcard.go index dff572107..7e73610c5 100644 --- a/cluster/router/condition/matcher/pattern_value/wildcard.go +++ b/cluster/router/condition/matcher/pattern_value/wildcard.go @@ -24,6 +24,7 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/protocol" ) @@ -33,10 +34,10 @@ type WildcardValuePattern struct { } func init() { - SetValuePattern("wildcard", NewValuePattern) + SetValuePattern(constant.Wildcard, NewWildcardValuePattern) } -func NewValuePattern() ValuePattern { +func NewWildcardValuePattern() ValuePattern { return &WildcardValuePattern{} } diff --git a/cluster/router/condition/route.go b/cluster/router/condition/route.go index ea046020d..26f2c2578 100644 --- a/cluster/router/condition/route.go +++ b/cluster/router/condition/route.go @@ -85,8 +85,8 @@ func NewConditionStateRouter(url *common.URL) (*StateRouter, error) { } // Route Determine the target invokers list. -func (c *StateRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { - if !c.enable { +func (s *StateRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + if !s.enable { return invokers } @@ -94,25 +94,25 @@ func (c *StateRouter) Route(invokers []protocol.Invoker, url *common.URL, invoca return invokers } - if !c.matchWhen(url, invocation) { + if !s.matchWhen(url, invocation) { return invokers } - if len(c.thenCondition) == 0 { + if len(s.thenCondition) == 0 { logger.Warn("condition state router thenCondition is empty") return []protocol.Invoker{} } var result = make([]protocol.Invoker, 0, len(invokers)) for _, invoker := range invokers { - if c.matchThen(invoker.GetURL(), url) { + if s.matchThen(invoker.GetURL(), url) { result = append(result, invoker) } } if len(result) != 0 { return result - } else if c.force { + } else if s.force { logger.Warn("execute condition state router result list is empty. and force=true") return result } @@ -120,26 +120,26 @@ func (c *StateRouter) Route(invokers []protocol.Invoker, url *common.URL, invoca return invokers } -func (c *StateRouter) URL() *common.URL { - return c.url +func (s *StateRouter) URL() *common.URL { + return s.url } -func (c *StateRouter) Priority() int64 { +func (s *StateRouter) Priority() int64 { return 0 } -func (c *StateRouter) matchWhen(url *common.URL, invocation protocol.Invocation) bool { - if len(c.whenCondition) == 0 { +func (s *StateRouter) matchWhen(url *common.URL, invocation protocol.Invocation) bool { + if len(s.whenCondition) == 0 { return true } - return doMatch(url, nil, invocation, c.whenCondition, true) + return doMatch(url, nil, invocation, s.whenCondition, true) } -func (c *StateRouter) matchThen(url *common.URL, param *common.URL) bool { - if len(c.thenCondition) == 0 { +func (s *StateRouter) matchThen(url *common.URL, param *common.URL) bool { + if len(s.thenCondition) == 0 { return false } - return doMatch(url, param, nil, c.thenCondition, false) + return doMatch(url, param, nil, s.thenCondition, false) } func generateMatcher(url *common.URL) (when, then map[string]matcher.Matcher, err error) { @@ -284,7 +284,7 @@ func getMatcher(key string) matcher.Matcher { return factory.NewMatcher(key) } } - return matcher.GetMatcherFactory("param").NewMatcher(key) + return matcher.GetMatcherFactory(constant.Param).NewMatcher(key) } func doMatch(url *common.URL, param *common.URL, invocation protocol.Invocation, conditions map[string]matcher.Matcher, isWhenCondition bool) bool { diff --git a/cluster/router/condition/router_test.go b/cluster/router/condition/router_test.go index 6b264e033..451306bc0 100644 --- a/cluster/router/condition/router_test.go +++ b/cluster/router/condition/router_test.go @@ -27,7 +27,11 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/common/config" "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/config_center" + "dubbo.apache.org/dubbo-go/v3/config_center/configurator" "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/invocation" ) @@ -40,11 +44,15 @@ const ( localProviderAddr = "dubbo://127.0.0.1:20880/com.foo.BarService" remoteProviderAddr = "dubbo://dubbo.apache.org:20880/com.foo.BarService" + emptyProviderAddr = "" + + region = "?region=hangzhou" + method = "getFoo" ) func TestRouteMatchWhen(t *testing.T) { - rpcInvocation := invocation.NewRPCInvocation("getFoo", nil, nil) + rpcInvocation := invocation.NewRPCInvocation(method, nil, nil) whenConsumerURL, _ := common.NewURL(remoteConsumerAddr) testData := []struct { @@ -133,7 +141,7 @@ func TestRouteMatchFilter(t *testing.T) { url2, _ := common.NewURL(localProviderAddr) url3, _ := common.NewURL(localProviderAddr) - rpcInvocation := invocation.NewRPCInvocation("getFoo", nil, nil) + rpcInvocation := invocation.NewRPCInvocation(method, nil, nil) ink1 := protocol.NewBaseInvoker(url1) ink2 := protocol.NewBaseInvoker(url2) @@ -214,7 +222,7 @@ func TestRouteMatchFilter(t *testing.T) { func TestRouterMethodRoute(t *testing.T) { - rpcInvocation := invocation.NewRPCInvocation("getFoo", nil, nil) + rpcInvocation := invocation.NewRPCInvocation(method, nil, nil) testData := []struct { name string @@ -262,7 +270,7 @@ func TestRouterMethodRoute(t *testing.T) { func TestRouteReturn(t *testing.T) { - rpcInvocation := invocation.NewRPCInvocation("getFoo", nil, nil) + rpcInvocation := invocation.NewRPCInvocation(method, nil, nil) consumerURL, _ := common.NewURL(localConsumerAddr) testData := []struct { @@ -276,9 +284,9 @@ func TestRouteReturn(t *testing.T) { { name: "ReturnFalse", urls: []string{ - "", - "", - "", + emptyProviderAddr, + emptyProviderAddr, + emptyProviderAddr, }, rule: "host = 127.0.0.1 => false", @@ -288,9 +296,9 @@ func TestRouteReturn(t *testing.T) { { name: "ReturnEmpty", urls: []string{ - "", - "", - "", + emptyProviderAddr, + emptyProviderAddr, + emptyProviderAddr, }, rule: "host = 127.0.0.1 => ", @@ -477,3 +485,323 @@ func TestRouteArguments(t *testing.T) { }) } } + +// TestRouteAttachments also tests matcher.AttachmentConditionMatcher's GetValue method +func TestRouteAttachments(t *testing.T) { + consumerURL, _ := common.NewURL(localConsumerAddr) + + url1, _ := common.NewURL(remoteProviderAddr + region) + url2, _ := common.NewURL(localProviderAddr) + url3, _ := common.NewURL(localProviderAddr) + + ink1 := protocol.NewBaseInvoker(url1) + ink2 := protocol.NewBaseInvoker(url2) + ink3 := protocol.NewBaseInvoker(url3) + + invokerList := make([]protocol.Invoker, 0, 3) + invokerList = append(invokerList, ink1) + invokerList = append(invokerList, ink2) + invokerList = append(invokerList, ink3) + + testData := []struct { + name string + attachmentKey string + attachmentValue string + rule string + + wantVal int + }{ + { + name: "Empty attachments", + attachmentKey: "", + attachmentValue: "", + rule: "attachments[foo] = a => host = 1.2.3.4", + + wantVal: 3, + }, + { + name: "Yes attachments and no host", + attachmentKey: "foo", + attachmentValue: "a", + rule: "attachments[foo] = a => host = 1.2.3.4", + + wantVal: 0, + }, + { + name: "No attachments and no host", + attachmentKey: "foo", + attachmentValue: "a", + rule: "attachments = a => host = 1.2.3.4", + + wantVal: 3, + }, + { + name: "Yes attachments and region", + attachmentKey: "foo", + attachmentValue: "a", + rule: "attachments[foo] = a => region = hangzhou", + + wantVal: 1, + }, + } + + for _, data := range testData { + t.Run(data.name, func(t *testing.T) { + + rpcInvocation := invocation.NewRPCInvocation(method, nil, nil) + rpcInvocation.SetAttachment(data.attachmentKey, data.attachmentValue) + + url, err := common.NewURL(conditionAddr) + assert.Nil(t, err) + url.AddParam(constant.RuleKey, data.rule) + url.AddParam(constant.ForceKey, "true") + router, err := NewConditionStateRouter(url) + assert.Nil(t, err) + + filterInvokers := router.Route(invokerList, consumerURL, rpcInvocation) + + resVal := len(filterInvokers) + assert.Equal(t, data.wantVal, resVal) + }) + } +} + +// TestRouteRangePattern also tests pattern_value.ScopeValuePattern's Match method +func TestRouteRangePattern(t *testing.T) { + + consumerURL, _ := common.NewURL(localConsumerAddr) + + url1, _ := common.NewURL(remoteProviderAddr + region) + url2, _ := common.NewURL(localProviderAddr) + url3, _ := common.NewURL(localProviderAddr) + + ink1 := protocol.NewBaseInvoker(url1) + ink2 := protocol.NewBaseInvoker(url2) + ink3 := protocol.NewBaseInvoker(url3) + + invokerList := make([]protocol.Invoker, 0, 3) + invokerList = append(invokerList, ink1) + invokerList = append(invokerList, ink2) + invokerList = append(invokerList, ink3) + + testData := []struct { + name string + attachmentKey string + attachmentValue string + rule string + + wantVal int + }{ + { + name: "Empty attachment", + attachmentKey: "", + attachmentValue: "", + rule: "attachments[user_id] = 1~100 => region=hangzhou", + + wantVal: 3, + }, + { + name: "In the range", + attachmentKey: "user_id", + attachmentValue: "80", + rule: "attachments[user_id] = 1~100 => region=hangzhou", + + wantVal: 1, + }, + { + name: "Out of range", + attachmentKey: "user_id", + attachmentValue: "101", + rule: "attachments[user_id] = 1~100 => region=hangzhou", + + wantVal: 3, + }, + { + name: "In the single interval range", + attachmentKey: "user_id", + attachmentValue: "1", + rule: "attachments[user_id] = ~100 => region=hangzhou", + + wantVal: 1, + }, + { + name: "Not in the single interval range", + attachmentKey: "user_id", + attachmentValue: "101", + rule: "attachments[user_id] = ~100 => region=hangzhou", + + wantVal: 3, + }, + } + + for _, data := range testData { + t.Run(data.name, func(t *testing.T) { + + rpcInvocation := invocation.NewRPCInvocation(method, nil, nil) + rpcInvocation.SetAttachment(data.attachmentKey, data.attachmentValue) + + url, err := common.NewURL(conditionAddr) + assert.Nil(t, err) + url.AddParam(constant.RuleKey, data.rule) + url.AddParam(constant.ForceKey, "true") + router, err := NewConditionStateRouter(url) + assert.Nil(t, err) + + filterInvokers := router.Route(invokerList, consumerURL, rpcInvocation) + + resVal := len(filterInvokers) + assert.Equal(t, data.wantVal, resVal) + }) + } +} + +func TestRouteMultipleConditions(t *testing.T) { + url1, _ := common.NewURL(remoteProviderAddr + region) + url2, _ := common.NewURL(localProviderAddr) + url3, _ := common.NewURL(localProviderAddr) + + ink1 := protocol.NewBaseInvoker(url1) + ink2 := protocol.NewBaseInvoker(url2) + ink3 := protocol.NewBaseInvoker(url3) + + invokerList := make([]protocol.Invoker, 0, 3) + invokerList = append(invokerList, ink1) + invokerList = append(invokerList, ink2) + invokerList = append(invokerList, ink3) + + testData := []struct { + name string + argument string + consumerURL string + rule string + + wantVal int + }{ + { + name: "All conditions match", + argument: "a", + consumerURL: localConsumerAddr + "?application=consumer_app", + rule: "application=consumer_app&arguments[0]=a => host = 127.0.0.1", + + wantVal: 2, + }, + { + name: "One of the conditions does not match", + argument: "a", + consumerURL: localConsumerAddr + "?application=another_consumer_app", + rule: "application=consumer_app&arguments[0]=a => host = 127.0.0.1", + + wantVal: 3, + }, + } + for _, data := range testData { + t.Run(data.name, func(t *testing.T) { + consumerUrl, err := common.NewURL(data.consumerURL) + assert.Nil(t, err) + + url, err := common.NewURL(conditionAddr) + assert.Nil(t, err) + url.AddParam(constant.RuleKey, data.rule) + url.AddParam(constant.ForceKey, "true") + router, err := NewConditionStateRouter(url) + assert.Nil(t, err) + + arguments := make([]interface{}, 0, 1) + arguments = append(arguments, data.argument) + + rpcInvocation := invocation.NewRPCInvocation(method, arguments, nil) + + filterInvokers := router.Route(invokerList, consumerUrl, rpcInvocation) + resVal := len(filterInvokers) + assert.Equal(t, data.wantVal, resVal) + }) + } +} + +func TestServiceRouter(t *testing.T) { + + consumerURL, _ := common.NewURL(remoteConsumerAddr) + + url1, _ := common.NewURL(remoteProviderAddr) + url2, _ := common.NewURL(remoteProviderAddr + region) + url3, _ := common.NewURL(localProviderAddr) + + ink1 := protocol.NewBaseInvoker(url1) + ink2 := protocol.NewBaseInvoker(url2) + ink3 := protocol.NewBaseInvoker(url3) + + invokerList := make([]protocol.Invoker, 0, 3) + invokerList = append(invokerList, ink1) + invokerList = append(invokerList, ink2) + invokerList = append(invokerList, ink3) + + extension.SetDefaultConfigurator(configurator.NewMockConfigurator) + ccURL, _ := common.NewURL("mock://127.0.0.1:1111") + mockFactory := &config_center.MockDynamicConfigurationFactory{ + Content: ` +scope: service +force: true +enabled: true +runtime: true +key: com.foo.BarService +conditions: + - 'method=sayHello => region=hangzhou'`, + } + dc, _ := mockFactory.GetDynamicConfiguration(ccURL) + config.GetEnvInstance().SetDynamicConfiguration(dc) + + router := NewServiceRouter() + router.Notify(invokerList) + + rpcInvocation := invocation.NewRPCInvocation("sayHello", nil, nil) + invokers := router.Route(invokerList, consumerURL, rpcInvocation) + assert.Equal(t, 1, len(invokers)) + + rpcInvocation = invocation.NewRPCInvocation("sayHi", nil, nil) + invokers = router.Route(invokerList, consumerURL, rpcInvocation) + assert.Equal(t, 3, len(invokers)) +} + +func TestApplicationRouter(t *testing.T) { + + consumerURL, _ := common.NewURL(remoteConsumerAddr) + + url1, _ := common.NewURL(remoteProviderAddr + "?application=demo-provider") + url2, _ := common.NewURL(localProviderAddr + "?application=demo-provider®ion=hangzhou") + url3, _ := common.NewURL(localProviderAddr + "?application=demo-provider") + + ink1 := protocol.NewBaseInvoker(url1) + ink2 := protocol.NewBaseInvoker(url2) + ink3 := protocol.NewBaseInvoker(url3) + + invokerList := make([]protocol.Invoker, 0, 3) + invokerList = append(invokerList, ink1) + invokerList = append(invokerList, ink2) + invokerList = append(invokerList, ink3) + + extension.SetDefaultConfigurator(configurator.NewMockConfigurator) + ccURL, _ := common.NewURL("mock://127.0.0.1:1111") + mockFactory := &config_center.MockDynamicConfigurationFactory{ + Content: ` +scope: application +force: true +enabled: true +runtime: true +key: demo-provider +conditions: + - 'method=sayHello => region=hangzhou'`, + } + dc, _ := mockFactory.GetDynamicConfiguration(ccURL) + config.GetEnvInstance().SetDynamicConfiguration(dc) + + router := NewApplicationRouter() + router.Notify(invokerList) + + rpcInvocation := invocation.NewRPCInvocation("sayHello", nil, nil) + invokers := router.Route(invokerList, consumerURL, rpcInvocation) + assert.Equal(t, 1, len(invokers)) + + rpcInvocation = invocation.NewRPCInvocation("sayHi", nil, nil) + invokers = router.Route(invokerList, consumerURL, rpcInvocation) + assert.Equal(t, 3, len(invokers)) +} diff --git a/common/constant/key.go b/common/constant/key.go index 16933a408..51d8753b3 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -313,6 +313,9 @@ const ( ForceKey = "force" Arguments = "arguments" Attachments = "attachments" + Param = "param" + Scope = "scope" + Wildcard = "wildcard" MeshRouterFactoryKey = "mesh" ) diff --git a/common/extension/router_condition_matcher.go b/common/extension/router_condition_matcher.go new file mode 100644 index 000000000..8c0c3725f --- /dev/null +++ b/common/extension/router_condition_matcher.go @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "dubbo.apache.org/dubbo-go/v3/cluster/router/condition/matcher" +) + +// SetMatcherFactory sets create matcherFactory function with @name +func SetMatcherFactory(name string, fun func() matcher.ConditionMatcherFactory) { + matcher.SetMatcherFactory(name, fun) +} + +// GetMatcherFactory gets create matcherFactory function by name +func GetMatcherFactory(name string) matcher.ConditionMatcherFactory { + return matcher.GetMatcherFactory(name) +} + +// GetMatcherFactories gets all create matcherFactory function +func GetMatcherFactories() map[string]func() matcher.ConditionMatcherFactory { + return matcher.GetMatcherFactories() +} diff --git a/common/extension/router_condition_pattern_value.go b/common/extension/router_condition_pattern_value.go new file mode 100644 index 000000000..ef1618a64 --- /dev/null +++ b/common/extension/router_condition_pattern_value.go @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "dubbo.apache.org/dubbo-go/v3/cluster/router/condition/matcher/pattern_value" +) + +// SetValuePattern sets create valuePattern function with @name +func SetValuePattern(name string, fun func() pattern_value.ValuePattern) { + pattern_value.SetValuePattern(name, fun) +} + +// GetValuePattern gets create valuePattern function by name +func GetValuePattern(name string) pattern_value.ValuePattern { + return pattern_value.GetValuePattern(name) +} + +// GetValuePatterns gets all create valuePattern function +func GetValuePatterns() map[string]func() pattern_value.ValuePattern { + return pattern_value.GetValuePatterns() +} diff --git a/imports/imports.go b/imports/imports.go index 277e74cee..c5868969b 100644 --- a/imports/imports.go +++ b/imports/imports.go @@ -32,6 +32,7 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/p2c" _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random" _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/roundrobin" + _ "dubbo.apache.org/dubbo-go/v3/cluster/router/condition" _ "dubbo.apache.org/dubbo-go/v3/cluster/router/meshrouter" _ "dubbo.apache.org/dubbo-go/v3/cluster/router/polaris" _ "dubbo.apache.org/dubbo-go/v3/cluster/router/tag"