This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new a1387cc  Feature: add schema retire cron job (#1187)
a1387cc is described below

commit a1387cc7a53d010f69437e0cc864761f6681406f
Author: little-cui <sure_0...@qq.com>
AuthorDate: Tue Dec 28 19:40:25 2021 +0800

    Feature: add schema retire cron job (#1187)
---
 datasource/etcd/path/key_convertor.go              | 89 +++++++++-------------
 datasource/etcd/path/key_convertor_test.go         |  8 +-
 datasource/etcd/path/key_generator.go              |  5 --
 datasource/etcd/retire.go                          | 88 +--------------------
 datasource/etcd/schema.go                          | 54 ++++++++++++-
 datasource/mongo/retire.go                         |  2 +-
 datasource/mongo/schema.go                         |  8 +-
 datasource/retire.go                               | 11 +--
 datasource/schema/schema.go                        |  4 +-
 etc/conf/app.yaml                                  |  4 +-
 go.mod                                             |  1 +
 go.sum                                             |  2 +
 integration/instances_test.go                      | 24 +++---
 server/job/disco/retire.go                         | 44 +++++------
 .../disco/retire.go => job/disco/schema.go}        | 28 ++++++-
 server/service/disco/retire.go                     | 15 ++++
 server/service/disco/retire_test.go                | 55 ++++++++++++-
 server/service/disco/schema_test.go                | 28 +++++--
 18 files changed, 255 insertions(+), 215 deletions(-)

diff --git a/datasource/etcd/path/key_convertor.go 
b/datasource/etcd/path/key_convertor.go
index 7c93604..7e0ee8c 100644
--- a/datasource/etcd/path/key_convertor.go
+++ b/datasource/etcd/path/key_convertor.go
@@ -23,39 +23,45 @@ import (
 
        "github.com/apache/servicecomb-service-center/pkg/util"
        "github.com/go-chassis/cari/discovery"
+       "github.com/go-chassis/foundation/stringutil"
 )
 
-func ToResponse(key []byte) (keys []string) {
+func splitKey(key []byte) (keys []string) {
        return strings.Split(util.BytesToStringWithNoCopy(key), SPLIT)
 }
 
-func GetInfoFromSvcKV(key []byte) (serviceID, domainProject string) {
-       keys := ToResponse(key)
+func getLast2Keys(key []byte) (string, string) {
+       keys := splitKey(key)
        l := len(keys)
-       if l < 4 {
-               return
+       if l < 3 {
+               return "", ""
        }
-       serviceID = keys[l-1]
-       domainProject = fmt.Sprintf("%s/%s", keys[l-3], keys[l-2])
-       return
+       return fmt.Sprintf("%s/%s", keys[l-3], keys[l-2]), keys[l-1]
 }
 
-func GetInfoFromInstKV(key []byte) (serviceID, instanceID, domainProject 
string) {
-       keys := ToResponse(key)
+func getLast3Keys(key []byte) (string, string, string) {
+       keys := splitKey(key)
        l := len(keys)
        if l < 4 {
-               return
+               return "", "", ""
        }
-       serviceID = keys[l-2]
-       instanceID = keys[l-1]
-       domainProject = fmt.Sprintf("%s/%s", keys[l-4], keys[l-3])
+       return fmt.Sprintf("%s/%s", keys[l-4], keys[l-3]), keys[l-2], keys[l-1]
+}
+
+func GetInfoFromSvcKV(key []byte) (serviceID, domainProject string) {
+       domainProject, serviceID = getLast2Keys(key)
+       return
+}
+
+func GetInfoFromInstKV(key []byte) (serviceID, instanceID, domainProject 
string) {
+       domainProject, serviceID, instanceID = getLast3Keys(key)
        return
 }
 
 func GetInfoFromDomainKV(key []byte) (domain string) {
-       keys := ToResponse(key)
+       keys := splitKey(key)
        l := len(keys)
-       if l < 2 {
+       if l < 1 {
                return
        }
        domain = keys[l-1]
@@ -63,27 +69,21 @@ func GetInfoFromDomainKV(key []byte) (domain string) {
 }
 
 func GetInfoFromProjectKV(key []byte) (domain, project string) {
-       keys := ToResponse(key)
+       keys := splitKey(key)
        l := len(keys)
        if l < 2 {
-               return
+               return "", ""
        }
        return keys[l-2], keys[l-1]
 }
 
 func GetInfoFromTagKV(key []byte) (serviceID, domainProject string) {
-       keys := ToResponse(key)
-       l := len(keys)
-       if l < 3 {
-               return
-       }
-       serviceID = keys[l-1]
-       domainProject = fmt.Sprintf("%s/%s", keys[l-3], keys[l-2])
+       domainProject, serviceID = getLast2Keys(key)
        return
 }
 
 func GetInfoFromSvcIndexKV(key []byte) *discovery.MicroServiceKey {
-       keys := ToResponse(key)
+       keys := splitKey(key)
        l := len(keys)
        if l < 6 {
                return nil
@@ -103,43 +103,28 @@ func GetInfoFromSvcAliasKV(key []byte) 
*discovery.MicroServiceKey {
 }
 
 func GetInfoFromSchemaRefKV(key []byte) (domainProject, serviceID, schemaID 
string) {
-       return GetInfoFromSchemaSummaryKV(key)
+       return getLast3Keys(key)
 }
 
 func GetInfoFromSchemaSummaryKV(key []byte) (domainProject, serviceID, 
schemaID string) {
-       keys := ToResponse(key)
-       l := len(keys)
-       if l < 4 {
-               return
-       }
-       domainProject = fmt.Sprintf("%s/%s", keys[l-4], keys[l-3])
-       return domainProject, keys[l-2], keys[l-1]
+       return getLast3Keys(key)
 }
 
 func GetInfoFromSchemaKV(key []byte) (domainProject, serviceID, schemaID 
string) {
-       keys := ToResponse(key)
-       l := len(keys)
-       if l < 4 {
-               return
-       }
-       domainProject = fmt.Sprintf("%s/%s", keys[l-4], keys[l-3])
-       return domainProject, keys[l-2], keys[l-1]
+       return getLast3Keys(key)
+}
+
+func GetInfoFromSchemaContentKV(key []byte) (domainProject, hash string) {
+       return getLast2Keys(key)
 }
 
 func GetInfoFromDependencyQueueKV(key []byte) (consumerID, domainProject, uuid 
string) {
-       keys := ToResponse(key)
-       l := len(keys)
-       if l < 4 {
-               return
-       }
-       consumerID = keys[l-2]
-       domainProject = fmt.Sprintf("%s/%s", keys[l-4], keys[l-3])
-       uuid = keys[l-1]
+       domainProject, consumerID, uuid = getLast3Keys(key)
        return
 }
 
 func GetInfoFromDependencyRuleKV(key []byte) (t string, _ 
*discovery.MicroServiceKey) {
-       keys := ToResponse(key)
+       keys := splitKey(key)
        l := len(keys)
        if l < 5 {
                return "", nil
@@ -162,7 +147,5 @@ func GetInfoFromDependencyRuleKV(key []byte) (t string, _ 
*discovery.MicroServic
 }
 
 func SplitDomainProject(domainProject string) (string, string) {
-       domain := domainProject[:strings.Index(domainProject, SPLIT)]
-       project := domainProject[strings.Index(domainProject, SPLIT)+1:]
-       return domain, project
+       return stringutil.SplitToTwo(domainProject, SPLIT)
 }
diff --git a/datasource/etcd/path/key_convertor_test.go 
b/datasource/etcd/path/key_convertor_test.go
index add76d3..dc09366 100644
--- a/datasource/etcd/path/key_convertor_test.go
+++ b/datasource/etcd/path/key_convertor_test.go
@@ -43,7 +43,7 @@ func TestGetInfoFromKV(t *testing.T) {
        assert.False(t, d != "a")
 
        d = path.GetInfoFromDomainKV([]byte("sdf"))
-       assert.False(t, d != "")
+       assert.False(t, d != "sdf")
 
        p := ""
        d, p = path.GetInfoFromProjectKV([]byte(path.GenerateProjectKey("a", 
"b")))
@@ -88,6 +88,12 @@ func TestGetInfoFromKV(t *testing.T) {
        d, s, m = path.GetInfoFromSchemaKV([]byte("sdf"))
        assert.False(t, m != "" || s != "" || d != "")
 
+       d, h := 
path.GetInfoFromSchemaContentKV([]byte(path.GenerateServiceSchemaContentKey("a/b",
 "c")))
+       assert.False(t, h != "c" || d != "a/b")
+
+       d, h = path.GetInfoFromSchemaContentKV([]byte("sdf"))
+       assert.False(t, h != "" || d != "")
+
        u := ""
        s, d, u = 
path.GetInfoFromDependencyQueueKV([]byte(path.GenerateConsumerDependencyQueueKey("a/b",
 "c", "d")))
        assert.False(t, s != "c" || d != "a/b" || u != "d")
diff --git a/datasource/etcd/path/key_generator.go 
b/datasource/etcd/path/key_generator.go
index 31d4a7f..59137d6 100644
--- a/datasource/etcd/path/key_generator.go
+++ b/datasource/etcd/path/key_generator.go
@@ -46,7 +46,6 @@ const (
        DepsQueueUUID            = "0"
        DepsConsumer             = "c"
        DepsProvider             = "p"
-       RegistryRetirePlan       = "retire-plan"
 )
 
 func GetRootKey() string {
@@ -384,7 +383,3 @@ func GenerateMetricsKey(name, utc, domain string) string {
                domain,
        }, SPLIT)
 }
-
-func GenerateRetirePlanKey() string {
-       return util.StringJoin([]string{GetRootKey(), RegistryRetirePlan}, 
SPLIT)
-}
diff --git a/datasource/etcd/retire.go b/datasource/etcd/retire.go
index 3087be6..54d4d67 100644
--- a/datasource/etcd/retire.go
+++ b/datasource/etcd/retire.go
@@ -19,19 +19,14 @@ package etcd
 
 import (
        "context"
-       "encoding/json"
-       "errors"
        "fmt"
        "sync/atomic"
-       "time"
 
        "github.com/apache/servicecomb-service-center/datasource"
-       "github.com/apache/servicecomb-service-center/datasource/etcd/mux"
        "github.com/apache/servicecomb-service-center/datasource/etcd/path"
        "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
        
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
        serviceUtil 
"github.com/apache/servicecomb-service-center/datasource/etcd/util"
-       "github.com/apache/servicecomb-service-center/pkg/etcdsync"
        "github.com/apache/servicecomb-service-center/pkg/goutil"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/util"
@@ -40,30 +35,14 @@ import (
        "github.com/little-cui/etcdadpt"
 )
 
-const (
-       poolSizeOfRotation        = 5
-       retirementLockID   mux.ID = "/cse-sr/lock/retirement"
-)
-
-var ErrAlreadyRetire = errors.New("already retired by other SC")
+const poolSizeOfRotation = 5
 
 type RotateServiceIDKey struct {
        DomainProject string
        ServiceID     string
 }
 
-func (ds *MetadataManager) RetireService(ctx context.Context, localPlan 
*datasource.RetirePlan) error {
-       lock, err := getRetirementLock()
-       if err != nil {
-               return err
-       }
-       defer releaseRetirementLock(lock)
-
-       plan, err := ds.getRetirePlan(ctx, localPlan)
-       if err != nil || !plan.ShouldRetire() {
-               return err
-       }
-
+func (ds *MetadataManager) RetireService(ctx context.Context, plan 
*datasource.RetirePlan) error {
        key := path.GetServiceIndexRootKey("")
        indexesResp, err := sd.ServiceIndex().Search(ctx, 
etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
        if err != nil {
@@ -84,41 +63,7 @@ func (ds *MetadataManager) RetireService(ctx 
context.Context, localPlan *datasou
        if n > 0 {
                log.Warn(fmt.Sprintf("%d microservices retired", n))
        }
-
-       plan.LastRunAt = time.Now().Unix()
-       return ds.UpsertRetirePlan(ctx, plan)
-}
-
-func releaseRetirementLock(lock *etcdsync.DLock) {
-       err := lock.Unlock()
-       if err != nil {
-               log.Error("", err)
-       }
-}
-
-func getRetirementLock() (*etcdsync.DLock, error) {
-       lock, err := mux.Try(retirementLockID)
-       if err != nil {
-               return nil, err
-       }
-       if lock == nil {
-               return nil, ErrAlreadyRetire
-       }
-       return lock, nil
-}
-
-func (ds *MetadataManager) getRetirePlan(ctx context.Context, localPlan 
*datasource.RetirePlan) (*datasource.RetirePlan, error) {
-       plan, err := ds.GetRetirePlan(ctx)
-       if err != nil {
-               return nil, err
-       }
-       if plan == nil {
-               plan = localPlan
-       } else {
-               plan.Interval = localPlan.Interval
-               plan.Reserve = localPlan.Reserve
-       }
-       return plan, nil
+       return nil
 }
 
 func FilterNoInstance(ctx context.Context, serviceIDKeys 
[]*RotateServiceIDKey) []*RotateServiceIDKey {
@@ -198,30 +143,3 @@ func GetRetireServiceIDs(indexesResp *kvstore.Response, 
reserveVersionCount int)
        }
        return serviceIDs
 }
-
-func (ds *MetadataManager) GetRetirePlan(ctx context.Context) 
(*datasource.RetirePlan, error) {
-       kv, err := etcdadpt.Get(ctx, path.GenerateRetirePlanKey())
-       if err != nil {
-               log.Error("", err)
-               return nil, err
-       }
-       if kv == nil {
-               return nil, nil
-       }
-       var plan datasource.RetirePlan
-       err = json.Unmarshal(kv.Value, &plan)
-       if err != nil {
-               log.Error("decode retire plan failed", err)
-               return nil, err
-       }
-       return &plan, nil
-}
-
-func (ds *MetadataManager) UpsertRetirePlan(ctx context.Context, plan 
*datasource.RetirePlan) error {
-       bytes, err := json.Marshal(plan)
-       if err != nil {
-               log.Error("encode retire plan failed", err)
-               return err
-       }
-       return etcdadpt.PutBytes(ctx, path.GenerateRetirePlanKey(), bytes)
-}
diff --git a/datasource/etcd/schema.go b/datasource/etcd/schema.go
index afb1057..892cb8b 100644
--- a/datasource/etcd/schema.go
+++ b/datasource/etcd/schema.go
@@ -32,6 +32,7 @@ import (
        mapset "github.com/deckarep/golang-set"
        "github.com/go-chassis/cari/discovery"
        "github.com/little-cui/etcdadpt"
+       "go.etcd.io/etcd/api/v3/mvccpb"
 )
 
 func init() {
@@ -352,10 +353,55 @@ func getContentHashMap(ctx context.Context) 
(map[string]struct{}, error) {
        return refMap, nil
 }
 
-func (dao *SchemaDAO) ListHash(ctx context.Context) ([]*schema.Content, error) 
{
-       panic("implement me")
+func (dao *SchemaDAO) DeleteNoRefContents(ctx context.Context) (int, error) {
+       contentPrefixKey := path.GetServiceSchemaContentRootKey("")
+       kvs, _, err := etcdadpt.List(ctx, contentPrefixKey, 
etcdadpt.WithKeyOnly())
+       if err != nil {
+               log.Error("list contents failed", err)
+               return 0, err
+       }
+       if len(kvs) == 0 {
+               return 0, nil
+       }
+
+       set, err := filterNoRefContentHashes(ctx, kvs)
+       if err != nil {
+               log.Error("filter no ref content hashes failed", err)
+               return 0, err
+       }
+       if set.Cardinality() == 0 {
+               return 0, nil
+       }
+
+       var ops []etcdadpt.OpOptions
+       for item := range set.Iter() {
+               ops = append(ops, 
etcdadpt.OpDel(etcdadpt.WithStrKey(contentPrefixKey+item.(string))))
+       }
+       err = etcdadpt.Txn(ctx, ops)
+       if err != nil {
+               log.Error("txn delete failed", err)
+               return 0, err
+       }
+       return len(ops), nil
 }
 
-func (dao *SchemaDAO) ExistRef(ctx context.Context, hash 
*schema.ContentRequest) (*schema.Ref, error) {
-       panic("implement me")
+func filterNoRefContentHashes(ctx context.Context, kvs []*mvccpb.KeyValue) 
(mapset.Set, error) {
+       set := mapset.NewThreadUnsafeSet()
+       for _, kv := range kvs {
+               domainProject, hash := path.GetInfoFromSchemaContentKV(kv.Key)
+               set.Add(domainProject + path.SPLIT + hash)
+       }
+
+       refPrefixKey := path.GetServiceSchemaRefRootKey("")
+       resp, err := sd.SchemaRef().Search(ctx, serviceUtil.ContextOptions(ctx,
+               etcdadpt.WithStrKey(refPrefixKey), etcdadpt.WithPrefix())...)
+       if err != nil {
+               return nil, err
+       }
+
+       for _, kv := range resp.Kvs {
+               domainProject, _, _ := path.GetInfoFromSchemaRefKV(kv.Key)
+               set.Remove(domainProject + path.SPLIT + kv.Value.(string))
+       }
+       return set, nil
 }
diff --git a/datasource/mongo/retire.go b/datasource/mongo/retire.go
index 0ffc9a9..88e76e4 100644
--- a/datasource/mongo/retire.go
+++ b/datasource/mongo/retire.go
@@ -24,5 +24,5 @@ import (
 )
 
 func (ds *MetadataManager) RetireService(ctx context.Context, plan 
*datasource.RetirePlan) error {
-       panic("implement me")
+       return nil
 }
diff --git a/datasource/mongo/schema.go b/datasource/mongo/schema.go
index 6d45f96..85f16e3 100644
--- a/datasource/mongo/schema.go
+++ b/datasource/mongo/schema.go
@@ -88,10 +88,6 @@ func (s *SchemaDAO) DeleteContent(ctx context.Context, 
contentRequest *schema.Co
        return schema.ErrSchemaContentNotFound
 }
 
-func (s *SchemaDAO) ListHash(ctx context.Context) ([]*schema.Content, error) {
-       panic("implement me")
-}
-
-func (s *SchemaDAO) ExistRef(ctx context.Context, contentRequest 
*schema.ContentRequest) (*schema.Ref, error) {
-       panic("implement me")
+func (s *SchemaDAO) DeleteNoRefContents(ctx context.Context) (int, error) {
+       return 0, nil
 }
diff --git a/datasource/retire.go b/datasource/retire.go
index 47f0d2d..5585680 100644
--- a/datasource/retire.go
+++ b/datasource/retire.go
@@ -17,14 +17,7 @@
 
 package datasource
 
-import "time"
-
 type RetirePlan struct {
-       Interval  time.Duration `json:"interval,omitempty"`
-       Reserve   int           `json:"reserve,omitempty"`
-       LastRunAt int64         `json:"lastRunAt,omitempty" bson:"last_run_at"`
-}
-
-func (r *RetirePlan) ShouldRetire() bool {
-       return time.Now().Add(-r.Interval).Unix() >= r.LastRunAt
+       Cron    string `json:"cron,omitempty"`
+       Reserve int    `json:"reserve,omitempty"`
 }
diff --git a/datasource/schema/schema.go b/datasource/schema/schema.go
index 6e4e463..b005569 100644
--- a/datasource/schema/schema.go
+++ b/datasource/schema/schema.go
@@ -82,9 +82,7 @@ type DAO interface {
        PutContent(ctx context.Context, contentRequest *PutContentRequest) error
        PutManyContent(ctx context.Context, contentRequest 
*PutManyContentRequest) error
        DeleteContent(ctx context.Context, contentRequest *ContentRequest) error
-       // ListHash return Content list without content
-       ListHash(ctx context.Context) ([]*Content, error)
-       ExistRef(ctx context.Context, contentRequest *ContentRequest) (*Ref, 
error)
+       DeleteNoRefContents(ctx context.Context) (int, error)
 }
 
 func Hash(schemaID, content string) string {
diff --git a/etc/conf/app.yaml b/etc/conf/app.yaml
index 8d190e9..2848946 100644
--- a/etc/conf/app.yaml
+++ b/etc/conf/app.yaml
@@ -151,7 +151,7 @@ registry:
     # delete other versions which doesn't register any instances.
     retire:
       disable: false
-      interval: 12h
+      cron: '0 1 * * *'
       reserve: 3
   instance:
     # By default, instance TTL = (times + 1) * interval
@@ -168,7 +168,7 @@ registry:
     notEditable: false
     # remove the schema without refs every 7d
     retire:
-      interval: 7d
+      cron: '0 2 * * *'
   # enable to register sc itself when startup
   selfRegister: 1
 
diff --git a/go.mod b/go.mod
index 5055d94..63b01a4 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ replace (
 )
 
 require (
+       github.com/robfig/cron/v3 v3.0.1
        github.com/NYTimes/gziphandler v1.1.1
        github.com/apache/servicecomb-service-center/api v0.0.0
        github.com/astaxie/beego v1.12.2
diff --git a/go.sum b/go.sum
index 8a38c15..7f895cf 100644
--- a/go.sum
+++ b/go.sum
@@ -563,6 +563,8 @@ github.com/prometheus/procfs v0.6.0/go.mod 
h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
 github.com/prometheus/tsdb v0.7.1/go.mod 
h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a 
h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod 
h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/robfig/cron/v3 v3.0.1 
h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
+github.com/robfig/cron/v3 v3.0.1/go.mod 
h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod 
h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod 
h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/rogpeppe/go-internal v1.1.0/go.mod 
h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
diff --git a/integration/instances_test.go b/integration/instances_test.go
index f5ac910..457931b 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -17,28 +17,26 @@
 package integrationtest_test
 
 import (
+       "bytes"
        "encoding/json"
        "fmt"
+       "io/ioutil"
+       "math/rand"
        "net/http"
+       "strconv"
        "strings"
        "sync"
-
-       "github.com/go-chassis/cari/discovery"
-       "github.com/gorilla/websocket"
-       "github.com/stretchr/testify/assert"
+       "testing"
+       "time"
 
        . "github.com/onsi/ginkgo"
        . "github.com/onsi/gomega"
-       "github.com/widuu/gojson"
-
-       "bytes"
-       "io/ioutil"
-       "math/rand"
-       "strconv"
-       "testing"
-       "time"
 
        . "github.com/apache/servicecomb-service-center/integration"
+       "github.com/go-chassis/cari/discovery"
+       "github.com/gorilla/websocket"
+       "github.com/stretchr/testify/assert"
+       "github.com/widuu/gojson"
 )
 
 var _ = Describe("MicroService Api Test", func() {
@@ -395,7 +393,7 @@ var _ = Describe("MicroService Api Test", func() {
                        })
 
                        It("Find Micro-service Info by alias", func() {
-                               req, _ := http.NewRequest(GET, 
SCURL+FINDINSTANCE+"?appId="+serviceAppId+"&serviceName="+alias+"&version="+serviceVersion,
 nil)
+                               req, _ := http.NewRequest(GET, 
SCURL+FINDINSTANCE+"?noCache=true&appId="+serviceAppId+"&serviceName="+alias+"&version="+serviceVersion,
 nil)
                                req.Header.Set("X-Domain-Name", "default")
                                req.Header.Set("X-ConsumerId", consumerID)
                                resp, _ := scclient.Do(req)
diff --git a/server/job/disco/retire.go b/server/job/disco/retire.go
index 55e4fdf..fcc8f6e 100644
--- a/server/job/disco/retire.go
+++ b/server/job/disco/retire.go
@@ -20,49 +20,45 @@ package disco
 import (
        "context"
        "fmt"
-       "time"
 
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/server/config"
        discosvc 
"github.com/apache/servicecomb-service-center/server/service/disco"
-       "github.com/go-chassis/foundation/gopool"
+       "github.com/robfig/cron/v3"
 )
 
 const (
-       defaultRetireMicroserviceInterval = 12 * time.Hour
-       defaultReserveVersionCount        = 3
+       defaultRetireMicroserviceCron = "0 1 * * *"
+       defaultReserveVersionCount    = 3
 )
 
 func init() {
-       startRetireServiceJob()
-}
-
-func startRetireServiceJob() {
        disable := config.GetBool("registry.service.retire.disable", false)
        if disable {
                return
        }
+       startRetireServiceJob()
+}
 
+func startRetireServiceJob() {
        localPlan := &datasource.RetirePlan{
-               Interval: 
config.GetDuration("registry.service.retire.interval", 
defaultRetireMicroserviceInterval),
-               Reserve:  config.GetInt("registry.service.retire.reserve", 
defaultReserveVersionCount),
+               Cron:    config.GetString("registry.service.retire.cron", 
defaultRetireMicroserviceCron),
+               Reserve: config.GetInt("registry.service.retire.reserve", 
defaultReserveVersionCount),
        }
-
        log.Info(fmt.Sprintf("start retire microservice job, plan is %v", 
localPlan))
-       gopool.Go(func(ctx context.Context) {
-               tick := time.NewTicker(localPlan.Interval)
-               defer tick.Stop()
-               for {
-                       select {
-                       case <-ctx.Done():
-                               return
-                       case <-tick.C:
-                               err := discosvc.RetireService(ctx, localPlan)
-                               if err != nil {
-                                       log.Error("retire microservice failed", 
err)
-                               }
-                       }
+
+       c := cron.New()
+       _, err := c.AddFunc(localPlan.Cron, func() {
+               //TODO use DLock
+               err := discosvc.RetireService(context.Background(), localPlan)
+               if err != nil {
+                       log.Error("retire microservice failed", err)
                }
        })
+       if err != nil {
+               log.Error("cron add func failed", err)
+               return
+       }
+       c.Start()
 }
diff --git a/server/service/disco/retire.go b/server/job/disco/schema.go
similarity index 53%
copy from server/service/disco/retire.go
copy to server/job/disco/schema.go
index 9b64b26..9ba782f 100644
--- a/server/service/disco/retire.go
+++ b/server/job/disco/schema.go
@@ -19,10 +19,32 @@ package disco
 
 import (
        "context"
+       "fmt"
 
-       "github.com/apache/servicecomb-service-center/datasource"
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/server/config"
+       discosvc 
"github.com/apache/servicecomb-service-center/server/service/disco"
+       "github.com/robfig/cron/v3"
 )
 
-func RetireService(ctx context.Context, plan *datasource.RetirePlan) error {
-       return datasource.GetMetadataManager().RetireService(ctx, plan)
+const (
+       defaultRetireSchemaCron = "0 2 * * *"
+)
+
+func init() {
+       cronExpr := config.GetString("registry.schema.retire.cron", 
defaultRetireSchemaCron)
+       log.Info(fmt.Sprintf("start retire schema job, plan is %v", cronExpr))
+       c := cron.New()
+       _, err := c.AddFunc(cronExpr, func() {
+               //TODO use DLock
+               err := discosvc.RetireSchema(context.Background())
+               if err != nil {
+                       log.Error("retire schema failed", err)
+               }
+       })
+       if err != nil {
+               log.Error("cron add func failed", err)
+               return
+       }
+       c.Start()
 }
diff --git a/server/service/disco/retire.go b/server/service/disco/retire.go
index 9b64b26..0d17d15 100644
--- a/server/service/disco/retire.go
+++ b/server/service/disco/retire.go
@@ -19,10 +19,25 @@ package disco
 
 import (
        "context"
+       "fmt"
 
        "github.com/apache/servicecomb-service-center/datasource"
+       "github.com/apache/servicecomb-service-center/datasource/schema"
+       "github.com/apache/servicecomb-service-center/pkg/log"
 )
 
 func RetireService(ctx context.Context, plan *datasource.RetirePlan) error {
        return datasource.GetMetadataManager().RetireService(ctx, plan)
 }
+
+func RetireSchema(ctx context.Context) error {
+       n, err := schema.Instance().DeleteNoRefContents(ctx)
+       if err != nil {
+               log.Error("delete no ref contents failed", err)
+               return err
+       }
+       if n > 0 {
+               log.Warn(fmt.Sprintf("%d schema-contents retired", n))
+       }
+       return nil
+}
diff --git a/server/service/disco/retire_test.go 
b/server/service/disco/retire_test.go
index ca778c9..c41fbc6 100644
--- a/server/service/disco/retire_test.go
+++ b/server/service/disco/retire_test.go
@@ -22,6 +22,7 @@ import (
        "testing"
 
        "github.com/apache/servicecomb-service-center/datasource"
+       "github.com/apache/servicecomb-service-center/datasource/schema"
        discosvc 
"github.com/apache/servicecomb-service-center/server/service/disco"
        "github.com/apache/servicecomb-service-center/test"
        pb "github.com/go-chassis/cari/discovery"
@@ -54,11 +55,12 @@ func TestRetireService(t *testing.T) {
                                idx := fmt.Sprintf("%d", i)
                                discosvc.UnregisterService(ctx, 
&pb.DeleteServiceRequest{
                                        ServiceId: serviceIDPrefix + idx,
+                                       Force:     true,
                                })
                        }
                }()
 
-               err := discosvc.RetireService(ctx, 
&datasource.RetirePlan{Interval: 0, Reserve: 1})
+               err := discosvc.RetireService(ctx, 
&datasource.RetirePlan{Reserve: 1})
                assert.NoError(t, err)
 
                resp, err := 
datasource.GetMetadataManager().ListServiceDetail(ctx, 
&pb.GetServicesInfoRequest{
@@ -69,3 +71,54 @@ func TestRetireService(t *testing.T) {
                assert.Equal(t, serviceIDPrefix+"4", 
resp.AllServicesDetail[0].MicroService.ServiceId)
        })
 }
+
+func TestRetireSchema(t *testing.T) {
+       if !test.IsETCD() {
+               return
+       }
+
+       var serviceID string
+
+       ctx := getContext()
+       service, err := discosvc.RegisterService(ctx, &pb.CreateServiceRequest{
+               Service: &pb.MicroService{
+                       ServiceName: "TestRetireSchema",
+               },
+       })
+       assert.NoError(t, err)
+       serviceID = service.ServiceId
+       schemaID := "schemaID_1"
+       content := "content_1"
+       hash := schema.Hash(schemaID, content)
+       defer schema.Instance().DeleteContent(ctx, &schema.ContentRequest{Hash: 
hash})
+       defer discosvc.UnregisterService(ctx, 
&pb.DeleteServiceRequest{ServiceId: serviceID, Force: true})
+
+       t.Run("retire schema with ref, should not delete it", func(t 
*testing.T) {
+               err = discosvc.PutSchema(ctx, &pb.ModifySchemaRequest{
+                       ServiceId: serviceID,
+                       SchemaId:  schemaID,
+                       Schema:    content,
+               })
+               assert.NoError(t, err)
+
+               err := discosvc.RetireSchema(ctx)
+               assert.NoError(t, err)
+
+               _, err = discosvc.GetSchema(ctx, 
&pb.GetSchemaRequest{ServiceId: serviceID, SchemaId: schemaID})
+               assert.NoError(t, err)
+       })
+
+       t.Run("retire schema without ref, should delete it", func(t *testing.T) 
{
+               err := discosvc.DeleteSchema(ctx, &pb.DeleteSchemaRequest{
+                       ServiceId: serviceID,
+                       SchemaId:  schemaID,
+               })
+               assert.NoError(t, err)
+
+               err = discosvc.RetireSchema(ctx)
+               assert.NoError(t, err)
+
+               _, err = schema.Instance().GetContent(ctx, 
&schema.ContentRequest{Hash: hash})
+               assert.ErrorIs(t, schema.ErrSchemaContentNotFound, err)
+       })
+}
diff --git a/server/service/disco/schema_test.go 
b/server/service/disco/schema_test.go
index 6930dc4..a77764c 100644
--- a/server/service/disco/schema_test.go
+++ b/server/service/disco/schema_test.go
@@ -1014,7 +1014,8 @@ func TestCompatibleOperateSchema(t *testing.T) {
                })
                assert.NoError(t, err)
                assert.Equal(t, 2, len(schemas))
-               schema = schemas[1]
+               schema = findSchemaBySchemaID(schemas, "schemaID_2")
+               assert.NotNil(t, schema)
                assert.Equal(t, "schema_2", schema.Schema)
                assert.Equal(t, "summary2", schema.Summary)
        })
@@ -1040,10 +1041,14 @@ func TestCompatibleOperateSchema(t *testing.T) {
                })
                assert.NoError(t, err)
                assert.Equal(t, 2, len(schemas))
-               schema = schemas[0]
+
+               schema = findSchemaBySchemaID(schemas, "schemaID_1")
+               assert.NotNil(t, schema)
                assert.Empty(t, schema.Schema)
                assert.Empty(t, schema.Summary)
-               schema = schemas[1]
+
+               schema = findSchemaBySchemaID(schemas, "schemaID_2")
+               assert.NotNil(t, schema)
                assert.Equal(t, "schema_2", schema.Schema)
                assert.Equal(t, "summary2", schema.Summary)
        })
@@ -1069,11 +1074,24 @@ func TestCompatibleOperateSchema(t *testing.T) {
                })
                assert.NoError(t, err)
                assert.Equal(t, 2, len(schemas))
-               schema = schemas[0]
+
+               schema = findSchemaBySchemaID(schemas, "schemaID_1")
+               assert.NotNil(t, schema)
                assert.Empty(t, schema.Schema)
                assert.Empty(t, schema.Summary)
-               schema = schemas[1]
+
+               schema = findSchemaBySchemaID(schemas, "schemaID_2")
+               assert.NotNil(t, schema)
                assert.Empty(t, schema.Schema)
                assert.Empty(t, schema.Summary)
        })
 }
+
+func findSchemaBySchemaID(schemas []*pb.Schema, schemaID string) *pb.Schema {
+       for _, schema := range schemas {
+               if schema.SchemaId == schemaID {
+                       return schema
+               }
+       }
+       return nil
+}

Reply via email to