This is an automated email from the ASF dual-hosted git repository. tianxiaoliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push: new a1387cc Feature: add schema retire cron job (#1187) a1387cc is described below commit a1387cc7a53d010f69437e0cc864761f6681406f Author: little-cui <sure_0...@qq.com> AuthorDate: Tue Dec 28 19:40:25 2021 +0800 Feature: add schema retire cron job (#1187) --- datasource/etcd/path/key_convertor.go | 89 +++++++++------------- datasource/etcd/path/key_convertor_test.go | 8 +- datasource/etcd/path/key_generator.go | 5 -- datasource/etcd/retire.go | 88 +-------------------- datasource/etcd/schema.go | 54 ++++++++++++- datasource/mongo/retire.go | 2 +- datasource/mongo/schema.go | 8 +- datasource/retire.go | 11 +-- datasource/schema/schema.go | 4 +- etc/conf/app.yaml | 4 +- go.mod | 1 + go.sum | 2 + integration/instances_test.go | 24 +++--- server/job/disco/retire.go | 44 +++++------ .../disco/retire.go => job/disco/schema.go} | 28 ++++++- server/service/disco/retire.go | 15 ++++ server/service/disco/retire_test.go | 55 ++++++++++++- server/service/disco/schema_test.go | 28 +++++-- 18 files changed, 255 insertions(+), 215 deletions(-) diff --git a/datasource/etcd/path/key_convertor.go b/datasource/etcd/path/key_convertor.go index 7c93604..7e0ee8c 100644 --- a/datasource/etcd/path/key_convertor.go +++ b/datasource/etcd/path/key_convertor.go @@ -23,39 +23,45 @@ import ( "github.com/apache/servicecomb-service-center/pkg/util" "github.com/go-chassis/cari/discovery" + "github.com/go-chassis/foundation/stringutil" ) -func ToResponse(key []byte) (keys []string) { +func splitKey(key []byte) (keys []string) { return strings.Split(util.BytesToStringWithNoCopy(key), SPLIT) } -func GetInfoFromSvcKV(key []byte) (serviceID, domainProject string) { - keys := ToResponse(key) +func getLast2Keys(key []byte) (string, string) { + keys := splitKey(key) l := len(keys) - if l < 4 { - return + if l < 3 { + return "", "" } - serviceID = keys[l-1] - domainProject = fmt.Sprintf("%s/%s", keys[l-3], keys[l-2]) - return + return fmt.Sprintf("%s/%s", keys[l-3], keys[l-2]), keys[l-1] } -func GetInfoFromInstKV(key []byte) (serviceID, instanceID, domainProject string) { - keys := ToResponse(key) +func getLast3Keys(key []byte) (string, string, string) { + keys := splitKey(key) l := len(keys) if l < 4 { - return + return "", "", "" } - serviceID = keys[l-2] - instanceID = keys[l-1] - domainProject = fmt.Sprintf("%s/%s", keys[l-4], keys[l-3]) + return fmt.Sprintf("%s/%s", keys[l-4], keys[l-3]), keys[l-2], keys[l-1] +} + +func GetInfoFromSvcKV(key []byte) (serviceID, domainProject string) { + domainProject, serviceID = getLast2Keys(key) + return +} + +func GetInfoFromInstKV(key []byte) (serviceID, instanceID, domainProject string) { + domainProject, serviceID, instanceID = getLast3Keys(key) return } func GetInfoFromDomainKV(key []byte) (domain string) { - keys := ToResponse(key) + keys := splitKey(key) l := len(keys) - if l < 2 { + if l < 1 { return } domain = keys[l-1] @@ -63,27 +69,21 @@ func GetInfoFromDomainKV(key []byte) (domain string) { } func GetInfoFromProjectKV(key []byte) (domain, project string) { - keys := ToResponse(key) + keys := splitKey(key) l := len(keys) if l < 2 { - return + return "", "" } return keys[l-2], keys[l-1] } func GetInfoFromTagKV(key []byte) (serviceID, domainProject string) { - keys := ToResponse(key) - l := len(keys) - if l < 3 { - return - } - serviceID = keys[l-1] - domainProject = fmt.Sprintf("%s/%s", keys[l-3], keys[l-2]) + domainProject, serviceID = getLast2Keys(key) return } func GetInfoFromSvcIndexKV(key []byte) *discovery.MicroServiceKey { - keys := ToResponse(key) + keys := splitKey(key) l := len(keys) if l < 6 { return nil @@ -103,43 +103,28 @@ func GetInfoFromSvcAliasKV(key []byte) *discovery.MicroServiceKey { } func GetInfoFromSchemaRefKV(key []byte) (domainProject, serviceID, schemaID string) { - return GetInfoFromSchemaSummaryKV(key) + return getLast3Keys(key) } func GetInfoFromSchemaSummaryKV(key []byte) (domainProject, serviceID, schemaID string) { - keys := ToResponse(key) - l := len(keys) - if l < 4 { - return - } - domainProject = fmt.Sprintf("%s/%s", keys[l-4], keys[l-3]) - return domainProject, keys[l-2], keys[l-1] + return getLast3Keys(key) } func GetInfoFromSchemaKV(key []byte) (domainProject, serviceID, schemaID string) { - keys := ToResponse(key) - l := len(keys) - if l < 4 { - return - } - domainProject = fmt.Sprintf("%s/%s", keys[l-4], keys[l-3]) - return domainProject, keys[l-2], keys[l-1] + return getLast3Keys(key) +} + +func GetInfoFromSchemaContentKV(key []byte) (domainProject, hash string) { + return getLast2Keys(key) } func GetInfoFromDependencyQueueKV(key []byte) (consumerID, domainProject, uuid string) { - keys := ToResponse(key) - l := len(keys) - if l < 4 { - return - } - consumerID = keys[l-2] - domainProject = fmt.Sprintf("%s/%s", keys[l-4], keys[l-3]) - uuid = keys[l-1] + domainProject, consumerID, uuid = getLast3Keys(key) return } func GetInfoFromDependencyRuleKV(key []byte) (t string, _ *discovery.MicroServiceKey) { - keys := ToResponse(key) + keys := splitKey(key) l := len(keys) if l < 5 { return "", nil @@ -162,7 +147,5 @@ func GetInfoFromDependencyRuleKV(key []byte) (t string, _ *discovery.MicroServic } func SplitDomainProject(domainProject string) (string, string) { - domain := domainProject[:strings.Index(domainProject, SPLIT)] - project := domainProject[strings.Index(domainProject, SPLIT)+1:] - return domain, project + return stringutil.SplitToTwo(domainProject, SPLIT) } diff --git a/datasource/etcd/path/key_convertor_test.go b/datasource/etcd/path/key_convertor_test.go index add76d3..dc09366 100644 --- a/datasource/etcd/path/key_convertor_test.go +++ b/datasource/etcd/path/key_convertor_test.go @@ -43,7 +43,7 @@ func TestGetInfoFromKV(t *testing.T) { assert.False(t, d != "a") d = path.GetInfoFromDomainKV([]byte("sdf")) - assert.False(t, d != "") + assert.False(t, d != "sdf") p := "" d, p = path.GetInfoFromProjectKV([]byte(path.GenerateProjectKey("a", "b"))) @@ -88,6 +88,12 @@ func TestGetInfoFromKV(t *testing.T) { d, s, m = path.GetInfoFromSchemaKV([]byte("sdf")) assert.False(t, m != "" || s != "" || d != "") + d, h := path.GetInfoFromSchemaContentKV([]byte(path.GenerateServiceSchemaContentKey("a/b", "c"))) + assert.False(t, h != "c" || d != "a/b") + + d, h = path.GetInfoFromSchemaContentKV([]byte("sdf")) + assert.False(t, h != "" || d != "") + u := "" s, d, u = path.GetInfoFromDependencyQueueKV([]byte(path.GenerateConsumerDependencyQueueKey("a/b", "c", "d"))) assert.False(t, s != "c" || d != "a/b" || u != "d") diff --git a/datasource/etcd/path/key_generator.go b/datasource/etcd/path/key_generator.go index 31d4a7f..59137d6 100644 --- a/datasource/etcd/path/key_generator.go +++ b/datasource/etcd/path/key_generator.go @@ -46,7 +46,6 @@ const ( DepsQueueUUID = "0" DepsConsumer = "c" DepsProvider = "p" - RegistryRetirePlan = "retire-plan" ) func GetRootKey() string { @@ -384,7 +383,3 @@ func GenerateMetricsKey(name, utc, domain string) string { domain, }, SPLIT) } - -func GenerateRetirePlanKey() string { - return util.StringJoin([]string{GetRootKey(), RegistryRetirePlan}, SPLIT) -} diff --git a/datasource/etcd/retire.go b/datasource/etcd/retire.go index 3087be6..54d4d67 100644 --- a/datasource/etcd/retire.go +++ b/datasource/etcd/retire.go @@ -19,19 +19,14 @@ package etcd import ( "context" - "encoding/json" - "errors" "fmt" "sync/atomic" - "time" "github.com/apache/servicecomb-service-center/datasource" - "github.com/apache/servicecomb-service-center/datasource/etcd/mux" "github.com/apache/servicecomb-service-center/datasource/etcd/path" "github.com/apache/servicecomb-service-center/datasource/etcd/sd" "github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore" serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util" - "github.com/apache/servicecomb-service-center/pkg/etcdsync" "github.com/apache/servicecomb-service-center/pkg/goutil" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/util" @@ -40,30 +35,14 @@ import ( "github.com/little-cui/etcdadpt" ) -const ( - poolSizeOfRotation = 5 - retirementLockID mux.ID = "/cse-sr/lock/retirement" -) - -var ErrAlreadyRetire = errors.New("already retired by other SC") +const poolSizeOfRotation = 5 type RotateServiceIDKey struct { DomainProject string ServiceID string } -func (ds *MetadataManager) RetireService(ctx context.Context, localPlan *datasource.RetirePlan) error { - lock, err := getRetirementLock() - if err != nil { - return err - } - defer releaseRetirementLock(lock) - - plan, err := ds.getRetirePlan(ctx, localPlan) - if err != nil || !plan.ShouldRetire() { - return err - } - +func (ds *MetadataManager) RetireService(ctx context.Context, plan *datasource.RetirePlan) error { key := path.GetServiceIndexRootKey("") indexesResp, err := sd.ServiceIndex().Search(ctx, etcdadpt.WithStrKey(key), etcdadpt.WithPrefix()) if err != nil { @@ -84,41 +63,7 @@ func (ds *MetadataManager) RetireService(ctx context.Context, localPlan *datasou if n > 0 { log.Warn(fmt.Sprintf("%d microservices retired", n)) } - - plan.LastRunAt = time.Now().Unix() - return ds.UpsertRetirePlan(ctx, plan) -} - -func releaseRetirementLock(lock *etcdsync.DLock) { - err := lock.Unlock() - if err != nil { - log.Error("", err) - } -} - -func getRetirementLock() (*etcdsync.DLock, error) { - lock, err := mux.Try(retirementLockID) - if err != nil { - return nil, err - } - if lock == nil { - return nil, ErrAlreadyRetire - } - return lock, nil -} - -func (ds *MetadataManager) getRetirePlan(ctx context.Context, localPlan *datasource.RetirePlan) (*datasource.RetirePlan, error) { - plan, err := ds.GetRetirePlan(ctx) - if err != nil { - return nil, err - } - if plan == nil { - plan = localPlan - } else { - plan.Interval = localPlan.Interval - plan.Reserve = localPlan.Reserve - } - return plan, nil + return nil } func FilterNoInstance(ctx context.Context, serviceIDKeys []*RotateServiceIDKey) []*RotateServiceIDKey { @@ -198,30 +143,3 @@ func GetRetireServiceIDs(indexesResp *kvstore.Response, reserveVersionCount int) } return serviceIDs } - -func (ds *MetadataManager) GetRetirePlan(ctx context.Context) (*datasource.RetirePlan, error) { - kv, err := etcdadpt.Get(ctx, path.GenerateRetirePlanKey()) - if err != nil { - log.Error("", err) - return nil, err - } - if kv == nil { - return nil, nil - } - var plan datasource.RetirePlan - err = json.Unmarshal(kv.Value, &plan) - if err != nil { - log.Error("decode retire plan failed", err) - return nil, err - } - return &plan, nil -} - -func (ds *MetadataManager) UpsertRetirePlan(ctx context.Context, plan *datasource.RetirePlan) error { - bytes, err := json.Marshal(plan) - if err != nil { - log.Error("encode retire plan failed", err) - return err - } - return etcdadpt.PutBytes(ctx, path.GenerateRetirePlanKey(), bytes) -} diff --git a/datasource/etcd/schema.go b/datasource/etcd/schema.go index afb1057..892cb8b 100644 --- a/datasource/etcd/schema.go +++ b/datasource/etcd/schema.go @@ -32,6 +32,7 @@ import ( mapset "github.com/deckarep/golang-set" "github.com/go-chassis/cari/discovery" "github.com/little-cui/etcdadpt" + "go.etcd.io/etcd/api/v3/mvccpb" ) func init() { @@ -352,10 +353,55 @@ func getContentHashMap(ctx context.Context) (map[string]struct{}, error) { return refMap, nil } -func (dao *SchemaDAO) ListHash(ctx context.Context) ([]*schema.Content, error) { - panic("implement me") +func (dao *SchemaDAO) DeleteNoRefContents(ctx context.Context) (int, error) { + contentPrefixKey := path.GetServiceSchemaContentRootKey("") + kvs, _, err := etcdadpt.List(ctx, contentPrefixKey, etcdadpt.WithKeyOnly()) + if err != nil { + log.Error("list contents failed", err) + return 0, err + } + if len(kvs) == 0 { + return 0, nil + } + + set, err := filterNoRefContentHashes(ctx, kvs) + if err != nil { + log.Error("filter no ref content hashes failed", err) + return 0, err + } + if set.Cardinality() == 0 { + return 0, nil + } + + var ops []etcdadpt.OpOptions + for item := range set.Iter() { + ops = append(ops, etcdadpt.OpDel(etcdadpt.WithStrKey(contentPrefixKey+item.(string)))) + } + err = etcdadpt.Txn(ctx, ops) + if err != nil { + log.Error("txn delete failed", err) + return 0, err + } + return len(ops), nil } -func (dao *SchemaDAO) ExistRef(ctx context.Context, hash *schema.ContentRequest) (*schema.Ref, error) { - panic("implement me") +func filterNoRefContentHashes(ctx context.Context, kvs []*mvccpb.KeyValue) (mapset.Set, error) { + set := mapset.NewThreadUnsafeSet() + for _, kv := range kvs { + domainProject, hash := path.GetInfoFromSchemaContentKV(kv.Key) + set.Add(domainProject + path.SPLIT + hash) + } + + refPrefixKey := path.GetServiceSchemaRefRootKey("") + resp, err := sd.SchemaRef().Search(ctx, serviceUtil.ContextOptions(ctx, + etcdadpt.WithStrKey(refPrefixKey), etcdadpt.WithPrefix())...) + if err != nil { + return nil, err + } + + for _, kv := range resp.Kvs { + domainProject, _, _ := path.GetInfoFromSchemaRefKV(kv.Key) + set.Remove(domainProject + path.SPLIT + kv.Value.(string)) + } + return set, nil } diff --git a/datasource/mongo/retire.go b/datasource/mongo/retire.go index 0ffc9a9..88e76e4 100644 --- a/datasource/mongo/retire.go +++ b/datasource/mongo/retire.go @@ -24,5 +24,5 @@ import ( ) func (ds *MetadataManager) RetireService(ctx context.Context, plan *datasource.RetirePlan) error { - panic("implement me") + return nil } diff --git a/datasource/mongo/schema.go b/datasource/mongo/schema.go index 6d45f96..85f16e3 100644 --- a/datasource/mongo/schema.go +++ b/datasource/mongo/schema.go @@ -88,10 +88,6 @@ func (s *SchemaDAO) DeleteContent(ctx context.Context, contentRequest *schema.Co return schema.ErrSchemaContentNotFound } -func (s *SchemaDAO) ListHash(ctx context.Context) ([]*schema.Content, error) { - panic("implement me") -} - -func (s *SchemaDAO) ExistRef(ctx context.Context, contentRequest *schema.ContentRequest) (*schema.Ref, error) { - panic("implement me") +func (s *SchemaDAO) DeleteNoRefContents(ctx context.Context) (int, error) { + return 0, nil } diff --git a/datasource/retire.go b/datasource/retire.go index 47f0d2d..5585680 100644 --- a/datasource/retire.go +++ b/datasource/retire.go @@ -17,14 +17,7 @@ package datasource -import "time" - type RetirePlan struct { - Interval time.Duration `json:"interval,omitempty"` - Reserve int `json:"reserve,omitempty"` - LastRunAt int64 `json:"lastRunAt,omitempty" bson:"last_run_at"` -} - -func (r *RetirePlan) ShouldRetire() bool { - return time.Now().Add(-r.Interval).Unix() >= r.LastRunAt + Cron string `json:"cron,omitempty"` + Reserve int `json:"reserve,omitempty"` } diff --git a/datasource/schema/schema.go b/datasource/schema/schema.go index 6e4e463..b005569 100644 --- a/datasource/schema/schema.go +++ b/datasource/schema/schema.go @@ -82,9 +82,7 @@ type DAO interface { PutContent(ctx context.Context, contentRequest *PutContentRequest) error PutManyContent(ctx context.Context, contentRequest *PutManyContentRequest) error DeleteContent(ctx context.Context, contentRequest *ContentRequest) error - // ListHash return Content list without content - ListHash(ctx context.Context) ([]*Content, error) - ExistRef(ctx context.Context, contentRequest *ContentRequest) (*Ref, error) + DeleteNoRefContents(ctx context.Context) (int, error) } func Hash(schemaID, content string) string { diff --git a/etc/conf/app.yaml b/etc/conf/app.yaml index 8d190e9..2848946 100644 --- a/etc/conf/app.yaml +++ b/etc/conf/app.yaml @@ -151,7 +151,7 @@ registry: # delete other versions which doesn't register any instances. retire: disable: false - interval: 12h + cron: '0 1 * * *' reserve: 3 instance: # By default, instance TTL = (times + 1) * interval @@ -168,7 +168,7 @@ registry: notEditable: false # remove the schema without refs every 7d retire: - interval: 7d + cron: '0 2 * * *' # enable to register sc itself when startup selfRegister: 1 diff --git a/go.mod b/go.mod index 5055d94..63b01a4 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ replace ( ) require ( + github.com/robfig/cron/v3 v3.0.1 github.com/NYTimes/gziphandler v1.1.1 github.com/apache/servicecomb-service-center/api v0.0.0 github.com/astaxie/beego v1.12.2 diff --git a/go.sum b/go.sum index 8a38c15..7f895cf 100644 --- a/go.sum +++ b/go.sum @@ -563,6 +563,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/integration/instances_test.go b/integration/instances_test.go index f5ac910..457931b 100644 --- a/integration/instances_test.go +++ b/integration/instances_test.go @@ -17,28 +17,26 @@ package integrationtest_test import ( + "bytes" "encoding/json" "fmt" + "io/ioutil" + "math/rand" "net/http" + "strconv" "strings" "sync" - - "github.com/go-chassis/cari/discovery" - "github.com/gorilla/websocket" - "github.com/stretchr/testify/assert" + "testing" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/widuu/gojson" - - "bytes" - "io/ioutil" - "math/rand" - "strconv" - "testing" - "time" . "github.com/apache/servicecomb-service-center/integration" + "github.com/go-chassis/cari/discovery" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" + "github.com/widuu/gojson" ) var _ = Describe("MicroService Api Test", func() { @@ -395,7 +393,7 @@ var _ = Describe("MicroService Api Test", func() { }) It("Find Micro-service Info by alias", func() { - req, _ := http.NewRequest(GET, SCURL+FINDINSTANCE+"?appId="+serviceAppId+"&serviceName="+alias+"&version="+serviceVersion, nil) + req, _ := http.NewRequest(GET, SCURL+FINDINSTANCE+"?noCache=true&appId="+serviceAppId+"&serviceName="+alias+"&version="+serviceVersion, nil) req.Header.Set("X-Domain-Name", "default") req.Header.Set("X-ConsumerId", consumerID) resp, _ := scclient.Do(req) diff --git a/server/job/disco/retire.go b/server/job/disco/retire.go index 55e4fdf..fcc8f6e 100644 --- a/server/job/disco/retire.go +++ b/server/job/disco/retire.go @@ -20,49 +20,45 @@ package disco import ( "context" "fmt" - "time" "github.com/apache/servicecomb-service-center/datasource" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/server/config" discosvc "github.com/apache/servicecomb-service-center/server/service/disco" - "github.com/go-chassis/foundation/gopool" + "github.com/robfig/cron/v3" ) const ( - defaultRetireMicroserviceInterval = 12 * time.Hour - defaultReserveVersionCount = 3 + defaultRetireMicroserviceCron = "0 1 * * *" + defaultReserveVersionCount = 3 ) func init() { - startRetireServiceJob() -} - -func startRetireServiceJob() { disable := config.GetBool("registry.service.retire.disable", false) if disable { return } + startRetireServiceJob() +} +func startRetireServiceJob() { localPlan := &datasource.RetirePlan{ - Interval: config.GetDuration("registry.service.retire.interval", defaultRetireMicroserviceInterval), - Reserve: config.GetInt("registry.service.retire.reserve", defaultReserveVersionCount), + Cron: config.GetString("registry.service.retire.cron", defaultRetireMicroserviceCron), + Reserve: config.GetInt("registry.service.retire.reserve", defaultReserveVersionCount), } - log.Info(fmt.Sprintf("start retire microservice job, plan is %v", localPlan)) - gopool.Go(func(ctx context.Context) { - tick := time.NewTicker(localPlan.Interval) - defer tick.Stop() - for { - select { - case <-ctx.Done(): - return - case <-tick.C: - err := discosvc.RetireService(ctx, localPlan) - if err != nil { - log.Error("retire microservice failed", err) - } - } + + c := cron.New() + _, err := c.AddFunc(localPlan.Cron, func() { + //TODO use DLock + err := discosvc.RetireService(context.Background(), localPlan) + if err != nil { + log.Error("retire microservice failed", err) } }) + if err != nil { + log.Error("cron add func failed", err) + return + } + c.Start() } diff --git a/server/service/disco/retire.go b/server/job/disco/schema.go similarity index 53% copy from server/service/disco/retire.go copy to server/job/disco/schema.go index 9b64b26..9ba782f 100644 --- a/server/service/disco/retire.go +++ b/server/job/disco/schema.go @@ -19,10 +19,32 @@ package disco import ( "context" + "fmt" - "github.com/apache/servicecomb-service-center/datasource" + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/server/config" + discosvc "github.com/apache/servicecomb-service-center/server/service/disco" + "github.com/robfig/cron/v3" ) -func RetireService(ctx context.Context, plan *datasource.RetirePlan) error { - return datasource.GetMetadataManager().RetireService(ctx, plan) +const ( + defaultRetireSchemaCron = "0 2 * * *" +) + +func init() { + cronExpr := config.GetString("registry.schema.retire.cron", defaultRetireSchemaCron) + log.Info(fmt.Sprintf("start retire schema job, plan is %v", cronExpr)) + c := cron.New() + _, err := c.AddFunc(cronExpr, func() { + //TODO use DLock + err := discosvc.RetireSchema(context.Background()) + if err != nil { + log.Error("retire schema failed", err) + } + }) + if err != nil { + log.Error("cron add func failed", err) + return + } + c.Start() } diff --git a/server/service/disco/retire.go b/server/service/disco/retire.go index 9b64b26..0d17d15 100644 --- a/server/service/disco/retire.go +++ b/server/service/disco/retire.go @@ -19,10 +19,25 @@ package disco import ( "context" + "fmt" "github.com/apache/servicecomb-service-center/datasource" + "github.com/apache/servicecomb-service-center/datasource/schema" + "github.com/apache/servicecomb-service-center/pkg/log" ) func RetireService(ctx context.Context, plan *datasource.RetirePlan) error { return datasource.GetMetadataManager().RetireService(ctx, plan) } + +func RetireSchema(ctx context.Context) error { + n, err := schema.Instance().DeleteNoRefContents(ctx) + if err != nil { + log.Error("delete no ref contents failed", err) + return err + } + if n > 0 { + log.Warn(fmt.Sprintf("%d schema-contents retired", n)) + } + return nil +} diff --git a/server/service/disco/retire_test.go b/server/service/disco/retire_test.go index ca778c9..c41fbc6 100644 --- a/server/service/disco/retire_test.go +++ b/server/service/disco/retire_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/apache/servicecomb-service-center/datasource" + "github.com/apache/servicecomb-service-center/datasource/schema" discosvc "github.com/apache/servicecomb-service-center/server/service/disco" "github.com/apache/servicecomb-service-center/test" pb "github.com/go-chassis/cari/discovery" @@ -54,11 +55,12 @@ func TestRetireService(t *testing.T) { idx := fmt.Sprintf("%d", i) discosvc.UnregisterService(ctx, &pb.DeleteServiceRequest{ ServiceId: serviceIDPrefix + idx, + Force: true, }) } }() - err := discosvc.RetireService(ctx, &datasource.RetirePlan{Interval: 0, Reserve: 1}) + err := discosvc.RetireService(ctx, &datasource.RetirePlan{Reserve: 1}) assert.NoError(t, err) resp, err := datasource.GetMetadataManager().ListServiceDetail(ctx, &pb.GetServicesInfoRequest{ @@ -69,3 +71,54 @@ func TestRetireService(t *testing.T) { assert.Equal(t, serviceIDPrefix+"4", resp.AllServicesDetail[0].MicroService.ServiceId) }) } + +func TestRetireSchema(t *testing.T) { + if !test.IsETCD() { + return + } + + var serviceID string + + ctx := getContext() + service, err := discosvc.RegisterService(ctx, &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceName: "TestRetireSchema", + }, + }) + assert.NoError(t, err) + serviceID = service.ServiceId + schemaID := "schemaID_1" + content := "content_1" + hash := schema.Hash(schemaID, content) + defer schema.Instance().DeleteContent(ctx, &schema.ContentRequest{Hash: hash}) + defer discosvc.UnregisterService(ctx, &pb.DeleteServiceRequest{ServiceId: serviceID, Force: true}) + + t.Run("retire schema with ref, should not delete it", func(t *testing.T) { + err = discosvc.PutSchema(ctx, &pb.ModifySchemaRequest{ + ServiceId: serviceID, + SchemaId: schemaID, + Schema: content, + }) + assert.NoError(t, err) + + err := discosvc.RetireSchema(ctx) + assert.NoError(t, err) + + _, err = discosvc.GetSchema(ctx, &pb.GetSchemaRequest{ServiceId: serviceID, SchemaId: schemaID}) + assert.NoError(t, err) + }) + + t.Run("retire schema without ref, should delete it", func(t *testing.T) { + err := discosvc.DeleteSchema(ctx, &pb.DeleteSchemaRequest{ + ServiceId: serviceID, + SchemaId: schemaID, + }) + assert.NoError(t, err) + + err = discosvc.RetireSchema(ctx) + assert.NoError(t, err) + + _, err = schema.Instance().GetContent(ctx, &schema.ContentRequest{Hash: hash}) + assert.ErrorIs(t, schema.ErrSchemaContentNotFound, err) + }) +} diff --git a/server/service/disco/schema_test.go b/server/service/disco/schema_test.go index 6930dc4..a77764c 100644 --- a/server/service/disco/schema_test.go +++ b/server/service/disco/schema_test.go @@ -1014,7 +1014,8 @@ func TestCompatibleOperateSchema(t *testing.T) { }) assert.NoError(t, err) assert.Equal(t, 2, len(schemas)) - schema = schemas[1] + schema = findSchemaBySchemaID(schemas, "schemaID_2") + assert.NotNil(t, schema) assert.Equal(t, "schema_2", schema.Schema) assert.Equal(t, "summary2", schema.Summary) }) @@ -1040,10 +1041,14 @@ func TestCompatibleOperateSchema(t *testing.T) { }) assert.NoError(t, err) assert.Equal(t, 2, len(schemas)) - schema = schemas[0] + + schema = findSchemaBySchemaID(schemas, "schemaID_1") + assert.NotNil(t, schema) assert.Empty(t, schema.Schema) assert.Empty(t, schema.Summary) - schema = schemas[1] + + schema = findSchemaBySchemaID(schemas, "schemaID_2") + assert.NotNil(t, schema) assert.Equal(t, "schema_2", schema.Schema) assert.Equal(t, "summary2", schema.Summary) }) @@ -1069,11 +1074,24 @@ func TestCompatibleOperateSchema(t *testing.T) { }) assert.NoError(t, err) assert.Equal(t, 2, len(schemas)) - schema = schemas[0] + + schema = findSchemaBySchemaID(schemas, "schemaID_1") + assert.NotNil(t, schema) assert.Empty(t, schema.Schema) assert.Empty(t, schema.Summary) - schema = schemas[1] + + schema = findSchemaBySchemaID(schemas, "schemaID_2") + assert.NotNil(t, schema) assert.Empty(t, schema.Schema) assert.Empty(t, schema.Summary) }) } + +func findSchemaBySchemaID(schemas []*pb.Schema, schemaID string) *pb.Schema { + for _, schema := range schemas { + if schema.SchemaId == schemaID { + return schema + } + } + return nil +}