This is an automated email from the ASF dual-hosted git repository. tianxiaoliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push: new 35b4680 Refactor: add self registry service (#1190) 35b4680 is described below commit 35b46800a60df832170c10964fca48d897fc052c Author: little-cui <sure_0...@qq.com> AuthorDate: Wed Dec 29 09:49:04 2021 +0800 Refactor: add self registry service (#1190) --- datasource/engine.go | 2 - datasource/engine_test.go | 98 --------------- datasource/etcd/engine.go | 139 --------------------- datasource/mongo/engine.go | 138 -------------------- server/api_server.go | 9 +- server/datacache/cache.go | 8 -- .../service/registry/registry.go | 70 +++-------- 7 files changed, 18 insertions(+), 446 deletions(-) diff --git a/datasource/engine.go b/datasource/engine.go index d2c3265..1c2d6d5 100644 --- a/datasource/engine.go +++ b/datasource/engine.go @@ -25,8 +25,6 @@ import ( // SCManager contains the APIs of registration of SC itself type SCManager interface { - SelfRegister(ctx context.Context) error - SelfUnregister(ctx context.Context) error UpgradeVersion(ctx context.Context) error GetClusters(ctx context.Context) (etcdadpt.Clusters, error) } diff --git a/datasource/engine_test.go b/datasource/engine_test.go deleted file mode 100644 index cc3a048..0000000 --- a/datasource/engine_test.go +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package datasource_test - -import ( - "context" - "fmt" - - "github.com/apache/servicecomb-service-center/datasource/etcd/path" - - "github.com/apache/servicecomb-service-center/pkg/util" - apt "github.com/apache/servicecomb-service-center/server/core" - pb "github.com/go-chassis/cari/discovery" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -// map[domainProject][serviceName]*serviceCleanInfo -var svcCleanInfos = make(map[string]map[string]*serviceCleanInfo) - -type serviceCleanInfo struct { - ServiceName string - ServiceId string - WithInstance bool - ShouldClear bool -} - -func getContextWith(domain string, project string) context.Context { - return util.WithNoCache(util.SetDomainProject(context.Background(), domain, project)) -} - -func createService(domain string, project string, name string, withInstance bool, shouldClear bool) { - By(fmt.Sprintf("create service: %s, with instance: %t, should clear: %t", name, withInstance, shouldClear)) - svc := &pb.CreateServiceRequest{ - Service: &pb.MicroService{ - AppId: "clear", - ServiceName: name, - Version: "1.0", - }, - } - if withInstance { - svc.Instances = []*pb.MicroServiceInstance{ - { - Endpoints: []string{"http://127.0.0.1:80"}, - HostName: "1", - }, - } - } - ctx := getContextWith(domain, project) - svcResp, err := apt.ServiceAPI.Create(ctx, svc) - Expect(err).To(BeNil()) - Expect(svcResp).NotTo(BeNil()) - Expect(svcResp.Response.GetCode()).To(Equal(pb.ResponseSuccess)) - info := &serviceCleanInfo{ - ServiceName: name, - ServiceId: svcResp.ServiceId, - WithInstance: withInstance, - ShouldClear: shouldClear, - } - domainProject := domain + path.SPLIT + project - m, ok := svcCleanInfos[domainProject] - if !ok { - m = make(map[string]*serviceCleanInfo) - svcCleanInfos[domainProject] = m - } - m[name] = info -} - -func checkServiceCleared(domain string, project string) { - domainProject := domain + path.SPLIT + project - m := svcCleanInfos[domainProject] - for _, v := range m { - By(fmt.Sprintf("check cleared, service: %s, should be cleared: %t", v.ServiceName, v.ShouldClear)) - getSvcReq := &pb.GetServiceRequest{ - ServiceId: v.ServiceId, - } - ctx := getContextWith(domain, project) - getSvcResp, err := apt.ServiceAPI.GetOne(ctx, getSvcReq) - Expect(err).To(BeNil()) - Expect(getSvcResp).NotTo(BeNil()) - Expect(getSvcResp.Response.GetCode() == pb.ResponseSuccess).To(Equal(!v.ShouldClear)) - } -} diff --git a/datasource/etcd/engine.go b/datasource/etcd/engine.go index 3666aa3..c9345bb 100644 --- a/datasource/etcd/engine.go +++ b/datasource/etcd/engine.go @@ -20,158 +20,19 @@ package etcd import ( "context" "encoding/json" - "errors" - "fmt" "os" - "time" - "github.com/apache/servicecomb-service-center/datasource" "github.com/apache/servicecomb-service-center/datasource/etcd/mux" "github.com/apache/servicecomb-service-center/datasource/etcd/path" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/server/config" - "github.com/apache/servicecomb-service-center/server/core" - discosvc "github.com/apache/servicecomb-service-center/server/service/disco" "github.com/apache/servicecomb-service-center/version" - pb "github.com/go-chassis/cari/discovery" - "github.com/go-chassis/foundation/gopool" "github.com/little-cui/etcdadpt" ) type SCManager struct { } -func (sm *SCManager) SelfRegister(ctx context.Context) error { - err := sm.selfRegister(ctx) - if err != nil { - return err - } - // start send heart beat job - sm.autoSelfHeartBeat() - return nil -} - -func (sm *SCManager) selfRegister(pCtx context.Context) error { - ctx := core.AddDefaultContextValue(pCtx) - err := sm.registerService(ctx) - if err != nil { - return err - } - // 实例信息 - return sm.registerInstance(ctx) -} - -func (sm *SCManager) registerService(ctx context.Context) error { - respE, err := core.ServiceAPI.Exist(ctx, core.GetExistenceRequest()) - if err != nil { - log.Error("query service center existence failed", err) - return err - } - if respE.Response.GetCode() == pb.ResponseSuccess { - log.Warn(fmt.Sprintf("service center service[%s] already registered", respE.ServiceId)) - respG, err := core.ServiceAPI.GetOne(ctx, core.GetServiceRequest(respE.ServiceId)) - if respG.Response.GetCode() != pb.ResponseSuccess { - log.Error(fmt.Sprintf("query service center service[%s] info failed", respE.ServiceId), err) - return datasource.ErrServiceNotExists - } - core.Service = respG.Service - return nil - } - - respS, err := core.ServiceAPI.Create(ctx, core.CreateServiceRequest()) - if err != nil { - log.Error("register service center failed", err) - return err - } - if respS.Response.GetCode() != pb.ResponseSuccess { - log.Error("register service center failed, msg: "+respS.Response.GetMessage(), nil) - return errors.New(respS.Response.GetMessage()) - } - core.Service.ServiceId = respS.ServiceId - log.Info(fmt.Sprintf("register service center service[%s]", respS.ServiceId)) - return nil -} - -func (sm *SCManager) registerInstance(ctx context.Context) error { - core.Instance.InstanceId = "" - core.Instance.ServiceId = core.Service.ServiceId - respI, err := discosvc.RegisterInstance(ctx, core.RegisterInstanceRequest()) - if err != nil { - log.Error("register failed", err) - return err - } - if respI.Response.GetCode() != pb.ResponseSuccess { - log.Error(fmt.Sprintf("register service center[%s] instance failed, %s", - core.Instance.ServiceId, respI.Response.GetMessage()), nil) - return errors.New(respI.Response.GetMessage()) - } - core.Instance.InstanceId = respI.InstanceId - log.Info(fmt.Sprintf("register service center instance[%s/%s], endpoints is %s", - core.Service.ServiceId, respI.InstanceId, core.Instance.Endpoints)) - return nil -} - -func (sm *SCManager) selfHeartBeat(pCtx context.Context) error { - ctx := core.AddDefaultContextValue(pCtx) - respI, err := discosvc.Heartbeat(ctx, core.HeartbeatRequest()) - if err != nil { - log.Error("send heartbeat failed", err) - return err - } - if respI.Response.GetCode() == pb.ResponseSuccess { - log.Debug(fmt.Sprintf("update service center instance[%s/%s] heartbeat", - core.Instance.ServiceId, core.Instance.InstanceId)) - return nil - } - err = fmt.Errorf(respI.Response.GetMessage()) - log.Error(fmt.Sprintf("update service center instance[%s/%s] heartbeat failed", - core.Instance.ServiceId, core.Instance.InstanceId), err) - return err -} - -func (sm *SCManager) autoSelfHeartBeat() { - gopool.Go(func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second): - err := sm.selfHeartBeat(ctx) - if err == nil { - continue - } - //服务不存在,创建服务 - err = sm.selfRegister(ctx) - if err != nil { - log.Error(fmt.Sprintf("retry to register[%s/%s/%s/%s] failed", - core.Service.Environment, core.Service.AppId, core.Service.ServiceName, core.Service.Version), err) - } - } - } - }) -} - -func (sm *SCManager) SelfUnregister(pCtx context.Context) error { - if len(core.Instance.InstanceId) == 0 { - return nil - } - ctx := core.AddDefaultContextValue(pCtx) - respI, err := discosvc.UnregisterInstance(ctx, core.UnregisterInstanceRequest()) - if err != nil { - log.Error("unregister failed", err) - return err - } - if respI.Response.GetCode() != pb.ResponseSuccess { - err = fmt.Errorf("unregister service center instance[%s/%s] failed, %s", - core.Instance.ServiceId, core.Instance.InstanceId, respI.Response.GetMessage()) - log.Error(err.Error(), nil) - return err - } - log.Warn(fmt.Sprintf("unregister service center instance[%s/%s]", - core.Service.ServiceId, core.Instance.InstanceId)) - return nil -} - func (sm *SCManager) GetClusters(ctx context.Context) (etcdadpt.Clusters, error) { return etcdadpt.ListCluster(ctx) } diff --git a/datasource/mongo/engine.go b/datasource/mongo/engine.go index a5eb730..7be13da 100644 --- a/datasource/mongo/engine.go +++ b/datasource/mongo/engine.go @@ -19,60 +19,13 @@ package mongo import ( "context" - "fmt" - "time" - "github.com/apache/servicecomb-service-center/datasource" - mutil "github.com/apache/servicecomb-service-center/datasource/mongo/util" - "github.com/apache/servicecomb-service-center/pkg/log" - "github.com/apache/servicecomb-service-center/server/core" - "github.com/apache/servicecomb-service-center/server/metrics" - discosvc "github.com/apache/servicecomb-service-center/server/service/disco" - pb "github.com/go-chassis/cari/discovery" - "github.com/go-chassis/foundation/gopool" "github.com/little-cui/etcdadpt" ) type SCManager struct { } -func (ds *SCManager) SelfRegister(ctx context.Context) error { - err := ds.registryService(ctx) - if err != nil { - return err - } - - // 实例信息 - err = ds.registryInstance(ctx) - - // wait heartbeat - ds.autoSelfHeartBeat() - - metrics.ReportScInstance() - return err -} -func (ds *SCManager) SelfUnregister(ctx context.Context) error { - if len(core.Instance.InstanceId) == 0 { - return nil - } - - ctx = core.AddDefaultContextValue(ctx) - respI, err := datasource.GetMetadataManager().UnregisterInstance(ctx, core.UnregisterInstanceRequest()) - if err != nil { - log.Error("unregister failed", err) - return err - } - if respI.Response.GetCode() != pb.ResponseSuccess { - err = fmt.Errorf("unregister service center instance[%s/%s] failed, %s", - core.Instance.ServiceId, core.Instance.InstanceId, respI.Response.GetMessage()) - log.Error(err.Error(), nil) - return err - } - log.Warn(fmt.Sprintf("unregister service center instance[%s/%s]", - core.Service.ServiceId, core.Instance.InstanceId)) - return nil -} - func (ds *SCManager) UpgradeVersion(ctx context.Context) error { return nil } @@ -80,94 +33,3 @@ func (ds *SCManager) UpgradeVersion(ctx context.Context) error { func (ds *SCManager) GetClusters(ctx context.Context) (etcdadpt.Clusters, error) { return nil, nil } - -func (ds *SCManager) registryService(pCtx context.Context) error { - ctx := core.AddDefaultContextValue(pCtx) - respE, err := datasource.GetMetadataManager().ExistService(ctx, core.GetExistenceRequest()) - if err != nil { - log.Error("query service center existence failed", err) - return err - } - if respE.Response.GetCode() == pb.ResponseSuccess { - log.Warn(fmt.Sprintf("service center service[%s] already registered", respE.ServiceId)) - service, err := datasource.GetMetadataManager().GetService(ctx, core.GetServiceRequest(respE.ServiceId)) - if err != nil { - log.Error(fmt.Sprintf("query service center service[%s] info failed", respE.ServiceId), err) - return mutil.ErrLostServiceFile - } - core.Service = service - return nil - } - - respS, err := datasource.GetMetadataManager().RegisterService(ctx, core.CreateServiceRequest()) - if err != nil { - log.Error("register service center failed", err) - return err - } - core.Service.ServiceId = respS.ServiceId - log.Info(fmt.Sprintf("register service center service[%s]", respS.ServiceId)) - return nil -} - -func (ds *SCManager) registryInstance(pCtx context.Context) error { - core.Instance.InstanceId = "" - core.Instance.ServiceId = core.Service.ServiceId - - ctx := core.AddDefaultContextValue(pCtx) - - respI, err := datasource.GetMetadataManager().RegisterInstance(ctx, core.RegisterInstanceRequest()) - if err != nil { - log.Error("register failed", err) - return err - } - if respI.Response.GetCode() != pb.ResponseSuccess { - err = fmt.Errorf("register service center[%s] instance failed, %s", - core.Instance.ServiceId, respI.Response.GetMessage()) - log.Error(err.Error(), nil) - return err - } - core.Instance.InstanceId = respI.InstanceId - log.Info(fmt.Sprintf("register service center instance[%s/%s], endpoints is %s", - core.Service.ServiceId, respI.InstanceId, core.Instance.Endpoints)) - return nil -} - -func (ds *SCManager) selfHeartBeat(pCtx context.Context) error { - ctx := core.AddDefaultContextValue(pCtx) - respI, err := discosvc.Heartbeat(ctx, core.HeartbeatRequest()) - if err != nil { - log.Error("send heartbeat failed", err) - return err - } - if respI.Response.GetCode() == pb.ResponseSuccess { - log.Debug(fmt.Sprintf("update service center instance[%s/%s] heartbeat", - core.Instance.ServiceId, core.Instance.InstanceId)) - return nil - } - err = fmt.Errorf(respI.Response.GetMessage()) - log.Error(fmt.Sprintf("update service center instance[%s/%s] heartbeat failed", - core.Instance.ServiceId, core.Instance.InstanceId), err) - return err -} - -func (ds *SCManager) autoSelfHeartBeat() { - gopool.Go(func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second): - err := ds.selfHeartBeat(ctx) - if err == nil { - continue - } - //服务不存在,创建服务 - err = ds.SelfRegister(ctx) - if err != nil { - log.Error(fmt.Sprintf("retry to register[%s/%s/%s/%s] failed", - core.Service.Environment, core.Service.AppId, core.Service.ServiceName, core.Service.Version), err) - } - } - } - }) -} diff --git a/server/api_server.go b/server/api_server.go index 235532e..61c3ed6 100644 --- a/server/api_server.go +++ b/server/api_server.go @@ -23,9 +23,6 @@ import ( "net" "time" - "github.com/apache/servicecomb-service-center/server/service/disco" - - "github.com/apache/servicecomb-service-center/datasource" "github.com/apache/servicecomb-service-center/pkg/grace" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/rest" @@ -33,6 +30,8 @@ import ( "github.com/apache/servicecomb-service-center/server/core" "github.com/apache/servicecomb-service-center/server/metrics" rs "github.com/apache/servicecomb-service-center/server/rest" + "github.com/apache/servicecomb-service-center/server/service/disco" + "github.com/apache/servicecomb-service-center/server/service/registry" "github.com/go-chassis/foundation/gopool" ) @@ -165,7 +164,7 @@ func (s *APIServer) Stop() { } func (s *APIServer) selfRegister() { - err := datasource.GetSCManager().SelfRegister(context.Background()) + err := registry.SelfRegister(context.Background()) if err != nil { s.err <- err return @@ -177,7 +176,7 @@ func (s *APIServer) selfRegister() { func (s *APIServer) selfUnregister() { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - if err := datasource.GetSCManager().SelfUnregister(ctx); err != nil { + if err := registry.SelfUnregister(ctx); err != nil { log.Error("stop registry engine failed", err) } } diff --git a/server/datacache/cache.go b/server/datacache/cache.go deleted file mode 100644 index 72e9f92..0000000 --- a/server/datacache/cache.go +++ /dev/null @@ -1,8 +0,0 @@ -package datacache - -import "context" - -type DataCache interface { - Get(ctx context.Context, name string) (interface{}, error) - List(ctx context.Context) ([]interface{}, error) -} diff --git a/datasource/etcd/engine.go b/server/service/registry/registry.go similarity index 73% copy from datasource/etcd/engine.go copy to server/service/registry/registry.go index 3666aa3..e2e04c8 100644 --- a/datasource/etcd/engine.go +++ b/server/service/registry/registry.go @@ -15,53 +15,43 @@ * limitations under the License. */ -package etcd +package registry import ( "context" - "encoding/json" "errors" "fmt" - "os" "time" "github.com/apache/servicecomb-service-center/datasource" - "github.com/apache/servicecomb-service-center/datasource/etcd/mux" - "github.com/apache/servicecomb-service-center/datasource/etcd/path" "github.com/apache/servicecomb-service-center/pkg/log" - "github.com/apache/servicecomb-service-center/server/config" "github.com/apache/servicecomb-service-center/server/core" discosvc "github.com/apache/servicecomb-service-center/server/service/disco" - "github.com/apache/servicecomb-service-center/version" pb "github.com/go-chassis/cari/discovery" "github.com/go-chassis/foundation/gopool" - "github.com/little-cui/etcdadpt" ) -type SCManager struct { -} - -func (sm *SCManager) SelfRegister(ctx context.Context) error { - err := sm.selfRegister(ctx) +func SelfRegister(ctx context.Context) error { + err := selfRegister(ctx) if err != nil { return err } // start send heart beat job - sm.autoSelfHeartBeat() + autoSelfHeartBeat() return nil } -func (sm *SCManager) selfRegister(pCtx context.Context) error { +func selfRegister(pCtx context.Context) error { ctx := core.AddDefaultContextValue(pCtx) - err := sm.registerService(ctx) + err := registerService(ctx) if err != nil { return err } // 实例信息 - return sm.registerInstance(ctx) + return registerInstance(ctx) } -func (sm *SCManager) registerService(ctx context.Context) error { +func registerService(ctx context.Context) error { respE, err := core.ServiceAPI.Exist(ctx, core.GetExistenceRequest()) if err != nil { log.Error("query service center existence failed", err) @@ -92,7 +82,7 @@ func (sm *SCManager) registerService(ctx context.Context) error { return nil } -func (sm *SCManager) registerInstance(ctx context.Context) error { +func registerInstance(ctx context.Context) error { core.Instance.InstanceId = "" core.Instance.ServiceId = core.Service.ServiceId respI, err := discosvc.RegisterInstance(ctx, core.RegisterInstanceRequest()) @@ -111,7 +101,7 @@ func (sm *SCManager) registerInstance(ctx context.Context) error { return nil } -func (sm *SCManager) selfHeartBeat(pCtx context.Context) error { +func selfHeartBeat(pCtx context.Context) error { ctx := core.AddDefaultContextValue(pCtx) respI, err := discosvc.Heartbeat(ctx, core.HeartbeatRequest()) if err != nil { @@ -129,19 +119,19 @@ func (sm *SCManager) selfHeartBeat(pCtx context.Context) error { return err } -func (sm *SCManager) autoSelfHeartBeat() { +func autoSelfHeartBeat() { gopool.Go(func(ctx context.Context) { for { select { case <-ctx.Done(): return case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second): - err := sm.selfHeartBeat(ctx) + err := selfHeartBeat(ctx) if err == nil { continue } //服务不存在,创建服务 - err = sm.selfRegister(ctx) + err = selfRegister(ctx) if err != nil { log.Error(fmt.Sprintf("retry to register[%s/%s/%s/%s] failed", core.Service.Environment, core.Service.AppId, core.Service.ServiceName, core.Service.Version), err) @@ -151,7 +141,7 @@ func (sm *SCManager) autoSelfHeartBeat() { }) } -func (sm *SCManager) SelfUnregister(pCtx context.Context) error { +func SelfUnregister(pCtx context.Context) error { if len(core.Instance.InstanceId) == 0 { return nil } @@ -171,35 +161,3 @@ func (sm *SCManager) SelfUnregister(pCtx context.Context) error { core.Service.ServiceId, core.Instance.InstanceId)) return nil } - -func (sm *SCManager) GetClusters(ctx context.Context) (etcdadpt.Clusters, error) { - return etcdadpt.ListCluster(ctx) -} -func (sm *SCManager) UpgradeServerVersion(ctx context.Context) error { - bytes, err := json.Marshal(config.Server) - if err != nil { - return err - } - return etcdadpt.PutBytes(ctx, path.GetServerInfoKey(), bytes) -} -func (sm *SCManager) UpgradeVersion(ctx context.Context) error { - lock, err := mux.Lock(mux.GlobalLock) - - if err != nil { - log.Error("wait for server ready failed", err) - return err - } - if needUpgrade(ctx) { - config.Server.Version = version.Ver().Version - - if err := sm.UpgradeServerVersion(ctx); err != nil { - log.Error("upgrade server version failed", err) - os.Exit(1) - } - } - err = lock.Unlock() - if err != nil { - log.Error("", err) - } - return err -}