This is an automated email from the ASF dual-hosted git repository. asifdxtreme pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push: new a4b6ce4 SCB-296 Self preservation will never stop when SC do list-watch loop (#261) a4b6ce4 is described below commit a4b6ce4d60309361940b9039b9a16ce641af183a Author: little-cui <sure_0...@qq.com> AuthorDate: Mon Jan 29 12:39:18 2018 +0800 SCB-296 Self preservation will never stop when SC do list-watch loop (#261) * SCB-296 Self preservation will never stop when SC do list-watch loop --- etc/conf/app.conf | 5 +- integration/instances_test.go | 2 +- pkg/chain/chain.go | 2 +- pkg/httplimiter/httpratelimiter.go | 13 +- pkg/logrotate/logrotate.go | 4 +- pkg/ratelimiter/ratelimiter.go | 5 +- pkg/rest/client.go | 2 +- pkg/rest/route.go | 16 +- server/core/backend/store/cacher.go | 104 ++++++++----- server/core/backend/store/defer.go | 173 +++++++++++---------- server/core/backend/store/defer_test.go | 141 +++++++++++++++++ server/core/backend/store/indexer.go | 5 - server/core/backend/store/listwatch.go | 10 +- server/core/backend/store/opt.go | 4 +- server/core/backend/store/store.go | 7 - server/infra/quota/quota.go | 2 +- server/plugin/infra/registry/etcd/etcd.go | 75 ++++----- .../rest/controller/v4/microservice_controller.go | 1 - server/service/event/dependency_event_handler.go | 10 +- server/service/instances.go | 8 +- server/service/instances_test.go | 2 +- server/service/microservices.go | 3 +- server/service/rule.go | 1 - server/service/util/dependency.go | 8 +- 24 files changed, 376 insertions(+), 227 deletions(-) diff --git a/etc/conf/app.conf b/etc/conf/app.conf index 9d54f2d..a48f08c 100644 --- a/etc/conf/app.conf +++ b/etc/conf/app.conf @@ -26,11 +26,12 @@ plugins_dir = ./plugins registry_plugin = etcd # registry address -# registry_plugin equals to 'embeded_etcd', example: +# 1. if registry_plugin equals to 'embeded_etcd' # manager_name = "sc-0" # manager_addr = "http://127.0.0.1:2380" # manager_cluster = "sc-0=http://127.0.0.1:2380" -# registry_plugin equals to 'etcd' +# 2. if registry_plugin equals to 'etcd' +# manager_cluster = "127.0.0.1:2379" manager_cluster = "127.0.0.1:2379" #heartbeat that sync synchronizes client's endpoints with the known endpoints from the etcd membership,unit is second. diff --git a/integration/instances_test.go b/integration/instances_test.go index 8ab0a78..948c9f6 100644 --- a/integration/instances_test.go +++ b/integration/instances_test.go @@ -248,7 +248,7 @@ var _ = Describe("MicroService Api Test", func() { By("Discover MicroService Instance API", func() { It("Find Micro-service Info by AppID", func() { - req, _ := http.NewRequest(GET, SCURL+FINDINSTANCE+"?appId="+serviceAppId+"&serviceName="+serviceName+"&version="+serviceVersion, nil) + req, _ := http.NewRequest(GET, SCURL+FINDINSTANCE+"?noCache=1&appId="+serviceAppId+"&serviceName="+serviceName+"&version="+serviceVersion, nil) req.Header.Set("X-Domain-Name", "default") req.Header.Set("X-ConsumerId", serviceId) resp, _ := scclient.Do(req) diff --git a/pkg/chain/chain.go b/pkg/chain/chain.go index 8517824..a48fc1c 100644 --- a/pkg/chain/chain.go +++ b/pkg/chain/chain.go @@ -60,7 +60,7 @@ func (c *Chain) syncNext(i *Invocation) { } func (c *Chain) Next(i *Invocation) { - go c.syncNext(i) + c.syncNext(i) } func NewChain(name string, handlers []Handler) Chain { diff --git a/pkg/httplimiter/httpratelimiter.go b/pkg/httplimiter/httpratelimiter.go index 9128d5b..3110056 100644 --- a/pkg/httplimiter/httpratelimiter.go +++ b/pkg/httplimiter/httpratelimiter.go @@ -18,13 +18,13 @@ package httplimiter import ( + "fmt" + "github.com/apache/incubator-servicecomb-service-center/pkg/ratelimiter" "net/http" "strconv" "strings" - "time" - "github.com/apache/incubator-servicecomb-service-center/pkg/ratelimiter" - "fmt" "sync" + "time" ) type HTTPErrorMessage struct { @@ -36,7 +36,6 @@ func (httpErrorMessage *HTTPErrorMessage) Error() string { return fmt.Sprintf("%v: %v", httpErrorMessage.StatusCode, httpErrorMessage.Message) } - type HttpLimiter struct { HttpMessage string ContentType string @@ -51,8 +50,6 @@ type HttpLimiter struct { sync.RWMutex } - - func LimitBySegments(limiter *HttpLimiter, keys []string) *HTTPErrorMessage { if limiter.LimitExceeded(strings.Join(keys, "|")) { return &HTTPErrorMessage{Message: limiter.HttpMessage, StatusCode: limiter.StatusCode} @@ -199,7 +196,6 @@ func getRemoteIP(ipLookups []string, r *http.Request) string { return "" } - func NewHttpLimiter(max int64, ttl time.Duration) *HttpLimiter { limiter := &HttpLimiter{RequestLimit: max, TTL: ttl} limiter.ContentType = "text/plain; charset=utf-8" @@ -211,7 +207,6 @@ func NewHttpLimiter(max int64, ttl time.Duration) *HttpLimiter { return limiter } - func (rateLimiter *HttpLimiter) LimitExceeded(key string) bool { rateLimiter.Lock() if _, found := rateLimiter.leakyBuckets[key]; !found { @@ -224,5 +219,3 @@ func (rateLimiter *HttpLimiter) LimitExceeded(key string) bool { } return true } - - diff --git a/pkg/logrotate/logrotate.go b/pkg/logrotate/logrotate.go index c2e9e93..76de98d 100644 --- a/pkg/logrotate/logrotate.go +++ b/pkg/logrotate/logrotate.go @@ -240,9 +240,9 @@ func LogRotate(path string, MaxFileSize int, MaxBackupCount int) { } } -func isSkip(f os.FileInfo) bool{ +func isSkip(f os.FileInfo) bool { //dir or non write permission,skip - return f.IsDir() || (f.Mode() & 0200 == 0000) + return f.IsDir() || (f.Mode()&0200 == 0000) } //path : where the file will be filtered diff --git a/pkg/ratelimiter/ratelimiter.go b/pkg/ratelimiter/ratelimiter.go index 8184108..a46bc39 100644 --- a/pkg/ratelimiter/ratelimiter.go +++ b/pkg/ratelimiter/ratelimiter.go @@ -18,8 +18,8 @@ package ratelimiter import ( - "time" "sync" + "time" ) type LeakyBucket struct { @@ -32,7 +32,6 @@ type LeakyBucket struct { availableTicker int64 } - func NewLeakyBucket(fillInterval time.Duration, capacity, quantum int64) *LeakyBucket { if fillInterval <= 0 { panic("leaky bucket fill interval is not > 0") @@ -77,7 +76,6 @@ func (leakyBucket *LeakyBucket) MaximumTakeDuration(count int64, maxWait time.Du return leakyBucket.take(time.Now(), count, maxWait) } - func (leakyBucket *LeakyBucket) Rate() float64 { return 1e9 * float64(leakyBucket.quantum) / float64(leakyBucket.interval) } @@ -118,4 +116,3 @@ func (leakyBucket *LeakyBucket) adjust(now time.Time) (currentTick int64) { leakyBucket.availableTicker = currentTick return } - diff --git a/pkg/rest/client.go b/pkg/rest/client.go index 3ce4e29..2cd68ab 100644 --- a/pkg/rest/client.go +++ b/pkg/rest/client.go @@ -28,9 +28,9 @@ import ( "io/ioutil" "net" "net/http" + "net/url" "reflect" "time" - "net/url" ) const ( diff --git a/pkg/rest/route.go b/pkg/rest/route.go index 6595021..3015aff 100644 --- a/pkg/rest/route.go +++ b/pkg/rest/route.go @@ -23,6 +23,7 @@ import ( errorsEx "github.com/apache/incubator-servicecomb-service-center/pkg/errors" "github.com/apache/incubator-servicecomb-service-center/pkg/util" "net/http" + "net/url" "strings" ) @@ -77,7 +78,7 @@ func (this *ROAServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) for _, ph := range this.handlers[r.Method] { if params, ok := ph.try(r.URL.Path); ok { if len(params) > 0 { - r.URL.RawQuery = util.UrlEncode(params) + "&" + r.URL.RawQuery + r.URL.RawQuery = params + r.URL.RawQuery } this.serve(ph, w, r) @@ -151,7 +152,7 @@ func (this *ROAServerHandler) serve(ph *urlPatternHandler, w http.ResponseWriter <-ch } -func (this *urlPatternHandler) try(path string) (p map[string]string, _ bool) { +func (this *urlPatternHandler) try(path string) (p string, _ bool) { var i, j int l, sl := len(this.Path), len(path) for i < sl { @@ -160,7 +161,7 @@ func (this *urlPatternHandler) try(path string) (p map[string]string, _ bool) { if this.Path != "/" && l > 0 && this.Path[l-1] == '/' { return p, true } - return nil, false + return "", false case this.Path[j] == ':': var val string var nextc byte @@ -168,19 +169,16 @@ func (this *urlPatternHandler) try(path string) (p map[string]string, _ bool) { _, nextc, j = match(this.Path, isAlnum, 0, j+1) val, _, i = match(path, matchParticial, nextc, i) - if p == nil { - p = make(map[string]string, 5) - } - p[this.Path[o:j]] = val + p += url.QueryEscape(this.Path[o:j]) + "=" + url.QueryEscape(val) + "&" case path[i] == this.Path[j]: i++ j++ default: - return nil, false + return "", false } } if j != l { - return nil, false + return "", false } return p, true } diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go index 7ca82cf..c6ade97 100644 --- a/server/core/backend/store/cacher.go +++ b/server/core/backend/store/cacher.go @@ -120,6 +120,15 @@ func (c *KvCache) Have(k interface{}) (ok bool) { return } +func (c *KvCache) RLock() map[string]*mvccpb.KeyValue { + c.rwMux.RLock() + return c.store +} + +func (c *KvCache) RUnlock() { + c.rwMux.RUnlock() +} + func (c *KvCache) Lock() map[string]*mvccpb.KeyValue { c.rwMux.Lock() return c.store @@ -251,15 +260,39 @@ func (c *KvCacher) handleWatcher(watcher *Watcher) error { } func (c *KvCacher) needDeferHandle(evts []*Event) bool { - if c.Cfg.DeferHander == nil { + if c.Cfg.DeferHandler == nil { return false } - return c.Cfg.DeferHander.OnCondition(c.Cache(), evts) + return c.Cfg.DeferHandler.OnCondition(c.Cache(), evts) +} + +func (c *KvCacher) refresh(stopCh <-chan struct{}) { + util.Logger().Debugf("start to list and watch %s", c.Cfg) + ctx, cancel := context.WithCancel(context.Background()) + c.goroute.Do(func(stopCh <-chan struct{}) { + defer cancel() + <-stopCh + }) + for { + start := time.Now() + c.ListAndWatch(ctx) + watchDuration := time.Since(start) + nextPeriod := 0 * time.Second + if watchDuration > 0 && c.Cfg.Period > watchDuration { + nextPeriod = c.Cfg.Period - watchDuration + } + select { + case <-stopCh: + util.Logger().Debugf("stop to list and watch %s", c.Cfg) + return + case <-time.After(nextPeriod): + } + } } func (c *KvCacher) deferHandle(stopCh <-chan struct{}) { - if c.Cfg.DeferHander == nil { + if c.Cfg.DeferHandler == nil { return } @@ -268,7 +301,7 @@ func (c *KvCacher) deferHandle(stopCh <-chan struct{}) { select { case <-stopCh: return - case evt, ok := <-c.Cfg.DeferHander.HandleChan(): + case evt, ok := <-c.Cfg.DeferHandler.HandleChan(): if !ok { <-time.After(time.Second) continue @@ -281,7 +314,7 @@ func (c *KvCacher) deferHandle(stopCh <-chan struct{}) { evts[i] = evt i++ - case <-time.After(time.Second): + case <-time.After(300 * time.Millisecond): if i == 0 { continue } @@ -305,9 +338,9 @@ func (c *KvCacher) sync(evts []*Event) { } func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []*Event { - cache := c.Cache().(*KvCache) - store := cache.Lock() - defer cache.Unlock() + store := c.cache.RLock() + defer c.cache.RUnlock() + oc, nc := len(store), len(items) tc := oc + nc if tc == 0 { @@ -359,7 +392,7 @@ func (c *KvCacher) filterDelete(store map[string]*mvccpb.KeyValue, newStore map[ block[i] = &Event{ Revision: rev, Type: proto.EVT_DELETE, - Key: c.Cfg.Key, + Prefix: c.Cfg.Key, Object: v, } i++ @@ -387,7 +420,7 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt block[i] = &Event{ Revision: rev, Type: proto.EVT_CREATE, - Key: c.Cfg.Key, + Prefix: c.Cfg.Key, Object: v, } i++ @@ -407,7 +440,7 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt block[i] = &Event{ Revision: rev, Type: proto.EVT_UPDATE, - Key: c.Cfg.Key, + Prefix: c.Cfg.Key, Object: v, } i++ @@ -424,10 +457,9 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt } func (c *KvCacher) onEvents(evts []*Event) { - cache := c.Cache().(*KvCache) - idx := 0 + idx, init := 0, !c.IsReady() kvEvts := make([]*KvEvent, len(evts)) - store := cache.Lock() + store := c.cache.Lock() for _, evt := range evts { kv := evt.Object.(*mvccpb.KeyValue) key := util.BytesToStringWithNoCopy(kv.Key) @@ -456,7 +488,6 @@ func (c *KvCacher) onEvents(evts []*Event) { Action: t, KV: kv, } - idx++ case proto.EVT_DELETE: if !ok { util.Logger().Warnf(nil, "unexpected %s event! key %s does not exist", evt.Type, key) @@ -470,10 +501,15 @@ func (c *KvCacher) onEvents(evts []*Event) { Action: evt.Type, KV: prevKv, } - idx++ } + + if init && kvEvts[idx].Action == proto.EVT_CREATE { + kvEvts[idx].Action = proto.EVT_INIT + } + + idx++ } - cache.Unlock() + c.cache.Unlock() c.onKvEvents(kvEvts[:idx]) } @@ -488,30 +524,7 @@ func (c *KvCacher) onKvEvents(evts []*KvEvent) { } func (c *KvCacher) run() { - c.goroute.Do(func(stopCh <-chan struct{}) { - util.Logger().Debugf("start to list and watch %s", c.Cfg) - ctx, cancel := context.WithCancel(context.Background()) - c.goroute.Do(func(stopCh <-chan struct{}) { - defer cancel() - <-stopCh - }) - for { - start := time.Now() - c.ListAndWatch(ctx) - watchDuration := time.Now().Sub(start) - nextPeriod := 0 * time.Second - if watchDuration > 0 && c.Cfg.Period > watchDuration { - nextPeriod = c.Cfg.Period - watchDuration - } - select { - case <-stopCh: - util.Logger().Debugf("stop to list and watch %s", c.Cfg) - return - case <-time.After(nextPeriod): - } - } - }) - + c.goroute.Do(c.refresh) c.goroute.Do(c.deferHandle) } @@ -533,6 +546,15 @@ func (c *KvCacher) Ready() <-chan struct{} { return c.ready } +func (c *KvCacher) IsReady() bool { + select { + case <-c.ready: + return true + default: + return false + } +} + func NewKvCache(c *KvCacher, size int) *KvCache { return &KvCache{ owner: c, diff --git a/server/core/backend/store/defer.go b/server/core/backend/store/defer.go index 7f7bbfa..173a44a 100644 --- a/server/core/backend/store/defer.go +++ b/server/core/backend/store/defer.go @@ -30,122 +30,127 @@ type DeferHandler interface { HandleChan() <-chan *Event } +type deferItem struct { + ttl *time.Timer + event *Event +} + type InstanceEventDeferHandler struct { Percent float64 - enabled bool - events map[string]*Event - ttls map[string]int64 - mux sync.RWMutex - deferCh chan *Event -} -func (iedh *InstanceEventDeferHandler) deferMode(total int, del int) bool { - return iedh.Percent > 0 && total > 0 && - del > 1 && - float64(del/total) >= iedh.Percent + cache Cache + once sync.Once + enabled bool + items map[string]*deferItem + pendingCh chan []*Event + deferCh chan *Event } -func (iedh *InstanceEventDeferHandler) needDefer(cache Cache, evts []*Event) bool { - if !iedh.deferMode(cache.Size(), len(evts)) { +func (iedh *InstanceEventDeferHandler) OnCondition(cache Cache, evts []*Event) bool { + if iedh.Percent <= 0 { return false } - for _, evt := range evts { - if evt.Type != pb.EVT_DELETE { - return false - } - } - return true -} - -func (iedh *InstanceEventDeferHandler) init() { - if iedh.deferCh == nil { + iedh.once.Do(func() { + iedh.cache = cache + iedh.items = make(map[string]*deferItem, event_block_size) + iedh.pendingCh = make(chan []*Event, event_block_size) iedh.deferCh = make(chan *Event, event_block_size) - } - - if iedh.events == nil { - iedh.events = make(map[string]*Event, event_block_size) - iedh.ttls = make(map[string]int64, event_block_size) util.Go(iedh.check) - } -} - -func (iedh *InstanceEventDeferHandler) OnCondition(cache Cache, evts []*Event) bool { - iedh.mux.Lock() - if !iedh.enabled && iedh.needDefer(cache, evts) { - util.Logger().Warnf(nil, "self preservation is enabled, caught %d(>=%.0f%%) DELETE events", - len(evts), iedh.Percent*100) - iedh.enabled = true - } + }) - if !iedh.enabled { - iedh.mux.Unlock() - return false - } - - iedh.init() - - for _, evt := range evts { - kv, ok := evt.Object.(*mvccpb.KeyValue) - if !ok { - continue - } - key := util.BytesToStringWithNoCopy(kv.Key) - switch evt.Type { - case pb.EVT_CREATE, pb.EVT_UPDATE: - delete(iedh.events, key) - delete(iedh.ttls, key) + iedh.pendingCh <- evts + return true +} +func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt *Event) error { + kv := evt.Object.(*mvccpb.KeyValue) + key := util.BytesToStringWithNoCopy(kv.Key) + _, ok := iedh.items[key] + switch evt.Type { + case pb.EVT_CREATE, pb.EVT_UPDATE: + if ok { util.Logger().Infof("recovered key %s events", key) + // return nil // no need to publish event to subscribers? + } + iedh.recover(evt) + case pb.EVT_DELETE: + if ok { + return nil + } - iedh.deferCh <- evt - case pb.EVT_DELETE: - var instance pb.MicroServiceInstance - err := json.Unmarshal(kv.Value, &instance) - if err != nil { - util.Logger().Errorf(err, "unmarshal instance file failed, key is %s", key) - continue - } - iedh.events[key] = evt - iedh.ttls[key] = int64(instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1)) + var instance pb.MicroServiceInstance + err := json.Unmarshal(kv.Value, &instance) + if err != nil { + util.Logger().Errorf(err, "unmarshal instance file failed, key is %s", key) + return err + } + iedh.items[key] = &deferItem{ + ttl: time.NewTimer( + time.Duration(instance.HealthCheck.Interval*(instance.HealthCheck.Times+1)) * time.Second), + event: evt, } } - iedh.mux.Unlock() - return true + return nil +} + +func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event { + return iedh.deferCh } func (iedh *InstanceEventDeferHandler) check(stopCh <-chan struct{}) { defer util.RecoverAndReport() + t, n := iedh.newTimer(), false for { select { case <-stopCh: return - case <-time.After(time.Second): - iedh.mux.Lock() - for key, ttl := range iedh.ttls { - ttl-- - if ttl > 0 { - iedh.ttls[key] = ttl - continue - } - - evt := iedh.events[key] - delete(iedh.events, key) - delete(iedh.ttls, key) + case evts := <-iedh.pendingCh: + for _, evt := range evts { + iedh.recoverOrDefer(evt) + } - util.Logger().Warnf(nil, "defer handle timed out, removed key is %s", key) + del := len(iedh.items) + if del > 0 && !n { + t.Stop() + t, n = iedh.newTimer(), true + } - iedh.deferCh <- evt + total := iedh.cache.Size() + if del > 0 && total > 0 && float64(del) >= float64(total)*iedh.Percent { + iedh.enabled = true + util.Logger().Warnf(nil, "self preservation is enabled, caught %d/%d(>=%.0f%%) DELETE events", + del, total, iedh.Percent*100) + } + case <-t.C: + t, n = iedh.newTimer(), false + + for key, item := range iedh.items { + if iedh.enabled { + select { + case <-item.ttl.C: + default: + continue + } + util.Logger().Warnf(nil, "defer handle timed out, removed key is %s", key) + } + iedh.recover(item.event) } - if iedh.enabled && len(iedh.ttls) == 0 { + + if iedh.enabled && len(iedh.items) == 0 { iedh.enabled = false util.Logger().Warnf(nil, "self preservation is stopped") } - iedh.mux.Unlock() } } } -func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event { - return iedh.deferCh +func (iedh *InstanceEventDeferHandler) newTimer() *time.Timer { + return time.NewTimer(2 * time.Second) // instance DELETE event will be delay. +} + +func (iedh *InstanceEventDeferHandler) recover(evt *Event) { + key := util.BytesToStringWithNoCopy(evt.Object.(*mvccpb.KeyValue).Key) + delete(iedh.items, key) + iedh.deferCh <- evt } diff --git a/server/core/backend/store/defer_test.go b/server/core/backend/store/defer_test.go new file mode 100644 index 0000000..2c9c453 --- /dev/null +++ b/server/core/backend/store/defer_test.go @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package store + +import ( + "encoding/json" + "fmt" + "github.com/apache/incubator-servicecomb-service-center/pkg/util" + pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" + "github.com/coreos/etcd/mvcc/mvccpb" + "testing" + "time" +) + +func TestInstanceEventDeferHandler_OnCondition(t *testing.T) { + iedh := &InstanceEventDeferHandler{ + Percent: 0, + } + + if iedh.OnCondition(nil, nil) { + fmt.Printf(`TestInstanceEventDeferHandler_OnCondition with 0%% failed`) + t.FailNow() + } + + iedh.Percent = 0.01 + if !iedh.OnCondition(nil, nil) { + fmt.Printf(`TestInstanceEventDeferHandler_OnCondition with 1%% failed`) + t.FailNow() + } +} + +func TestInstanceEventDeferHandler_HandleChan(t *testing.T) { + inst := &pb.MicroServiceInstance{ + HealthCheck: &pb.HealthCheck{ + Interval: 4, + Times: 0, + }, + } + b, _ := json.Marshal(inst) + kv1 := &mvccpb.KeyValue{ + Key: util.StringToBytesWithNoCopy("/1"), + Value: b, + } + kv2 := &mvccpb.KeyValue{ + Key: util.StringToBytesWithNoCopy("/2"), + Value: b, + } + kv3 := &mvccpb.KeyValue{ + Key: util.StringToBytesWithNoCopy("/3"), + Value: b, + } + + cache := NewKvCache(nil, 1) + cache.store["/1"] = kv1 + cache.store["/2"] = kv2 + cache.store["/3"] = kv3 + + evts1 := []*Event{ + { + Type: pb.EVT_CREATE, + Object: kv1, + }, + { + Type: pb.EVT_UPDATE, + Object: kv1, + }, + } + evts2 := []*Event{ + { + Type: pb.EVT_DELETE, + Object: kv2, + }, + { + Type: pb.EVT_DELETE, + Object: kv3, + }, + } + evts3 := []*Event{ + { + Type: pb.EVT_CREATE, + Object: kv2, + }, + } + + iedh := &InstanceEventDeferHandler{ + Percent: 0.01, + } + + iedh.OnCondition(cache, evts1) + iedh.OnCondition(cache, evts2) + iedh.OnCondition(cache, evts3) + + getEvents(t, iedh) + + iedh.Percent = 0.8 + iedh.OnCondition(cache, evts1) + iedh.OnCondition(cache, evts2) + iedh.OnCondition(cache, evts3) + + getEvents(t, iedh) +} + +func getEvents(t *testing.T, iedh *InstanceEventDeferHandler) { + fmt.Println(time.Now()) + c := time.After(3 * time.Second) + var evt3 *Event + for { + select { + case evt := <-iedh.HandleChan(): + fmt.Println(time.Now(), evt) + if string(evt.Object.(*mvccpb.KeyValue).Key) == "/3" { + evt3 = evt + if iedh.Percent == 0.01 && evt.Type == pb.EVT_DELETE { + fmt.Printf(`TestInstanceEventDeferHandler_HandleChan with 1%% failed`) + t.FailNow() + } + } + continue + case <-c: + if iedh.Percent == 0.8 && evt3 == nil { + fmt.Printf(`TestInstanceEventDeferHandler_HandleChan with 80%% failed`) + t.FailNow() + } + } + break + } +} diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go index 528922d..7236050 100644 --- a/server/core/backend/store/indexer.go +++ b/server/core/backend/store/indexer.go @@ -264,11 +264,6 @@ func (i *Indexer) deletePrefixKey(prefix, key string) { if !ok { return } - // remove child - for k := range i.prefixIndex[key] { - i.deletePrefixKey(key, k) - } - delete(m, key) // remove parent which has no child diff --git a/server/core/backend/store/listwatch.go b/server/core/backend/store/listwatch.go index 9e210bc..189f909 100644 --- a/server/core/backend/store/listwatch.go +++ b/server/core/backend/store/listwatch.go @@ -32,7 +32,7 @@ const EVENT_BUS_MAX_SIZE = 1000 type Event struct { Revision int64 Type proto.EventType - Key string + Prefix string Object interface{} } @@ -93,7 +93,7 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []*Event)) error evts := make([]*Event, len(resp.Kvs)) for i, kv := range resp.Kvs { - evt := &Event{Key: lw.Key, Revision: kv.ModRevision} + evt := &Event{Prefix: lw.Key, Revision: kv.ModRevision} switch { case resp.Action == registry.Put && kv.Version == 1: evt.Type, evt.Object = proto.EVT_CREATE, kv @@ -148,9 +148,9 @@ func (w *Watcher) process() { } } -func (w *Watcher) sendEvent(evt []*Event) { +func (w *Watcher) sendEvent(evts []*Event) { defer util.RecoverAndReport() - w.bus <- evt + w.bus <- evts } func (w *Watcher) Stop() { @@ -168,7 +168,7 @@ func (w *Watcher) Stop() { func errEvent(key string, err error) *Event { return &Event{ Type: proto.EVT_ERROR, - Key: key, + Prefix: key, Object: err, } } diff --git a/server/core/backend/store/opt.go b/server/core/backend/store/opt.go index 252f244..86142b1 100644 --- a/server/core/backend/store/opt.go +++ b/server/core/backend/store/opt.go @@ -34,7 +34,7 @@ type KvCacherCfg struct { Timeout time.Duration Period time.Duration OnEvent KvEventFunc - DeferHander DeferHandler + DeferHandler DeferHandler } func (cfg KvCacherCfg) String() string { @@ -65,7 +65,7 @@ func WithEventFunc(f KvEventFunc) KvCacherCfgOption { } func WithDeferHandler(h DeferHandler) KvCacherCfgOption { - return func(cfg *KvCacherCfg) { cfg.DeferHander = h } + return func(cfg *KvCacherCfg) { cfg.DeferHandler = h } } func DefaultKvCacherConfig() KvCacherCfg { diff --git a/server/core/backend/store/store.go b/server/core/backend/store/store.go index d2f89e7..6a5f38c 100644 --- a/server/core/backend/store/store.go +++ b/server/core/backend/store/store.go @@ -124,13 +124,6 @@ func (s *KvStore) Initialize() { func (s *KvStore) dispatchEvent(t StoreType, evt *KvEvent) { s.indexers[t].OnCacheEvent(evt) - select { - case <-s.Ready(): - default: - if evt.Action == pb.EVT_CREATE { - evt.Action = pb.EVT_INIT - } - } EventProxy(t).OnEvent(evt) } diff --git a/server/infra/quota/quota.go b/server/infra/quota/quota.go index 84dc579..acff48e 100644 --- a/server/infra/quota/quota.go +++ b/server/infra/quota/quota.go @@ -18,8 +18,8 @@ package quota import ( "fmt" - "golang.org/x/net/context" scerr "github.com/apache/incubator-servicecomb-service-center/server/error" + "golang.org/x/net/context" ) type ApplyQuotaResult struct { diff --git a/server/plugin/infra/registry/etcd/etcd.go b/server/plugin/infra/registry/etcd/etcd.go index d848eb9..1e57ee1 100644 --- a/server/plugin/infra/registry/etcd/etcd.go +++ b/server/plugin/infra/registry/etcd/etcd.go @@ -501,49 +501,55 @@ func (c *EtcdClient) Watch(ctx context.Context, opts ...registry.PluginOpOption) return case resp, ok = <-ws: if !ok { - err := errors.New("channel is closed") - return err + err = errors.New("channel is closed") + return } // cause a rpc ResourceExhausted error if watch response body larger then 4MB if err = resp.Err(); err != nil { - return err + return } - l := len(resp.Events) - kvs := make([]*mvccpb.KeyValue, l) - pIdx, prevAction := 0, mvccpb.PUT - pResp := ®istry.PluginResponse{Action: registry.Put, Succeeded: true} - - for _, evt := range resp.Events { - if prevAction != evt.Type { - prevAction = evt.Type - - if pIdx > 0 { - err = setResponseAndCallback(pResp, kvs[:pIdx], op.WatchCallback) - if err != nil { - return - } - pIdx = 0 - } - } + err = dispatch(resp.Events, op.WatchCallback) + if err != nil { + return + } + } + } + } + return fmt.Errorf("no key has been watched") +} - pResp.Revision = evt.Kv.ModRevision - pResp.Action = setKvsAndConvertAction(kvs, pIdx, evt) +func dispatch(evts []*clientv3.Event, cb registry.WatchCallback) error { + l := len(evts) + kvs := make([]*mvccpb.KeyValue, l) + sIdx, eIdx, prevAction := 0, 0, mvccpb.PUT + pResp := ®istry.PluginResponse{Action: registry.Put, Succeeded: true} - pIdx++ - } + for _, evt := range evts { + if prevAction != evt.Type { + prevAction = evt.Type - if pIdx > 0 { - err = setResponseAndCallback(pResp, kvs[:pIdx], op.WatchCallback) - if err != nil { - return - } + if eIdx > 0 { + err := setResponseAndCallback(pResp, kvs[sIdx:eIdx], cb) + if err != nil { + return err } + sIdx = eIdx } } + + if pResp.Revision < evt.Kv.ModRevision { + pResp.Revision = evt.Kv.ModRevision + } + pResp.Action = setKvsAndConvertAction(kvs, eIdx, evt) + + eIdx++ } - err = fmt.Errorf("no key has been watched") - return + + if eIdx > 0 { + return setResponseAndCallback(pResp, kvs[sIdx:eIdx], cb) + } + return nil } func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *clientv3.Event) registry.ActionType { @@ -564,12 +570,7 @@ func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *clientv3.Even func setResponseAndCallback(pResp *registry.PluginResponse, kvs []*mvccpb.KeyValue, cb registry.WatchCallback) error { pResp.Count = int64(len(kvs)) pResp.Kvs = kvs - - err := cb("key information changed", pResp) - if err != nil { - return err - } - return nil + return cb("key information changed", pResp) } func NewRegistry() mgr.PluginInstance { diff --git a/server/rest/controller/v4/microservice_controller.go b/server/rest/controller/v4/microservice_controller.go index b0b177f..795c72c 100644 --- a/server/rest/controller/v4/microservice_controller.go +++ b/server/rest/controller/v4/microservice_controller.go @@ -102,7 +102,6 @@ func (this *MicroServiceService) Unregister(w http.ResponseWriter, r *http.Reque func (this *MicroServiceService) GetServices(w http.ResponseWriter, r *http.Request) { request := &pb.GetServicesRequest{} - util.Logger().Debugf("domain is %s", util.ParseDomain(r.Context())) resp, _ := core.ServiceAPI.GetServices(r.Context(), request) respInternal := resp.Response resp.Response = nil diff --git a/server/service/event/dependency_event_handler.go b/server/service/event/dependency_event_handler.go index 99808e1..d1bd311 100644 --- a/server/service/event/dependency_event_handler.go +++ b/server/service/event/dependency_event_handler.go @@ -91,9 +91,9 @@ func (h *DependencyEventHandler) loop() { } type DependencyEventHandlerResource struct { - dep *pb.ConsumerDependency - kv *mvccpb.KeyValue - domainProject string + dep *pb.ConsumerDependency + kv *mvccpb.KeyValue + domainProject string } func NewDependencyEventHandlerResource(dep *pb.ConsumerDependency, kv *mvccpb.KeyValue, domainProject string) *DependencyEventHandlerResource { @@ -150,7 +150,7 @@ func (h *DependencyEventHandler) Handle() error { dependencyRuleHandleResults := make(chan error, len(resourcesMap)) for lockKey, resources := range resourcesMap { - go func(lockKey string, resources []*DependencyEventHandlerResource){ + go func(lockKey string, resources []*DependencyEventHandlerResource) { err := h.dependencyRuleHandle(ctx, lockKey, resources) dependencyRuleHandleResults <- err }(lockKey, resources) @@ -169,7 +169,7 @@ func (h *DependencyEventHandler) Handle() error { return lastErr } -func (h *DependencyEventHandler)dependencyRuleHandle(ctx context.Context, lockKey string, resources []*DependencyEventHandlerResource) error{ +func (h *DependencyEventHandler) dependencyRuleHandle(ctx context.Context, lockKey string, resources []*DependencyEventHandlerResource) error { lock, err := serviceUtil.DependencyLock(lockKey) if err != nil { util.Logger().Errorf(err, "create dependency rule locker failed") diff --git a/server/service/instances.go b/server/service/instances.go index 1f066fb..2e12fcf 100644 --- a/server/service/instances.go +++ b/server/service/instances.go @@ -589,9 +589,13 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest) }, nil } - instances := make([]*pb.MicroServiceInstance, 0) + var instances []*pb.MicroServiceInstance + cloneCtx := ctx + if s, ok := ctx.Value("noCache").(string); !ok || s != "1" { + cloneCtx = util.SetContext(util.CloneContext(ctx), "cacheOnly", "1") + } for _, serviceId := range ids { - resp, err := s.GetInstances(ctx, &pb.GetInstancesRequest{ + resp, err := s.GetInstances(cloneCtx, &pb.GetInstancesRequest{ ConsumerServiceId: in.ConsumerServiceId, ProviderServiceId: serviceId, Tags: in.Tags, diff --git a/server/service/instances_test.go b/server/service/instances_test.go index 355a615..dfba6d3 100644 --- a/server/service/instances_test.go +++ b/server/service/instances_test.go @@ -909,7 +909,7 @@ var _ = Describe("'Instance' service", func() { respFind, err = instanceResource.Find( util.SetTargetDomainProject( - util.SetDomainProject(context.Background(), "user", "user"), + util.SetDomainProject(util.CloneContext(getContext()), "user", "user"), "default", "default"), &pb.FindInstancesRequest{ ConsumerServiceId: serviceId6, diff --git a/server/service/microservices.go b/server/service/microservices.go index 319c108..dcaa4be 100644 --- a/server/service/microservices.go +++ b/server/service/microservices.go @@ -18,6 +18,7 @@ package service import ( "encoding/json" + "errors" "fmt" "github.com/apache/incubator-servicecomb-service-center/pkg/util" "github.com/apache/incubator-servicecomb-service-center/server/core" @@ -33,7 +34,6 @@ import ( "golang.org/x/net/context" "strconv" "time" - "errors" ) type MicroServiceService struct { @@ -130,6 +130,7 @@ func (s *MicroServiceService) CreateServicePri(ctx context.Context, in *pb.Creat index := apt.GenerateServiceIndexKey(serviceKey) indexBytes := util.StringToBytesWithNoCopy(index) aliasBytes := util.StringToBytesWithNoCopy(apt.GenerateServiceAliasKey(serviceKey)) + opts := []registry.PluginOp{ registry.OpPut(registry.WithStrKey(key), registry.WithValue(data)), registry.OpPut(registry.WithKey(indexBytes), registry.WithStrValue(serviceId)), diff --git a/server/service/rule.go b/server/service/rule.go index 287dbc2..4060907 100644 --- a/server/service/rule.go +++ b/server/service/rule.go @@ -64,7 +64,6 @@ func (s *MicroServiceService) AddRule(ctx context.Context, in *pb.AddServiceRule return response, nil } - ruleType, _, err := serviceUtil.GetServiceRuleType(ctx, domainProject, in.ServiceId) util.Logger().Debugf("ruleType is %s", ruleType) if err != nil { diff --git a/server/service/util/dependency.go b/server/service/util/dependency.go index 5bfb978..68c6bf7 100644 --- a/server/service/util/dependency.go +++ b/server/service/util/dependency.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "github.com/apache/incubator-servicecomb-service-center/pkg/cache" + "github.com/apache/incubator-servicecomb-service-center/pkg/etcdsync" "github.com/apache/incubator-servicecomb-service-center/pkg/util" apt "github.com/apache/incubator-servicecomb-service-center/server/core" "github.com/apache/incubator-servicecomb-service-center/server/core/backend" @@ -28,11 +29,10 @@ import ( pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" scerr "github.com/apache/incubator-servicecomb-service-center/server/error" "github.com/apache/incubator-servicecomb-service-center/server/infra/registry" + "github.com/apache/incubator-servicecomb-service-center/server/mux" "golang.org/x/net/context" "strings" "time" - "github.com/apache/incubator-servicecomb-service-center/pkg/etcdsync" - "github.com/apache/incubator-servicecomb-service-center/server/mux" ) var consumerCache *cache.Cache @@ -1059,5 +1059,5 @@ func DependencyLock(lockKey string) (*etcdsync.DLock, error) { } func NewDependencyLockKey(domainProject, env string) string { - return util.StringJoin([]string{"","env-lock", domainProject, env}, "/") -} \ No newline at end of file + return util.StringJoin([]string{"", "env-lock", domainProject, env}, "/") +} -- To stop receiving notification emails like this one, please contact asifdxtr...@apache.org.