This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/main by this push: new 53843b749 Fix service discovery subscription (#2480) 53843b749 is described below commit 53843b7495f4f2c66be9d6992b980e0b65662a18 Author: Ken Liu <ken.lj...@gmail.com> AuthorDate: Sun Nov 5 12:23:55 2023 +0800 Fix service discovery subscription (#2480) --- cluster/cluster/adaptivesvc/cluster_invoker.go | 2 - cluster/directory/base/directory.go | 14 +-- cluster/directory/directory.go | 6 +- cluster/directory/static/directory.go | 12 ++- common/extension/otel_trace.go | 4 +- common/extension/registry_directory.go | 31 ++++++- common/match.go | 5 +- config/otel_config.go | 3 + config/router_config.go | 2 +- config_center/configurator/override.go | 2 +- config_center/mock_dynamic_config.go | 5 +- go.mod | 5 +- go.sum | 13 ++- imports/imports.go | 4 +- otel/trace/exporter.go | 4 + otel/trace/jaeger/exporter.go | 9 +- otel/trace/otlp/exporter.go | 11 ++- otel/trace/stdout/exporter.go | 8 +- otel/trace/zipkin/exporter.go | 9 +- .../dubbo3/health/triple_health_v1/health.pb.go | 8 +- .../health/triple_health_v1/health_triple.pb.go | 15 ++- registry/base_configuration_listener.go | 2 +- registry/directory/directory.go | 102 +++++++++++++++++---- registry/protocol/protocol.go | 29 +----- registry/protocol/protocol_test.go | 2 + .../servicediscovery/service_discovery_registry.go | 54 ++++------- .../service_mapping_change_listener_impl.go | 2 + 27 files changed, 232 insertions(+), 131 deletions(-) diff --git a/cluster/cluster/adaptivesvc/cluster_invoker.go b/cluster/cluster/adaptivesvc/cluster_invoker.go index 7eaf456e8..4eb47e9f3 100644 --- a/cluster/cluster/adaptivesvc/cluster_invoker.go +++ b/cluster/cluster/adaptivesvc/cluster_invoker.go @@ -38,8 +38,6 @@ import ( "dubbo.apache.org/dubbo-go/v3/protocol" ) -var _ protocol.Invoker = (*adaptiveServiceClusterInvoker)(nil) - type adaptiveServiceClusterInvoker struct { base.BaseClusterInvoker } diff --git a/cluster/directory/base/directory.go b/cluster/directory/base/directory.go index 93dfcb638..38812b94c 100644 --- a/cluster/directory/base/directory.go +++ b/cluster/directory/base/directory.go @@ -42,8 +42,8 @@ type Directory struct { } // NewDirectory Create BaseDirectory with URL -func NewDirectory(url *common.URL) Directory { - return Directory{ +func NewDirectory(url *common.URL) *Directory { + return &Directory{ url: url, destroyed: atomic.NewBool(false), routerChain: &chain.RouterChain{}, @@ -91,8 +91,8 @@ func (dir *Directory) isProperRouter(url *common.URL) bool { return false } -// Destroy Destroy -func (dir *Directory) Destroy(doDestroy func()) { +// DoDestroy stop directory +func (dir *Directory) DoDestroy(doDestroy func()) { if dir.destroyed.CAS(false, true) { dir.mutex.Lock() doDestroy() @@ -100,7 +100,7 @@ func (dir *Directory) Destroy(doDestroy func()) { } } -// IsAvailable Once directory init finish, it will change to true -func (dir *Directory) IsAvailable() bool { - return !dir.destroyed.Load() +// IsDestroyed Once directory init finish, it will change to true +func (dir *Directory) IsDestroyed() bool { + return dir.destroyed.Load() } diff --git a/cluster/directory/directory.go b/cluster/directory/directory.go index 257eb4a70..b28000684 100644 --- a/cluster/directory/directory.go +++ b/cluster/directory/directory.go @@ -22,8 +22,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/protocol" ) -// Directory -// Extension - Directory +// Directory implementations include RegistryDirectory, ServiceDiscoveryRegistryDirectory, StaticDirectory type Directory interface { common.Node @@ -32,4 +31,7 @@ type Directory interface { // implementation for the sake of performance consideration. This requires the caller of List() shouldn't modify // the return result directly. List(invocation protocol.Invocation) []protocol.Invoker + + // Subscribe listen to registry instances + Subscribe(url *common.URL) error } diff --git a/cluster/directory/static/directory.go b/cluster/directory/static/directory.go index 0e9ecc385..d8ed888b2 100644 --- a/cluster/directory/static/directory.go +++ b/cluster/directory/static/directory.go @@ -29,7 +29,7 @@ import ( ) type directory struct { - base.Directory + *base.Directory invokers []protocol.Invoker } @@ -51,6 +51,10 @@ func NewDirectory(invokers []protocol.Invoker) *directory { // for-loop invokers ,if all invokers is available ,then it means directory is available func (dir *directory) IsAvailable() bool { + if dir.Directory.IsDestroyed() { + return false + } + if len(dir.invokers) == 0 { return false } @@ -78,7 +82,7 @@ func (dir *directory) List(invocation protocol.Invocation) []protocol.Invoker { // Destroy Destroy func (dir *directory) Destroy() { - dir.Directory.Destroy(func() { + dir.Directory.DoDestroy(func() { for _, ivk := range dir.invokers { ivk.Destroy() } @@ -99,3 +103,7 @@ func (dir *directory) BuildRouterChain(invokers []protocol.Invoker) error { dir.SetRouterChain(routerChain) return nil } + +func (dir *directory) Subscribe(url *common.URL) error { + panic("Static directory does not support subscribing to registry.") +} diff --git a/common/extension/otel_trace.go b/common/extension/otel_trace.go index f8023d7a0..cee4cea80 100644 --- a/common/extension/otel_trace.go +++ b/common/extension/otel_trace.go @@ -22,11 +22,11 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/otel/trace" + "github.com/dubbogo/gost/log/logger" ) import ( - "github.com/dubbogo/gost/log/logger" + "dubbo.apache.org/dubbo-go/v3/otel/trace" ) var traceExporterMap = make(map[string]func(config *trace.ExporterConfig) (trace.Exporter, error), 4) diff --git a/common/extension/registry_directory.go b/common/extension/registry_directory.go index ae2fef3af..32309dea6 100644 --- a/common/extension/registry_directory.go +++ b/common/extension/registry_directory.go @@ -17,6 +17,10 @@ package extension +import ( + "github.com/dubbogo/gost/log/logger" +) + import ( "dubbo.apache.org/dubbo-go/v3/cluster/directory" "dubbo.apache.org/dubbo-go/v3/common" @@ -25,17 +29,36 @@ import ( type registryDirectory func(url *common.URL, registry registry.Registry) (directory.Directory, error) -var defaultRegistry registryDirectory +var directories = make(map[string]registryDirectory) +var defaultDirectory registryDirectory // SetDefaultRegistryDirectory sets the default registryDirectory func SetDefaultRegistryDirectory(v registryDirectory) { - defaultRegistry = v + defaultDirectory = v +} + +// SetDirectory sets the default registryDirectory +func SetDirectory(key string, v registryDirectory) { + directories[key] = v } // GetDefaultRegistryDirectory finds the registryDirectory with url and registry func GetDefaultRegistryDirectory(config *common.URL, registry registry.Registry) (directory.Directory, error) { - if defaultRegistry == nil { + if defaultDirectory == nil { panic("registry directory is not existing, make sure you have import the package.") } - return defaultRegistry(config, registry) + return defaultDirectory(config, registry) +} + +// GetDirectoryInstance finds the registryDirectory with url and registry +func GetDirectoryInstance(config *common.URL, registry registry.Registry) (directory.Directory, error) { + key := config.Protocol + if key == "" { + return GetDefaultRegistryDirectory(config, registry) + } + if directories[key] == nil { + logger.Warn("registry directory " + key + " does not exist, make sure you have import the package, will use the default directory type.") + return GetDefaultRegistryDirectory(config, registry) + } + return directories[key](config, registry) } diff --git a/common/match.go b/common/match.go index 08099e07f..2d9897d40 100644 --- a/common/match.go +++ b/common/match.go @@ -18,13 +18,16 @@ package common import ( - "dubbo.apache.org/dubbo-go/v3/common/constant" "fmt" "net" "regexp" "strings" ) +import ( + "dubbo.apache.org/dubbo-go/v3/common/constant" +) + type ParamMatch struct { Key string `yaml:"key" json:"key,omitempty" property:"key"` Value StringMatch `yaml:"value" json:"value,omitempty" property:"value"` diff --git a/config/otel_config.go b/config/otel_config.go index 6c819fffd..762a0897f 100644 --- a/config/otel_config.go +++ b/config/otel_config.go @@ -19,8 +19,11 @@ package config import ( "github.com/creasty/defaults" + "github.com/dubbogo/gost/log/logger" + "github.com/pkg/errors" + "go.opentelemetry.io/otel" ) diff --git a/config/router_config.go b/config/router_config.go index a68339a4a..50d65f3a6 100644 --- a/config/router_config.go +++ b/config/router_config.go @@ -18,12 +18,12 @@ package config import ( - "dubbo.apache.org/dubbo-go/v3/common" "github.com/creasty/defaults" ) import ( _ "dubbo.apache.org/dubbo-go/v3/cluster/router/chain" + "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" _ "dubbo.apache.org/dubbo-go/v3/metrics/prometheus" ) diff --git a/config_center/configurator/override.go b/config_center/configurator/override.go index f74d1c86e..fb1f0d616 100644 --- a/config_center/configurator/override.go +++ b/config_center/configurator/override.go @@ -18,7 +18,6 @@ package configurator import ( - "dubbo.apache.org/dubbo-go/v3/config_center/parser" "strings" ) @@ -31,6 +30,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/config_center" + "dubbo.apache.org/dubbo-go/v3/config_center/parser" ) func init() { diff --git a/config_center/mock_dynamic_config.go b/config_center/mock_dynamic_config.go index 9561abf41..76ffeea9c 100644 --- a/config_center/mock_dynamic_config.go +++ b/config_center/mock_dynamic_config.go @@ -36,7 +36,8 @@ import ( // MockDynamicConfigurationFactory defines content type MockDynamicConfigurationFactory struct { - Content string + Content string + ConfiguratorContent string } const ( @@ -82,6 +83,8 @@ func (f *MockDynamicConfigurationFactory) GetDynamicConfiguration(_ *common.URL) }) if len(f.Content) != 0 { dynamicConfiguration.content = f.Content + } else if len(f.ConfiguratorContent) != 0 { + dynamicConfiguration.content = f.ConfiguratorContent } return dynamicConfiguration, err } diff --git a/go.mod b/go.mod index 6c534697d..afa1845bd 100644 --- a/go.mod +++ b/go.mod @@ -39,11 +39,13 @@ require ( github.com/influxdata/tdigest v0.0.1 github.com/jinzhu/copier v0.3.5 github.com/knadh/koanf v1.5.0 - github.com/magiconair/properties v1.8.1 + github.com/kr/pretty v0.3.0 // indirect + github.com/magiconair/properties v1.8.5 github.com/mattn/go-colorable v0.1.13 github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd + github.com/nacos-group/nacos-sdk-go v1.0.9 // indirect github.com/nacos-group/nacos-sdk-go/v2 v2.2.2 github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 github.com/opentracing/opentracing-go v1.2.0 @@ -52,6 +54,7 @@ require ( github.com/polarismesh/polaris-go v1.3.0 github.com/prometheus/client_golang v1.13.0 github.com/prometheus/common v0.37.0 + github.com/rogpeppe/go-internal v1.8.0 // indirect github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.8.3 go.etcd.io/etcd/api/v3 v3.5.7 diff --git a/go.sum b/go.sum index 8a4f0a63b..965544e02 100644 --- a/go.sum +++ b/go.sum @@ -927,8 +927,9 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -945,8 +946,9 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2 github.com/lyft/protoc-gen-star v0.6.1/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= github.com/lyft/protoc-gen-star/v2 v2.0.1/go.mod h1:RcCdONR2ScXaYnQC5tUzxzlpA3WVYF7/opLeUgcQs/o= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= -github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls= +github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= @@ -1002,8 +1004,9 @@ github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/nacos-group/nacos-sdk-go v1.0.8 h1:8pEm05Cdav9sQgJSv5kyvlgfz0SzFUUGI3pWX6SiSnM= github.com/nacos-group/nacos-sdk-go v1.0.8/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA= +github.com/nacos-group/nacos-sdk-go v1.0.9 h1:sMvrp6tZj4LdhuHRsS4GCqASB81k3pjmT2ykDQQpwt0= +github.com/nacos-group/nacos-sdk-go v1.0.9/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA= github.com/nacos-group/nacos-sdk-go/v2 v2.1.2/go.mod h1:ys/1adWeKXXzbNWfRNbaFlX/t6HVLWdpsNDvmoWTw0g= github.com/nacos-group/nacos-sdk-go/v2 v2.2.2 h1:FI+7vr1fvCA4jbgx36KezmP3zlU/WoP/7wAloaSd1Ew= github.com/nacos-group/nacos-sdk-go/v2 v2.2.2/go.mod h1:ys/1adWeKXXzbNWfRNbaFlX/t6HVLWdpsNDvmoWTw0g= @@ -1062,6 +1065,7 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -1133,6 +1137,9 @@ github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzG github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= +github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/rwtodd/Go.Sed v0.0.0-20210816025313-55464686f9ef/go.mod h1:8AEUvGVi2uQ5b24BIhcr0GCcpd/RNAFWaN2CJFrWIIQ= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= diff --git a/imports/imports.go b/imports/imports.go index 90737e9bb..63f5541b1 100644 --- a/imports/imports.go +++ b/imports/imports.go @@ -67,9 +67,8 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/metrics/app_info" _ "dubbo.apache.org/dubbo-go/v3/metrics/prometheus" _ "dubbo.apache.org/dubbo-go/v3/otel/trace/jaeger" - _ "dubbo.apache.org/dubbo-go/v3/otel/trace/zipkin" - //_ "dubbo.apache.org/dubbo-go/v3/otel/trace/otlp" // FIXME: otlp-grpc use grpc health check which has the namespace conflict with dubbo3 protocol _ "dubbo.apache.org/dubbo-go/v3/otel/trace/stdout" + _ "dubbo.apache.org/dubbo-go/v3/otel/trace/zipkin" _ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo" _ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3" _ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/health" @@ -78,6 +77,7 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/protocol/jsonrpc" _ "dubbo.apache.org/dubbo-go/v3/protocol/rest" _ "dubbo.apache.org/dubbo-go/v3/proxy/proxy_factory" + _ "dubbo.apache.org/dubbo-go/v3/registry/directory" _ "dubbo.apache.org/dubbo-go/v3/registry/etcdv3" _ "dubbo.apache.org/dubbo-go/v3/registry/nacos" _ "dubbo.apache.org/dubbo-go/v3/registry/polaris" diff --git a/otel/trace/exporter.go b/otel/trace/exporter.go index 454b02a97..74770f7b0 100644 --- a/otel/trace/exporter.go +++ b/otel/trace/exporter.go @@ -24,10 +24,14 @@ import ( import ( "github.com/dubbogo/gost/log/logger" + "go.opentelemetry.io/contrib/propagators/b3" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.7.0" ) diff --git a/otel/trace/jaeger/exporter.go b/otel/trace/jaeger/exporter.go index 65a269022..c2969111c 100644 --- a/otel/trace/jaeger/exporter.go +++ b/otel/trace/jaeger/exporter.go @@ -22,13 +22,14 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/common/extension" - "dubbo.apache.org/dubbo-go/v3/otel/trace" + "go.opentelemetry.io/otel/exporters/jaeger" + + sdktrace "go.opentelemetry.io/otel/sdk/trace" ) import ( - "go.opentelemetry.io/otel/exporters/jaeger" - sdktrace "go.opentelemetry.io/otel/sdk/trace" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/otel/trace" ) var ( diff --git a/otel/trace/otlp/exporter.go b/otel/trace/otlp/exporter.go index 69e2adeb1..3c0c10542 100644 --- a/otel/trace/otlp/exporter.go +++ b/otel/trace/otlp/exporter.go @@ -22,18 +22,19 @@ import ( "sync" ) -import ( - "dubbo.apache.org/dubbo-go/v3/common/extension" - "dubbo.apache.org/dubbo-go/v3/otel/trace" -) - import ( "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + sdktrace "go.opentelemetry.io/otel/sdk/trace" ) +import ( + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/otel/trace" +) + var ( initHttpOnce sync.Once httpInstance *Exporter diff --git a/otel/trace/stdout/exporter.go b/otel/trace/stdout/exporter.go index 8024c4d89..7bfdd5c45 100644 --- a/otel/trace/stdout/exporter.go +++ b/otel/trace/stdout/exporter.go @@ -22,14 +22,16 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/common/extension" - "dubbo.apache.org/dubbo-go/v3/otel/trace" - "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) +import ( + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/otel/trace" +) + var ( initOnce sync.Once instance *Exporter diff --git a/otel/trace/zipkin/exporter.go b/otel/trace/zipkin/exporter.go index 4068872ce..f70fc67b2 100644 --- a/otel/trace/zipkin/exporter.go +++ b/otel/trace/zipkin/exporter.go @@ -22,13 +22,14 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/common/extension" - "dubbo.apache.org/dubbo-go/v3/otel/trace" + "go.opentelemetry.io/otel/exporters/zipkin" + + sdktrace "go.opentelemetry.io/otel/sdk/trace" ) import ( - "go.opentelemetry.io/otel/exporters/zipkin" - sdktrace "go.opentelemetry.io/otel/sdk/trace" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/otel/trace" ) var ( diff --git a/protocol/dubbo3/health/triple_health_v1/health.pb.go b/protocol/dubbo3/health/triple_health_v1/health.pb.go index 499740042..5eaa416d7 100644 --- a/protocol/dubbo3/health/triple_health_v1/health.pb.go +++ b/protocol/dubbo3/health/triple_health_v1/health.pb.go @@ -27,12 +27,16 @@ package triple_health_v1 import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" ) +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + const ( // Verify that this generated code is sufficiently up-to-date. _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) diff --git a/protocol/dubbo3/health/triple_health_v1/health_triple.pb.go b/protocol/dubbo3/health/triple_health_v1/health_triple.pb.go index 00633ccfa..d177190d8 100644 --- a/protocol/dubbo3/health/triple_health_v1/health_triple.pb.go +++ b/protocol/dubbo3/health/triple_health_v1/health_triple.pb.go @@ -25,20 +25,27 @@ package triple_health_v1 import ( context "context" - constant1 "dubbo.apache.org/dubbo-go/v3/common/constant" - protocol "dubbo.apache.org/dubbo-go/v3/protocol" - dubbo3 "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3" - invocation "dubbo.apache.org/dubbo-go/v3/protocol/invocation" fmt "fmt" +) + +import ( grpc_go "github.com/dubbogo/grpc-go" codes "github.com/dubbogo/grpc-go/codes" metadata "github.com/dubbogo/grpc-go/metadata" status "github.com/dubbogo/grpc-go/status" + common "github.com/dubbogo/triple/pkg/common" constant "github.com/dubbogo/triple/pkg/common/constant" triple "github.com/dubbogo/triple/pkg/triple" ) +import ( + constant1 "dubbo.apache.org/dubbo-go/v3/common/constant" + protocol "dubbo.apache.org/dubbo-go/v3/protocol" + dubbo3 "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3" + invocation "dubbo.apache.org/dubbo-go/v3/protocol/invocation" +) + // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. const _ = grpc_go.SupportPackageIsVersion7 diff --git a/registry/base_configuration_listener.go b/registry/base_configuration_listener.go index bbca23a3c..9c13c6ff4 100644 --- a/registry/base_configuration_listener.go +++ b/registry/base_configuration_listener.go @@ -55,7 +55,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente } bcl.defaultConfiguratorFunc = f bcl.dynamicConfiguration.AddListener(key, listener) - if rawConfig, err := bcl.dynamicConfiguration.GetInternalProperty(key, + if rawConfig, err := bcl.dynamicConfiguration.GetRule(key, config_center.WithGroup(constant.Dubbo)); err != nil { //set configurators to empty bcl.configurators = []config_center.Configurator{} diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 962bb8fb6..27cab6223 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -52,12 +52,14 @@ import ( func init() { extension.SetDefaultRegistryDirectory(NewRegistryDirectory) + extension.SetDirectory(constant.RegistryProtocol, NewRegistryDirectory) + extension.SetDirectory(constant.ServiceRegistryProtocol, NewServiceDiscoveryRegistryDirectory) } // RegistryDirectory implementation of Directory: // Invoker list returned from this Directory's list method have been filtered by Routers type RegistryDirectory struct { - base.Directory + *base.Directory cacheInvokers []protocol.Invoker invokersLock sync.RWMutex serviceType string @@ -107,12 +109,24 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (director } // subscribe from registry -func (dir *RegistryDirectory) Subscribe(url *common.URL) { - logger.Debugf("subscribe service :%s for RegistryDirectory.", url.Key()) - dir.SubscribedUrl = url - if err := dir.registry.Subscribe(url, dir); err != nil { - logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err) - } +func (dir *RegistryDirectory) Subscribe(url *common.URL) error { + logger.Infof("Start subscribing for service :%s with a new go routine.", url.Key()) + + go func() { + dir.SubscribedUrl = url + if err := dir.registry.Subscribe(url, dir); err != nil { + logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err) + } + + urlToReg := getConsumerUrlToRegistry(url) + err := dir.registry.Register(urlToReg) + if err != nil { + logger.Errorf("consumer service %v register registry %v error, error message is %s", + url.String(), dir.registry.GetURL().String(), err.Error()) + } + }() + + return nil } // Notify monitor changes from registry,and update the cacheServices @@ -431,8 +445,8 @@ func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.In // IsAvailable whether the directory is available func (dir *RegistryDirectory) IsAvailable() bool { - if !dir.Directory.IsAvailable() { - return dir.Directory.IsAvailable() + if dir.Directory.IsDestroyed() { + return false } for _, ivk := range dir.cacheInvokers { @@ -447,15 +461,19 @@ func (dir *RegistryDirectory) IsAvailable() bool { // Destroy method func (dir *RegistryDirectory) Destroy() { // TODO:unregister & unsubscribe - dir.Directory.Destroy(func() { - err := dir.registry.UnRegister(dir.RegisteredUrl) - if err != nil { - logger.Warnf("Unregister consumer url failed, %s", dir.RegisteredUrl.String(), err) + dir.Directory.DoDestroy(func() { + if dir.RegisteredUrl != nil { + err := dir.registry.UnRegister(dir.RegisteredUrl) + if err != nil { + logger.Warnf("Unregister consumer url failed, %s", dir.RegisteredUrl.String(), err) + } } - err = dir.registry.UnSubscribe(dir.SubscribedUrl, dir) - if err != nil { - logger.Warnf("Unsubscribe consumer url failed, %s", dir.RegisteredUrl.String(), err) + if dir.SubscribedUrl != nil { + err := dir.registry.UnSubscribe(dir.SubscribedUrl, dir) + if err != nil { + logger.Warnf("Unsubscribe consumer url failed, %s", dir.RegisteredUrl.String(), err) + } } invokers := dir.cacheInvokers @@ -547,3 +565,55 @@ func (l *consumerConfigurationListener) Process(event *config_center.ConfigChang // FIXME: this doesn't trigger dir.overrideUrl() l.directory.refreshInvokers(nil) } + +// ServiceDiscoveryRegistryDirectory implementation of Directory: +// Invoker list returned from this Directory's list method have been filtered by Routers +type ServiceDiscoveryRegistryDirectory struct { + *base.Directory + *RegistryDirectory +} + +// NewServiceDiscoveryRegistryDirectory will create a new ServiceDiscoveryRegistryDirectory +func NewServiceDiscoveryRegistryDirectory(url *common.URL, registry registry.Registry) (directory.Directory, error) { + dic, err := NewRegistryDirectory(url, registry) + registryDirectory, _ := dic.(*RegistryDirectory) + return &ServiceDiscoveryRegistryDirectory{ + Directory: registryDirectory.Directory, + RegistryDirectory: registryDirectory, + }, err +} + +// Subscribe do subscribe from registry +func (dir *ServiceDiscoveryRegistryDirectory) Subscribe(url *common.URL) error { + if err := dir.registry.Subscribe(url, dir); err != nil { + logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err) + return err + } + + urlToReg := getConsumerUrlToRegistry(url) + err := dir.RegistryDirectory.registry.Register(urlToReg) + if err != nil { + logger.Errorf("consumer service %v register registry %v error, error message is %s", + url.String(), dir.registry.GetURL().String(), err.Error()) + return err + } + return nil +} + +// List selected protocol invokers from the directory +func (dir *ServiceDiscoveryRegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker { + return dir.RegistryDirectory.List(invocation) +} + +func getConsumerUrlToRegistry(url *common.URL) *common.URL { + // if developer define registry port and ip, use it first. + if ipToRegistry := os.Getenv(constant.DubboIpToRegistryKey); len(ipToRegistry) > 0 { + url.Ip = ipToRegistry + } else { + url.Ip = common.GetLocalIp() + } + if portToRegistry := os.Getenv(constant.DubboPortToRegistryKey); len(portToRegistry) > 0 { + url.Port = portToRegistry + } + return url +} diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index b34fed8cc..eeb287048 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -19,7 +19,6 @@ package protocol import ( "context" - "os" "strings" "sync" "time" @@ -43,7 +42,6 @@ import ( "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/health" "dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper" "dubbo.apache.org/dubbo-go/v3/registry" - "dubbo.apache.org/dubbo-go/v3/registry/directory" "dubbo.apache.org/dubbo-go/v3/remoting" ) @@ -150,22 +148,16 @@ func (proto *registryProtocol) Refer(url *common.URL) protocol.Invoker { reg := proto.getRegistry(url) // new registry directory for store service url from registry - dic, err := extension.GetDefaultRegistryDirectory(registryUrl, reg) + dic, err := extension.GetDirectoryInstance(registryUrl, reg) if err != nil { logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!", serviceUrl.String(), err.Error()) return nil } - // TODO, refactor to avoid type conversion - regDic, ok := dic.(*directory.RegistryDirectory) - if !ok { - logger.Errorf("Directory %v is expected to implement Directory, and will return nil invoker!", dic) - return nil - } - go regDic.Subscribe(registryUrl.SubURL) - regDic.RegisteredUrl = getConsumerUrlToRegistry(serviceUrl) - err = reg.Register(regDic.RegisteredUrl) + // This will start a new routine and listen to instance changes. + err = dic.Subscribe(registryUrl.SubURL) + if err != nil { logger.Errorf("consumer service %v register registry %v error, error message is %s", serviceUrl.String(), registryUrl.String(), err.Error()) @@ -559,16 +551,3 @@ func (listener *serviceConfigurationListener) Process(event *config_center.Confi listener.BaseConfigurationListener.Process(event) listener.overrideListener.doOverrideIfNecessary() } - -func getConsumerUrlToRegistry(url *common.URL) *common.URL { - // if developer define registry port and ip, use it first. - if ipToRegistry := os.Getenv(constant.DubboIpToRegistryKey); len(ipToRegistry) > 0 { - url.Ip = ipToRegistry - } else { - url.Ip = common.GetLocalIp() - } - if portToRegistry := os.Getenv(constant.DubboPortToRegistryKey); len(portToRegistry) > 0 { - url.Port = portToRegistry - } - return url -} diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go index 2c9ff6133..4bc555bf9 100644 --- a/registry/protocol/protocol_test.go +++ b/registry/protocol/protocol_test.go @@ -40,6 +40,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper" "dubbo.apache.org/dubbo-go/v3/registry" + "dubbo.apache.org/dubbo-go/v3/registry/directory" "dubbo.apache.org/dubbo-go/v3/remoting" ) @@ -55,6 +56,7 @@ func referNormal(t *testing.T, regProtocol *registryProtocol) { extension.SetRegistry("mock", registry.NewMockRegistry) extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) extension.SetCluster("mock", cluster.NewMockCluster) + extension.SetDirectory("mock", directory.NewRegistryDirectory) url, _ := common.NewURL("mock://127.0.0.1:1111") suburl, _ := common.NewURL( diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 61a56539f..6618544cd 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -46,7 +46,6 @@ import ( "dubbo.apache.org/dubbo-go/v3/registry" _ "dubbo.apache.org/dubbo-go/v3/registry/event" "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery/synthesizer" - "dubbo.apache.org/dubbo-go/v3/remoting" ) func init() { @@ -206,6 +205,7 @@ func shouldRegister(url *common.URL) bool { func (s *ServiceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error { if !shouldSubscribe(url) { + logger.Infof("Service %s is set to not subscribe to instances.", url.ServiceKey()) return nil } _, err := s.metaDataService.SubscribeURL(url) @@ -219,6 +219,7 @@ func (s *ServiceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.No return perrors.Errorf("Should has at least one way to know which services this interface belongs to,"+ " either specify 'provided-by' for reference or enable metadata-report center subscription url:%s", url.String()) } + logger.Infof("Find initial mapping applications %q for service %s.", services, url.ServiceKey()) // first notify mappingListener.OnEvent(registry.NewServiceMappingChangedEvent(url.ServiceKey(), services)) return nil @@ -239,7 +240,7 @@ func (s *ServiceDiscoveryRegistry) SubscribeURL(url *common.URL, notify registry for _, serviceNameTmp := range services.Values() { serviceName := serviceNameTmp.(string) instances := s.serviceDiscovery.GetInstances(serviceName) - logger.Infof("Synchronized instance notification on subscription, instance list size for application %s is %d", serviceName, len(instances)) + logger.Infof("Synchronized instance notification on application %s subscription, instance list size %s", serviceName, len(instances)) err = listener.OnEvent(®istry.ServiceInstancesChangedEvent{ ServiceName: serviceName, Instances: instances, @@ -252,46 +253,23 @@ func (s *ServiceDiscoveryRegistry) SubscribeURL(url *common.URL, notify registry s.serviceListeners[serviceNamesKey] = listener listener.AddListenerAndNotify(protocolServiceKey, notify) event := metricMetadata.NewMetadataMetricTimeEvent(metricMetadata.SubscribeServiceRt) - err = s.serviceDiscovery.AddListener(listener) - event.Succ = err != nil - event.End = time.Now() - event.Attachment[constant.InterfaceKey] = url.Interface() - metrics.Publish(event) - metrics.Publish(metricsRegistry.NewServerSubscribeEvent(err == nil)) - if err != nil { - logger.Errorf("add instance listener catch error,url:%s err:%s", url.String(), err.Error()) - } + + logger.Infof("Start subscribing to registry for applications :%s with a new go routine.", serviceNamesKey) + go func() { + err = s.serviceDiscovery.AddListener(listener) + event.Succ = err != nil + event.End = time.Now() + event.Attachment[constant.InterfaceKey] = url.Interface() + metrics.Publish(event) + metrics.Publish(metricsRegistry.NewServerSubscribeEvent(err == nil)) + if err != nil { + logger.Errorf("add instance listener catch error,url:%s err:%s", url.String(), err.Error()) + } + }() } // LoadSubscribeInstances load subscribe instance func (s *ServiceDiscoveryRegistry) LoadSubscribeInstances(url *common.URL, notify registry.NotifyListener) error { - appName := url.GetParam(constant.ApplicationKey, url.Username) - instances := s.serviceDiscovery.GetInstances(appName) - for _, instance := range instances { - if instance.GetMetadata() == nil { - logger.Warnf("Instance metadata is nil: %s", instance.GetHost()) - continue - } - revision, ok := instance.GetMetadata()[constant.ExportedServicesRevisionPropertyName] - if !ok { - logger.Warnf("Instance metadata revision is nil: %s", instance.GetHost()) - continue - } - if "0" == revision { - logger.Infof("Find instance without valid service metadata: %s", instance.GetHost()) - continue - } - metadataInfo, err := GetMetadataInfo(url.GetParam(constant.ApplicationKey, ""), instance, revision) - if err != nil { - return err - } - instance.SetServiceMetadata(metadataInfo) - for _, serviceInfo := range metadataInfo.Services { - for _, url := range instance.ToURLs(serviceInfo) { - notify.Notify(®istry.ServiceEvent{Action: remoting.EventTypeAdd, Service: url}) - } - } - } return nil } diff --git a/registry/servicediscovery/service_mapping_change_listener_impl.go b/registry/servicediscovery/service_mapping_change_listener_impl.go index 7f7c3cddb..2f64f6973 100644 --- a/registry/servicediscovery/service_mapping_change_listener_impl.go +++ b/registry/servicediscovery/service_mapping_change_listener_impl.go @@ -24,6 +24,7 @@ import ( import ( gxset "github.com/dubbogo/gost/container/set" "github.com/dubbogo/gost/gof/observer" + "github.com/dubbogo/gost/log/logger" ) import ( @@ -94,6 +95,7 @@ func (lstn *ServiceMappingChangedListenerImpl) OnEvent(e observer.Event) error { } for _, service := range newServiceNames.Values() { if !oldServiceNames.Contains(service) { + logger.Infof("Service-application mapping changed for service: %s, new applications: %q, old applications: %q.", lstn.serviceUrl.ServiceKey(), oldServiceNames, newServiceNames) lstn.mappingCache.Delete(oldServiceNames.String()) lstn.mappingCache.Store(newServiceNames.String(), newServiceNames) if reg, err = extension.GetRegistry(lstn.registryUrl.Protocol, lstn.registryUrl); err != nil {