This is an automated email from the ASF dual-hosted git repository. littlecui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push: new 1d56c95 SCB-317 Prepare the release for Service-Center-1.0.0-m1 (#269) 1d56c95 is described below commit 1d56c950820ceae9863df9993415dd378265d1a5 Author: little-cui <sure_0...@qq.com> AuthorDate: Fri Feb 2 21:07:39 2018 +0800 SCB-317 Prepare the release for Service-Center-1.0.0-m1 (#269) * Optimize code. (cherry picked from commit b16e646) * SCB-317 Fix UT failure --- pkg/cache/cache.go | 331 ----------------------------- pkg/etcdsync/README.md | 4 +- pkg/etcdsync/etcdsync_suite_test.go | 2 +- pkg/etcdsync/mutex.go | 14 -- server/service/event/rule_event_handler.go | 8 +- server/service/event/tag_event_handler.go | 8 +- server/service/microservices.go | 7 - server/service/util/dependency.go | 64 ------ server/service/util/dependency_test.go | 8 - server/service/util/microservice_util.go | 25 +-- server/service/util/util_suite_test.go | 21 -- 11 files changed, 8 insertions(+), 484 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go deleted file mode 100644 index c54a4a5..0000000 --- a/pkg/cache/cache.go +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cache - -import ( - "container/list" - "errors" - "fmt" - "sync" - "time" -) - -// Cache is a goroutine-safe K/V cache. -type Cache struct { - sync.RWMutex - items map[string]*Item - defaultExpiration time.Duration -} - -type Item struct { - Object interface{} - Expiration *time.Time -} - -// Returns true if the item has expired. -func (item *Item) Expired() bool { - if item.Expiration == nil { - return false - } - return item.Expiration.Before(time.Now()) -} - -// New create a new cache with a given default expiration duration and cleanup -// interval. If the expiration duration is less than 1, the items in the cache -// never expire (by default), and must be deleted manually. If the cleanup -// interval is less than one, expired items are not deleted from the cache -// before calling DeleteExpired. -func New(defaultExpiration, cleanInterval time.Duration) *Cache { - c := &Cache{ - items: map[string]*Item{}, - defaultExpiration: defaultExpiration, - } - if cleanInterval > 0 { - go func() { - for { - time.Sleep(cleanInterval) - c.DeleteExpired() - } - }() - } - return c -} - -// Get return an item or nil, and a bool indicating whether -// the key was found. -func (c *Cache) Get(key string) (interface{}, bool) { - c.RLock() - item, ok := c.items[key] - if !ok || item.Expired() { - c.RUnlock() - return nil, false - } - c.RUnlock() - return item.Object, true -} - -// Get all cache keys -func (c *Cache) Keys() []string { - c.RLock() - defer c.RUnlock() - keys := make([]string, 0, len(c.items)) - for k := range c.items { - keys = append(keys, k) - } - return keys -} - -// Set add a new key or replace an exist key. If the dur is 0, we will -// use the defaultExpiration. -func (c *Cache) Set(key string, val interface{}, dur time.Duration) { - var t *time.Time - c.Lock() - if dur == 0 { - dur = c.defaultExpiration - } - if dur > 0 { - tmp := time.Now().Add(dur) - t = &tmp - } - c.items[key] = &Item{ - Object: val, - Expiration: t, - } - c.Unlock() -} - -// Delete a key-value pair if the key is existed. -func (c *Cache) Delete(key string) { - c.Lock() - delete(c.items, key) - c.Unlock() -} - -// Delete all cache. -func (c *Cache) Flush() { - c.Lock() - c.items = map[string]*Item{} - c.Unlock() -} - -// Add a number to a key-value pair. -func (c *Cache) Increment(key string, x int64) error { - c.Lock() - val, ok := c.items[key] - if !ok || val.Expired() { - c.Unlock() - return fmt.Errorf("Item %s not found", key) - } - switch val.Object.(type) { - case int: - val.Object = val.Object.(int) + int(x) - case int8: - val.Object = val.Object.(int8) + int8(x) - case int16: - val.Object = val.Object.(int16) + int16(x) - case int32: - val.Object = val.Object.(int32) + int32(x) - case int64: - val.Object = val.Object.(int64) + x - case uint: - val.Object = val.Object.(uint) + uint(x) - case uint8: - val.Object = val.Object.(uint8) + uint8(x) - case uint16: - val.Object = val.Object.(uint16) + uint16(x) - case uint32: - val.Object = val.Object.(uint32) + uint32(x) - case uint64: - val.Object = val.Object.(uint64) + uint64(x) - case uintptr: - val.Object = val.Object.(uintptr) + uintptr(x) - default: - c.Unlock() - return fmt.Errorf("The value type error") - } - c.Unlock() - return nil -} - -// Sub a number to a key-value pair. -func (c *Cache) Decrement(key string, x int64) error { - c.Lock() - val, ok := c.items[key] - if !ok || val.Expired() { - c.Unlock() - return fmt.Errorf("Item %s not found", key) - } - switch val.Object.(type) { - case int: - val.Object = val.Object.(int) - int(x) - case int8: - val.Object = val.Object.(int8) - int8(x) - case int16: - val.Object = val.Object.(int16) - int16(x) - case int32: - val.Object = val.Object.(int32) - int32(x) - case int64: - val.Object = val.Object.(int64) - x - case uint: - val.Object = val.Object.(uint) - uint(x) - case uint8: - val.Object = val.Object.(uint8) - uint8(x) - case uint16: - val.Object = val.Object.(uint16) - uint16(x) - case uint32: - val.Object = val.Object.(uint32) - uint32(x) - case uint64: - val.Object = val.Object.(uint64) - uint64(x) - case uintptr: - val.Object = val.Object.(uintptr) - uintptr(x) - default: - c.Unlock() - return fmt.Errorf("The value type error") - } - c.Unlock() - return nil -} - -// Return the number of item in cache. -func (c *Cache) ItemCount() int { - c.RLock() - counts := len(c.items) - c.RUnlock() - return counts -} - -// Delete all expired items. -func (c *Cache) DeleteExpired() { - c.Lock() - for k, v := range c.items { - if v.Expired() { - delete(c.items, k) - } - } - c.Unlock() -} - -// The LRUCache is a goroutine-safe cache. -type LRUCache struct { - sync.RWMutex - maxEntries int - items map[string]*list.Element - cacheList *list.List -} - -type entry struct { - key string - value interface{} -} - -// NewLRU create a LRUCache with max size. The size is 0 means no limit. -func NewLRU(size int) (*LRUCache, error) { - if size < 0 { - return nil, errors.New("The size of LRU Cache must no less than 0") - } - lru := &LRUCache{ - maxEntries: size, - items: make(map[string]*list.Element, size), - cacheList: list.New(), - } - return lru, nil -} - -// Add a new key-value pair to the LRUCache. -func (c *LRUCache) Add(key string, value interface{}) { - c.Lock() - defer c.Unlock() - if ent, hit := c.items[key]; hit { - c.cacheList.MoveToFront(ent) - ent.Value.(*entry).value = value - return - } - ent := &entry{ - key: key, - value: value, - } - entry := c.cacheList.PushFront(ent) - c.items[key] = entry - - if c.maxEntries > 0 && c.cacheList.Len() > c.maxEntries { - c.removeOldestElement() - } -} - -// Get a value from the LRUCache. And a bool indicating -// whether found or not. -func (c *LRUCache) Get(key string) (interface{}, bool) { - c.RLock() - defer c.RUnlock() - - if ent, hit := c.items[key]; hit { - c.cacheList.MoveToFront(ent) - return ent.Value.(*entry).value, true - } - return nil, false -} - -// Remove a key-value pair in LRUCache. If the key is not existed, -// nothing will happen. -func (c *LRUCache) Remove(key string) { - c.Lock() - defer c.Unlock() - - if ent, hit := c.items[key]; hit { - c.removeElement(ent) - } -} - -// Return the number of key-value pair in LRUCache. -func (c *LRUCache) Len() int { - c.RLock() - length := c.cacheList.Len() - c.RUnlock() - return length -} - -// Delete all entry in the LRUCache. But the max size will hold. -func (c *LRUCache) Clear() { - c.Lock() - c.cacheList = list.New() - c.items = make(map[string]*list.Element, c.maxEntries) - c.Unlock() -} - -// Resize the max limit. -func (c *LRUCache) SetMaxEntries(max int) error { - if max < 0 { - return errors.New("The max limit of entryies must no less than 0") - } - c.Lock() - c.maxEntries = max - c.Unlock() - return nil -} - -func (c *LRUCache) removeElement(e *list.Element) { - c.cacheList.Remove(e) - ent := e.Value.(*entry) - delete(c.items, ent.key) -} - -func (c *LRUCache) removeOldestElement() { - ent := c.cacheList.Back() - if ent != nil { - c.removeElement(ent) - } -} diff --git a/pkg/etcdsync/README.md b/pkg/etcdsync/README.md index c31e3fd..06b68b4 100644 --- a/pkg/etcdsync/README.md +++ b/pkg/etcdsync/README.md @@ -2,8 +2,8 @@ ## example -```bash -lock, _ := etcdsync.Lock("/test") +```go +lock, _ := etcdsync.Lock("/test", true) defer lock.Unlock() //do something g += 1 diff --git a/pkg/etcdsync/etcdsync_suite_test.go b/pkg/etcdsync/etcdsync_suite_test.go index 345182d..5b40871 100644 --- a/pkg/etcdsync/etcdsync_suite_test.go +++ b/pkg/etcdsync/etcdsync_suite_test.go @@ -41,7 +41,7 @@ func BenchmarkLock(b *testing.B) { var g = 0 b.RunParallel(func(pb *testing.PB) { for pb.Next() { - lock, _ := etcdsync.Lock("/test") + lock, _ := etcdsync.Lock("/test", true) defer lock.Unlock() //do something g += 1 diff --git a/pkg/etcdsync/mutex.go b/pkg/etcdsync/mutex.go index 96adcf8..3cb4bcf 100644 --- a/pkg/etcdsync/mutex.go +++ b/pkg/etcdsync/mutex.go @@ -35,7 +35,6 @@ const ( ROOT_PATH = "/cse/etcdsync" ) -// A Mutex is a mutual exclusion lock which is distributed across a cluster. type DLockFactory struct { key string ctx context.Context @@ -69,9 +68,6 @@ func init() { pid = os.Getpid() } -// New creates a Mutex with the given key which must be the same -// across the cluster nodes. -// machines are the ectd cluster addresses func NewLockFactory(key string, ttl int64) *DLockFactory { if len(key) == 0 { return nil @@ -88,10 +84,6 @@ func NewLockFactory(key string, ttl int64) *DLockFactory { } } -// Lock locks m. -// If the lock is already in use, the calling goroutine -// blocks until the mutex is available. Flag wait is false, -// this function is non-block when lock exist. func (m *DLockFactory) NewDLock(wait bool) (l *DLock, err error) { if !IsDebug { m.mutex.Lock() @@ -182,12 +174,6 @@ func (m *DLock) Lock(wait bool) error { } } -// Unlock unlocks m. -// It is a run-time error if m is not locked on entry to Unlock. -// -// A locked Mutex is not associated with a particular goroutine. -// It is allowed for one goroutine to lock a Mutex and then -// arrange for another goroutine to unlock it. func (m *DLock) Unlock() (err error) { opts := []registry.PluginOpOption{ registry.DEL, diff --git a/server/service/event/rule_event_handler.go b/server/service/event/rule_event_handler.go index a2d2650..f85b62c 100644 --- a/server/service/event/rule_event_handler.go +++ b/server/service/event/rule_event_handler.go @@ -57,12 +57,8 @@ func (apt *RulesChangedAsyncTask) publish(ctx context.Context, domainProject, pr return err } if provider == nil { - tmpProvider, found := serviceUtil.MsCache().Get(providerId) - if !found { - util.Logger().Errorf(nil, "provider %s does not exist", providerId) - return fmt.Errorf("provider %s does not exist", providerId) - } - provider = tmpProvider.(*pb.MicroService) + util.Logger().Errorf(nil, "provider %s does not exist", providerId) + return fmt.Errorf("provider %s does not exist", providerId) } consumerIds, err := serviceUtil.GetConsumersInCache(ctx, domainProject, provider) diff --git a/server/service/event/tag_event_handler.go b/server/service/event/tag_event_handler.go index 42ea494..cdf96d6 100644 --- a/server/service/event/tag_event_handler.go +++ b/server/service/event/tag_event_handler.go @@ -57,12 +57,8 @@ func (apt *TagsChangedAsyncTask) publish(ctx context.Context, domainProject, con return err } if consumer == nil { - consumerTmp, found := serviceUtil.MsCache().Get(consumerId) - if !found { - util.Logger().Errorf(nil, "service not exist, %s", consumerId) - return fmt.Errorf("service not exist, %s", consumerId) - } - consumer = consumerTmp.(*pb.MicroService) + util.Logger().Errorf(nil, "service not exist, %s", consumerId) + return fmt.Errorf("service not exist, %s", consumerId) } providerIds, err := serviceUtil.GetProvidersInCache(ctx, domainProject, consumer) if err != nil { diff --git a/server/service/microservices.go b/server/service/microservices.go index 60463a8..8514333 100644 --- a/server/service/microservices.go +++ b/server/service/microservices.go @@ -254,13 +254,6 @@ func (s *MicroServiceService) DeleteServicePri(ctx context.Context, serviceId st } } - //refresh msCache consumerCache, ensure that watch can notify consumers when no cache. - err = serviceUtil.RefreshDependencyCache(ctx, domainProject, service) - if err != nil { - util.Logger().Errorf(err, "%s micro-service failed, serviceId is %s: inner err, refresh service dependency cache failed.", title, serviceId) - return pb.CreateResponse(scerr.ErrInternal, "Refresh dependency cache failed."), err - } - serviceKey := &pb.MicroServiceKey{ Tenant: domainProject, Environment: service.Environment, diff --git a/server/service/util/dependency.go b/server/service/util/dependency.go index 3e9e929..f78cd92 100644 --- a/server/service/util/dependency.go +++ b/server/service/util/dependency.go @@ -20,7 +20,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/apache/incubator-servicecomb-service-center/pkg/cache" "github.com/apache/incubator-servicecomb-service-center/pkg/util" apt "github.com/apache/incubator-servicecomb-service-center/server/core" "github.com/apache/incubator-servicecomb-service-center/server/core/backend" @@ -30,25 +29,8 @@ import ( "github.com/apache/incubator-servicecomb-service-center/server/infra/registry" "golang.org/x/net/context" "strings" - "time" ) -var consumerCache *cache.Cache -var providerCache *cache.Cache - -/* -缓存2分钟过期 -1分钟周期缓存consumers 遍历所有serviceid并查询consumers 做缓存 -当发现新查询到的consumers列表变成0时则不做cache set操作 -这样当consumers关系完全被删除也有1分钟的时间窗让实例变化推送到相应的consumers里 1分鐘后緩存也會自動清理 -实例推送中的依赖发现实时性为T+1分钟 -*/ -func init() { - d, _ := time.ParseDuration("2m") - consumerCache = cache.New(d, d) - providerCache = cache.New(d, d) -} - func GetConsumersInCache(ctx context.Context, domainProject string, provider *pb.MicroService) ([]string, error) { // 查询所有consumer dr := NewProviderDependencyRelation(ctx, domainProject, provider) @@ -57,17 +39,6 @@ func GetConsumersInCache(ctx context.Context, domainProject string, provider *pb util.Logger().Errorf(err, "Get dependency consumerIds failed.%s", provider.ServiceId) return nil, err } - - if len(consumerIds) == 0 { - consumerIds, found := consumerCache.Get(provider.ServiceId) - if found && len(consumerIds.([]string)) > 0 { - return consumerIds.([]string), nil - } - util.Logger().Warnf(nil, "Can not find any consumer from local cache and backend. provider is %s", - provider.ServiceId) - return nil, nil - } - return consumerIds, nil } @@ -79,44 +50,9 @@ func GetProvidersInCache(ctx context.Context, domainProject string, consumer *pb util.Logger().Errorf(err, "Get dependency providerIds failed.%s", consumer.ServiceId) return nil, err } - - if len(providerIds) == 0 { - providerIds, found := providerCache.Get(consumer.ServiceId) - if found && len(providerIds.([]string)) > 0 { - return providerIds.([]string), nil - } - util.Logger().Warnf(nil, "Can not find any provider from local cache and backend. consumer is %s", - consumer.ServiceId) - return nil, nil - } - return providerIds, nil } -func RefreshDependencyCache(ctx context.Context, domainProject string, service *pb.MicroService) error { - dr := NewDependencyRelation(ctx, domainProject, service, service) - consumerIds, err := dr.GetDependencyConsumerIds() - if err != nil { - util.Logger().Errorf(err, "%s,refresh dependency cache failed, get consumerIds failed.", service.ServiceId) - return err - } - providerIds, err := dr.GetDependencyProviderIds() - if err != nil { - util.Logger().Errorf(err, "%s,refresh dependency cache failed, get providerIds failed.", service.ServiceId) - return err - } - MsCache().Set(service.ServiceId, service, 5*time.Minute) - if len(consumerIds) > 0 { - util.Logger().Infof("refresh %s dependency cache: cached %d consumerId(s) for 5min.", service.ServiceId, len(consumerIds)) - consumerCache.Set(service.ServiceId, consumerIds, 5*time.Minute) - } - if len(providerIds) > 0 { - util.Logger().Infof("refresh %s dependency cache: cached %d providerId(s) for 5min.", service.ServiceId, len(providerIds)) - providerCache.Set(service.ServiceId, providerIds, 5*time.Minute) - } - return nil -} - func GetConsumerIdsByProvider(ctx context.Context, domainProject string, provider *pb.MicroService) (allow []string, deny []string, _ error) { if provider == nil || len(provider.ServiceId) == 0 { return nil, nil, fmt.Errorf("invalid provider") diff --git a/server/service/util/dependency_test.go b/server/service/util/dependency_test.go index 546459e..52104c7 100644 --- a/server/service/util/dependency_test.go +++ b/server/service/util/dependency_test.go @@ -24,14 +24,6 @@ import ( "testing" ) -func TestRefreshDependencyCache(t *testing.T) { - err := RefreshDependencyCache(context.Background(), "", &proto.MicroService{}) - if err == nil { - fmt.Printf(`RefreshDependencyCache failed`) - t.FailNow() - } -} - func TestDeleteDependencyForService(t *testing.T) { _, err := DeleteDependencyForDeleteService("", "", &proto.MicroServiceKey{}) if err != nil { diff --git a/server/service/util/microservice_util.go b/server/service/util/microservice_util.go index a759dff..c740668 100644 --- a/server/service/util/microservice_util.go +++ b/server/service/util/microservice_util.go @@ -18,7 +18,6 @@ package util import ( "encoding/json" - "github.com/apache/incubator-servicecomb-service-center/pkg/cache" "github.com/apache/incubator-servicecomb-service-center/pkg/util" apt "github.com/apache/incubator-servicecomb-service-center/server/core" "github.com/apache/incubator-servicecomb-service-center/server/core/backend/store" @@ -28,20 +27,8 @@ import ( "github.com/apache/incubator-servicecomb-service-center/server/plugin" "github.com/coreos/etcd/mvcc/mvccpb" "golang.org/x/net/context" - "time" ) -var msCache *cache.Cache - -func MsCache() *cache.Cache { - return msCache -} - -func init() { - d, _ := time.ParseDuration("1m") - msCache = cache.New(d, d) -} - /* get Service by service id */ @@ -65,17 +52,7 @@ func GetServiceWithRev(ctx context.Context, domain string, id string, rev int64) } func GetServiceInCache(ctx context.Context, domain string, id string) (*pb.MicroService, error) { - ms, ok := msCache.Get(id) - if !ok { - ms, err := GetService(ctx, domain, id) - if ms == nil { - return nil, err - } - msCache.Set(id, ms, 0) - return ms, nil - } - - return ms.(*pb.MicroService), nil + return GetService(ctx, domain, id) } func GetService(ctx context.Context, domainProject string, serviceId string) (*pb.MicroService, error) { diff --git a/server/service/util/util_suite_test.go b/server/service/util/util_suite_test.go index 05de94d..7cbb461 100644 --- a/server/service/util/util_suite_test.go +++ b/server/service/util/util_suite_test.go @@ -116,27 +116,6 @@ func TestGetService(t *testing.T) { } } -func TestMsCache(t *testing.T) { - defer func() { - if r := recover(); r != nil { - t.FailNow() - } - }() - _, err := serviceUtil.GetServiceInCache(context.Background(), "", "") - if err == nil { - t.FailNow() - } - ms := serviceUtil.MsCache() - if ms == nil { - t.FailNow() - } - ms.Set("", &proto.MicroService{}, 0) - _, err = serviceUtil.GetServiceInCache(context.Background(), "", "") - if err != nil { - t.FailNow() - } -} - func TestFromContext(t *testing.T) { ctx := context.WithValue(context.Background(), "noCache", "1") opts := serviceUtil.FromContext(ctx) -- To stop receiving notification emails like this one, please contact little...@apache.org.