This is an automated email from the ASF dual-hosted git repository.

asifdxtreme pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new a4b6ce4  SCB-296 Self preservation will never stop when SC do list-watch loop (#261)
a4b6ce4 is described below

commit a4b6ce4d60309361940b9039b9a16ce641af183a
Author: little-cui <sure_0...@qq.com>
AuthorDate: Mon Jan 29 12:39:18 2018 +0800

    SCB-296 Self preservation will never stop when SC do list-watch loop (#261)
    
    * SCB-296 Self preservation will never stop when SC do list-watch loop
---
 etc/conf/app.conf                                  |   5 +-
 integration/instances_test.go                      |   2 +-
 pkg/chain/chain.go                                 |   2 +-
 pkg/httplimiter/httpratelimiter.go                 |  13 +-
 pkg/logrotate/logrotate.go                         |   4 +-
 pkg/ratelimiter/ratelimiter.go                     |   5 +-
 pkg/rest/client.go                                 |   2 +-
 pkg/rest/route.go                                  |  16 +-
 server/core/backend/store/cacher.go                | 104 ++++++++-----
 server/core/backend/store/defer.go                 | 173 +++++++++++----------
 server/core/backend/store/defer_test.go            | 141 +++++++++++++++++
 server/core/backend/store/indexer.go               |   5 -
 server/core/backend/store/listwatch.go             |  10 +-
 server/core/backend/store/opt.go                   |   4 +-
 server/core/backend/store/store.go                 |   7 -
 server/infra/quota/quota.go                        |   2 +-
 server/plugin/infra/registry/etcd/etcd.go          |  75 ++++-----
 .../rest/controller/v4/microservice_controller.go  |   1 -
 server/service/event/dependency_event_handler.go   |  10 +-
 server/service/instances.go                        |   8 +-
 server/service/instances_test.go                   |   2 +-
 server/service/microservices.go                    |   3 +-
 server/service/rule.go                             |   1 -
 server/service/util/dependency.go                  |   8 +-
 24 files changed, 376 insertions(+), 227 deletions(-)

diff --git a/etc/conf/app.conf b/etc/conf/app.conf
index 9d54f2d..a48f08c 100644
--- a/etc/conf/app.conf
+++ b/etc/conf/app.conf
@@ -26,11 +26,12 @@ plugins_dir = ./plugins
 registry_plugin = etcd
 
 # registry address
-# registry_plugin equals to 'embeded_etcd', example:
+# 1. if registry_plugin equals to 'embeded_etcd'
 # manager_name = "sc-0"
 # manager_addr = "http://127.0.0.1:2380";
 # manager_cluster = "sc-0=http://127.0.0.1:2380";
-# registry_plugin equals to 'etcd'
+# 2. if registry_plugin equals to 'etcd'
+# manager_cluster = "127.0.0.1:2379"
 manager_cluster = "127.0.0.1:2379"
 
 #heartbeat that sync synchronizes client's endpoints with the known endpoints from the etcd membership,unit is second.
diff --git a/integration/instances_test.go b/integration/instances_test.go
index 8ab0a78..948c9f6 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -248,7 +248,7 @@ var _ = Describe("MicroService Api Test", func() {
 
                By("Discover MicroService Instance API", func() {
                        It("Find Micro-service Info by AppID", func() {
-                               req, _ := http.NewRequest(GET, 
SCURL+FINDINSTANCE+"?appId="+serviceAppId+"&serviceName="+serviceName+"&version="+serviceVersion,
 nil)
+                               req, _ := http.NewRequest(GET, 
SCURL+FINDINSTANCE+"?noCache=1&appId="+serviceAppId+"&serviceName="+serviceName+"&version="+serviceVersion,
 nil)
                                req.Header.Set("X-Domain-Name", "default")
                                req.Header.Set("X-ConsumerId", serviceId)
                                resp, _ := scclient.Do(req)
diff --git a/pkg/chain/chain.go b/pkg/chain/chain.go
index 8517824..a48fc1c 100644
--- a/pkg/chain/chain.go
+++ b/pkg/chain/chain.go
@@ -60,7 +60,7 @@ func (c *Chain) syncNext(i *Invocation) {
 }
 
 func (c *Chain) Next(i *Invocation) {
-       go c.syncNext(i)
+       c.syncNext(i)
 }
 
 func NewChain(name string, handlers []Handler) Chain {
diff --git a/pkg/httplimiter/httpratelimiter.go 
b/pkg/httplimiter/httpratelimiter.go
index 9128d5b..3110056 100644
--- a/pkg/httplimiter/httpratelimiter.go
+++ b/pkg/httplimiter/httpratelimiter.go
@@ -18,13 +18,13 @@
 package httplimiter
 
 import (
+       "fmt"
+       "github.com/apache/incubator-servicecomb-service-center/pkg/ratelimiter"
        "net/http"
        "strconv"
        "strings"
-       "time"
-       "github.com/apache/incubator-servicecomb-service-center/pkg/ratelimiter"
-       "fmt"
        "sync"
+       "time"
 )
 
 type HTTPErrorMessage struct {
@@ -36,7 +36,6 @@ func (httpErrorMessage *HTTPErrorMessage) Error() string {
        return fmt.Sprintf("%v: %v", httpErrorMessage.StatusCode, 
httpErrorMessage.Message)
 }
 
-
 type HttpLimiter struct {
        HttpMessage    string
        ContentType    string
@@ -51,8 +50,6 @@ type HttpLimiter struct {
        sync.RWMutex
 }
 
-
-
 func LimitBySegments(limiter *HttpLimiter, keys []string) *HTTPErrorMessage {
        if limiter.LimitExceeded(strings.Join(keys, "|")) {
                return &HTTPErrorMessage{Message: limiter.HttpMessage, 
StatusCode: limiter.StatusCode}
@@ -199,7 +196,6 @@ func getRemoteIP(ipLookups []string, r *http.Request) 
string {
        return ""
 }
 
-
 func NewHttpLimiter(max int64, ttl time.Duration) *HttpLimiter {
        limiter := &HttpLimiter{RequestLimit: max, TTL: ttl}
        limiter.ContentType = "text/plain; charset=utf-8"
@@ -211,7 +207,6 @@ func NewHttpLimiter(max int64, ttl time.Duration) 
*HttpLimiter {
        return limiter
 }
 
-
 func (rateLimiter *HttpLimiter) LimitExceeded(key string) bool {
        rateLimiter.Lock()
        if _, found := rateLimiter.leakyBuckets[key]; !found {
@@ -224,5 +219,3 @@ func (rateLimiter *HttpLimiter) LimitExceeded(key string) 
bool {
        }
        return true
 }
-
-
diff --git a/pkg/logrotate/logrotate.go b/pkg/logrotate/logrotate.go
index c2e9e93..76de98d 100644
--- a/pkg/logrotate/logrotate.go
+++ b/pkg/logrotate/logrotate.go
@@ -240,9 +240,9 @@ func LogRotate(path string, MaxFileSize int, MaxBackupCount 
int) {
        }
 }
 
-func isSkip(f os.FileInfo) bool{
+func isSkip(f os.FileInfo) bool {
        //dir or non write permission,skip
-       return f.IsDir() || (f.Mode() & 0200 == 0000)
+       return f.IsDir() || (f.Mode()&0200 == 0000)
 }
 
 //path : where the file will be filtered
diff --git a/pkg/ratelimiter/ratelimiter.go b/pkg/ratelimiter/ratelimiter.go
index 8184108..a46bc39 100644
--- a/pkg/ratelimiter/ratelimiter.go
+++ b/pkg/ratelimiter/ratelimiter.go
@@ -18,8 +18,8 @@
 package ratelimiter
 
 import (
-       "time"
        "sync"
+       "time"
 )
 
 type LeakyBucket struct {
@@ -32,7 +32,6 @@ type LeakyBucket struct {
        availableTicker int64
 }
 
-
 func NewLeakyBucket(fillInterval time.Duration, capacity, quantum int64) 
*LeakyBucket {
        if fillInterval <= 0 {
                panic("leaky bucket fill interval is not > 0")
@@ -77,7 +76,6 @@ func (leakyBucket *LeakyBucket) MaximumTakeDuration(count 
int64, maxWait time.Du
        return leakyBucket.take(time.Now(), count, maxWait)
 }
 
-
 func (leakyBucket *LeakyBucket) Rate() float64 {
        return 1e9 * float64(leakyBucket.quantum) / 
float64(leakyBucket.interval)
 }
@@ -118,4 +116,3 @@ func (leakyBucket *LeakyBucket) adjust(now time.Time) 
(currentTick int64) {
        leakyBucket.availableTicker = currentTick
        return
 }
-
diff --git a/pkg/rest/client.go b/pkg/rest/client.go
index 3ce4e29..2cd68ab 100644
--- a/pkg/rest/client.go
+++ b/pkg/rest/client.go
@@ -28,9 +28,9 @@ import (
        "io/ioutil"
        "net"
        "net/http"
+       "net/url"
        "reflect"
        "time"
-       "net/url"
 )
 
 const (
diff --git a/pkg/rest/route.go b/pkg/rest/route.go
index 6595021..3015aff 100644
--- a/pkg/rest/route.go
+++ b/pkg/rest/route.go
@@ -23,6 +23,7 @@ import (
        errorsEx 
"github.com/apache/incubator-servicecomb-service-center/pkg/errors"
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        "net/http"
+       "net/url"
        "strings"
 )
 
@@ -77,7 +78,7 @@ func (this *ROAServerHandler) ServeHTTP(w 
http.ResponseWriter, r *http.Request)
        for _, ph := range this.handlers[r.Method] {
                if params, ok := ph.try(r.URL.Path); ok {
                        if len(params) > 0 {
-                               r.URL.RawQuery = util.UrlEncode(params) + "&" + 
r.URL.RawQuery
+                               r.URL.RawQuery = params + r.URL.RawQuery
                        }
 
                        this.serve(ph, w, r)
@@ -151,7 +152,7 @@ func (this *ROAServerHandler) serve(ph *urlPatternHandler, 
w http.ResponseWriter
        <-ch
 }
 
-func (this *urlPatternHandler) try(path string) (p map[string]string, _ bool) {
+func (this *urlPatternHandler) try(path string) (p string, _ bool) {
        var i, j int
        l, sl := len(this.Path), len(path)
        for i < sl {
@@ -160,7 +161,7 @@ func (this *urlPatternHandler) try(path string) (p 
map[string]string, _ bool) {
                        if this.Path != "/" && l > 0 && this.Path[l-1] == '/' {
                                return p, true
                        }
-                       return nil, false
+                       return "", false
                case this.Path[j] == ':':
                        var val string
                        var nextc byte
@@ -168,19 +169,16 @@ func (this *urlPatternHandler) try(path string) (p 
map[string]string, _ bool) {
                        _, nextc, j = match(this.Path, isAlnum, 0, j+1)
                        val, _, i = match(path, matchParticial, nextc, i)
 
-                       if p == nil {
-                               p = make(map[string]string, 5)
-                       }
-                       p[this.Path[o:j]] = val
+                       p += url.QueryEscape(this.Path[o:j]) + "=" + 
url.QueryEscape(val) + "&"
                case path[i] == this.Path[j]:
                        i++
                        j++
                default:
-                       return nil, false
+                       return "", false
                }
        }
        if j != l {
-               return nil, false
+               return "", false
        }
        return p, true
 }
diff --git a/server/core/backend/store/cacher.go 
b/server/core/backend/store/cacher.go
index 7ca82cf..c6ade97 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -120,6 +120,15 @@ func (c *KvCache) Have(k interface{}) (ok bool) {
        return
 }
 
+func (c *KvCache) RLock() map[string]*mvccpb.KeyValue {
+       c.rwMux.RLock()
+       return c.store
+}
+
+func (c *KvCache) RUnlock() {
+       c.rwMux.RUnlock()
+}
+
 func (c *KvCache) Lock() map[string]*mvccpb.KeyValue {
        c.rwMux.Lock()
        return c.store
@@ -251,15 +260,39 @@ func (c *KvCacher) handleWatcher(watcher *Watcher) error {
 }
 
 func (c *KvCacher) needDeferHandle(evts []*Event) bool {
-       if c.Cfg.DeferHander == nil {
+       if c.Cfg.DeferHandler == nil {
                return false
        }
 
-       return c.Cfg.DeferHander.OnCondition(c.Cache(), evts)
+       return c.Cfg.DeferHandler.OnCondition(c.Cache(), evts)
+}
+
+func (c *KvCacher) refresh(stopCh <-chan struct{}) {
+       util.Logger().Debugf("start to list and watch %s", c.Cfg)
+       ctx, cancel := context.WithCancel(context.Background())
+       c.goroute.Do(func(stopCh <-chan struct{}) {
+               defer cancel()
+               <-stopCh
+       })
+       for {
+               start := time.Now()
+               c.ListAndWatch(ctx)
+               watchDuration := time.Since(start)
+               nextPeriod := 0 * time.Second
+               if watchDuration > 0 && c.Cfg.Period > watchDuration {
+                       nextPeriod = c.Cfg.Period - watchDuration
+               }
+               select {
+               case <-stopCh:
+                       util.Logger().Debugf("stop to list and watch %s", c.Cfg)
+                       return
+               case <-time.After(nextPeriod):
+               }
+       }
 }
 
 func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
-       if c.Cfg.DeferHander == nil {
+       if c.Cfg.DeferHandler == nil {
                return
        }
 
@@ -268,7 +301,7 @@ func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
                select {
                case <-stopCh:
                        return
-               case evt, ok := <-c.Cfg.DeferHander.HandleChan():
+               case evt, ok := <-c.Cfg.DeferHandler.HandleChan():
                        if !ok {
                                <-time.After(time.Second)
                                continue
@@ -281,7 +314,7 @@ func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
 
                        evts[i] = evt
                        i++
-               case <-time.After(time.Second):
+               case <-time.After(300 * time.Millisecond):
                        if i == 0 {
                                continue
                        }
@@ -305,9 +338,9 @@ func (c *KvCacher) sync(evts []*Event) {
 }
 
 func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []*Event {
-       cache := c.Cache().(*KvCache)
-       store := cache.Lock()
-       defer cache.Unlock()
+       store := c.cache.RLock()
+       defer c.cache.RUnlock()
+
        oc, nc := len(store), len(items)
        tc := oc + nc
        if tc == 0 {
@@ -359,7 +392,7 @@ func (c *KvCacher) filterDelete(store 
map[string]*mvccpb.KeyValue, newStore map[
                block[i] = &Event{
                        Revision: rev,
                        Type:     proto.EVT_DELETE,
-                       Key:      c.Cfg.Key,
+                       Prefix:   c.Cfg.Key,
                        Object:   v,
                }
                i++
@@ -387,7 +420,7 @@ func (c *KvCacher) filterCreateOrUpdate(store 
map[string]*mvccpb.KeyValue, newSt
                        block[i] = &Event{
                                Revision: rev,
                                Type:     proto.EVT_CREATE,
-                               Key:      c.Cfg.Key,
+                               Prefix:   c.Cfg.Key,
                                Object:   v,
                        }
                        i++
@@ -407,7 +440,7 @@ func (c *KvCacher) filterCreateOrUpdate(store 
map[string]*mvccpb.KeyValue, newSt
                block[i] = &Event{
                        Revision: rev,
                        Type:     proto.EVT_UPDATE,
-                       Key:      c.Cfg.Key,
+                       Prefix:   c.Cfg.Key,
                        Object:   v,
                }
                i++
@@ -424,10 +457,9 @@ func (c *KvCacher) filterCreateOrUpdate(store 
map[string]*mvccpb.KeyValue, newSt
 }
 
 func (c *KvCacher) onEvents(evts []*Event) {
-       cache := c.Cache().(*KvCache)
-       idx := 0
+       idx, init := 0, !c.IsReady()
        kvEvts := make([]*KvEvent, len(evts))
-       store := cache.Lock()
+       store := c.cache.Lock()
        for _, evt := range evts {
                kv := evt.Object.(*mvccpb.KeyValue)
                key := util.BytesToStringWithNoCopy(kv.Key)
@@ -456,7 +488,6 @@ func (c *KvCacher) onEvents(evts []*Event) {
                                Action:   t,
                                KV:       kv,
                        }
-                       idx++
                case proto.EVT_DELETE:
                        if !ok {
                                util.Logger().Warnf(nil, "unexpected %s event! 
key %s does not exist", evt.Type, key)
@@ -470,10 +501,15 @@ func (c *KvCacher) onEvents(evts []*Event) {
                                Action:   evt.Type,
                                KV:       prevKv,
                        }
-                       idx++
                }
+
+               if init && kvEvts[idx].Action == proto.EVT_CREATE {
+                       kvEvts[idx].Action = proto.EVT_INIT
+               }
+
+               idx++
        }
-       cache.Unlock()
+       c.cache.Unlock()
 
        c.onKvEvents(kvEvts[:idx])
 }
@@ -488,30 +524,7 @@ func (c *KvCacher) onKvEvents(evts []*KvEvent) {
 }
 
 func (c *KvCacher) run() {
-       c.goroute.Do(func(stopCh <-chan struct{}) {
-               util.Logger().Debugf("start to list and watch %s", c.Cfg)
-               ctx, cancel := context.WithCancel(context.Background())
-               c.goroute.Do(func(stopCh <-chan struct{}) {
-                       defer cancel()
-                       <-stopCh
-               })
-               for {
-                       start := time.Now()
-                       c.ListAndWatch(ctx)
-                       watchDuration := time.Now().Sub(start)
-                       nextPeriod := 0 * time.Second
-                       if watchDuration > 0 && c.Cfg.Period > watchDuration {
-                               nextPeriod = c.Cfg.Period - watchDuration
-                       }
-                       select {
-                       case <-stopCh:
-                               util.Logger().Debugf("stop to list and watch 
%s", c.Cfg)
-                               return
-                       case <-time.After(nextPeriod):
-                       }
-               }
-       })
-
+       c.goroute.Do(c.refresh)
        c.goroute.Do(c.deferHandle)
 }
 
@@ -533,6 +546,15 @@ func (c *KvCacher) Ready() <-chan struct{} {
        return c.ready
 }
 
+func (c *KvCacher) IsReady() bool {
+       select {
+       case <-c.ready:
+               return true
+       default:
+               return false
+       }
+}
+
 func NewKvCache(c *KvCacher, size int) *KvCache {
        return &KvCache{
                owner:       c,
diff --git a/server/core/backend/store/defer.go 
b/server/core/backend/store/defer.go
index 7f7bbfa..173a44a 100644
--- a/server/core/backend/store/defer.go
+++ b/server/core/backend/store/defer.go
@@ -30,122 +30,127 @@ type DeferHandler interface {
        HandleChan() <-chan *Event
 }
 
+type deferItem struct {
+       ttl   *time.Timer
+       event *Event
+}
+
 type InstanceEventDeferHandler struct {
        Percent float64
-       enabled bool
-       events  map[string]*Event
-       ttls    map[string]int64
-       mux     sync.RWMutex
-       deferCh chan *Event
-}
 
-func (iedh *InstanceEventDeferHandler) deferMode(total int, del int) bool {
-       return iedh.Percent > 0 && total > 0 &&
-               del > 1 &&
-               float64(del/total) >= iedh.Percent
+       cache     Cache
+       once      sync.Once
+       enabled   bool
+       items     map[string]*deferItem
+       pendingCh chan []*Event
+       deferCh   chan *Event
 }
 
-func (iedh *InstanceEventDeferHandler) needDefer(cache Cache, evts []*Event) 
bool {
-       if !iedh.deferMode(cache.Size(), len(evts)) {
+func (iedh *InstanceEventDeferHandler) OnCondition(cache Cache, evts []*Event) 
bool {
+       if iedh.Percent <= 0 {
                return false
        }
 
-       for _, evt := range evts {
-               if evt.Type != pb.EVT_DELETE {
-                       return false
-               }
-       }
-       return true
-}
-
-func (iedh *InstanceEventDeferHandler) init() {
-       if iedh.deferCh == nil {
+       iedh.once.Do(func() {
+               iedh.cache = cache
+               iedh.items = make(map[string]*deferItem, event_block_size)
+               iedh.pendingCh = make(chan []*Event, event_block_size)
                iedh.deferCh = make(chan *Event, event_block_size)
-       }
-
-       if iedh.events == nil {
-               iedh.events = make(map[string]*Event, event_block_size)
-               iedh.ttls = make(map[string]int64, event_block_size)
                util.Go(iedh.check)
-       }
-}
-
-func (iedh *InstanceEventDeferHandler) OnCondition(cache Cache, evts []*Event) 
bool {
-       iedh.mux.Lock()
-       if !iedh.enabled && iedh.needDefer(cache, evts) {
-               util.Logger().Warnf(nil, "self preservation is enabled, caught 
%d(>=%.0f%%) DELETE events",
-                       len(evts), iedh.Percent*100)
-               iedh.enabled = true
-       }
+       })
 
-       if !iedh.enabled {
-               iedh.mux.Unlock()
-               return false
-       }
-
-       iedh.init()
-
-       for _, evt := range evts {
-               kv, ok := evt.Object.(*mvccpb.KeyValue)
-               if !ok {
-                       continue
-               }
-               key := util.BytesToStringWithNoCopy(kv.Key)
-               switch evt.Type {
-               case pb.EVT_CREATE, pb.EVT_UPDATE:
-                       delete(iedh.events, key)
-                       delete(iedh.ttls, key)
+       iedh.pendingCh <- evts
+       return true
+}
 
+func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt *Event) error {
+       kv := evt.Object.(*mvccpb.KeyValue)
+       key := util.BytesToStringWithNoCopy(kv.Key)
+       _, ok := iedh.items[key]
+       switch evt.Type {
+       case pb.EVT_CREATE, pb.EVT_UPDATE:
+               if ok {
                        util.Logger().Infof("recovered key %s events", key)
+                       // return nil // no need to publish event to 
subscribers?
+               }
+               iedh.recover(evt)
+       case pb.EVT_DELETE:
+               if ok {
+                       return nil
+               }
 
-                       iedh.deferCh <- evt
-               case pb.EVT_DELETE:
-                       var instance pb.MicroServiceInstance
-                       err := json.Unmarshal(kv.Value, &instance)
-                       if err != nil {
-                               util.Logger().Errorf(err, "unmarshal instance 
file failed, key is %s", key)
-                               continue
-                       }
-                       iedh.events[key] = evt
-                       iedh.ttls[key] = int64(instance.HealthCheck.Interval * 
(instance.HealthCheck.Times + 1))
+               var instance pb.MicroServiceInstance
+               err := json.Unmarshal(kv.Value, &instance)
+               if err != nil {
+                       util.Logger().Errorf(err, "unmarshal instance file 
failed, key is %s", key)
+                       return err
+               }
+               iedh.items[key] = &deferItem{
+                       ttl: time.NewTimer(
+                               
time.Duration(instance.HealthCheck.Interval*(instance.HealthCheck.Times+1)) * 
time.Second),
+                       event: evt,
                }
        }
-       iedh.mux.Unlock()
-       return true
+       return nil
+}
+
+func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event {
+       return iedh.deferCh
 }
 
 func (iedh *InstanceEventDeferHandler) check(stopCh <-chan struct{}) {
        defer util.RecoverAndReport()
+       t, n := iedh.newTimer(), false
        for {
                select {
                case <-stopCh:
                        return
-               case <-time.After(time.Second):
-                       iedh.mux.Lock()
-                       for key, ttl := range iedh.ttls {
-                               ttl--
-                               if ttl > 0 {
-                                       iedh.ttls[key] = ttl
-                                       continue
-                               }
-
-                               evt := iedh.events[key]
-                               delete(iedh.events, key)
-                               delete(iedh.ttls, key)
+               case evts := <-iedh.pendingCh:
+                       for _, evt := range evts {
+                               iedh.recoverOrDefer(evt)
+                       }
 
-                               util.Logger().Warnf(nil, "defer handle timed 
out, removed key is %s", key)
+                       del := len(iedh.items)
+                       if del > 0 && !n {
+                               t.Stop()
+                               t, n = iedh.newTimer(), true
+                       }
 
-                               iedh.deferCh <- evt
+                       total := iedh.cache.Size()
+                       if del > 0 && total > 0 && float64(del) >= 
float64(total)*iedh.Percent {
+                               iedh.enabled = true
+                               util.Logger().Warnf(nil, "self preservation is 
enabled, caught %d/%d(>=%.0f%%) DELETE events",
+                                       del, total, iedh.Percent*100)
+                       }
+               case <-t.C:
+                       t, n = iedh.newTimer(), false
+
+                       for key, item := range iedh.items {
+                               if iedh.enabled {
+                                       select {
+                                       case <-item.ttl.C:
+                                       default:
+                                               continue
+                                       }
+                                       util.Logger().Warnf(nil, "defer handle 
timed out, removed key is %s", key)
+                               }
+                               iedh.recover(item.event)
                        }
-                       if iedh.enabled && len(iedh.ttls) == 0 {
+
+                       if iedh.enabled && len(iedh.items) == 0 {
                                iedh.enabled = false
                                util.Logger().Warnf(nil, "self preservation is 
stopped")
                        }
-                       iedh.mux.Unlock()
                }
        }
 }
 
-func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event {
-       return iedh.deferCh
+func (iedh *InstanceEventDeferHandler) newTimer() *time.Timer {
+       return time.NewTimer(2 * time.Second) // instance DELETE event will be 
delay.
+}
+
+func (iedh *InstanceEventDeferHandler) recover(evt *Event) {
+       key := util.BytesToStringWithNoCopy(evt.Object.(*mvccpb.KeyValue).Key)
+       delete(iedh.items, key)
+       iedh.deferCh <- evt
 }
diff --git a/server/core/backend/store/defer_test.go 
b/server/core/backend/store/defer_test.go
new file mode 100644
index 0000000..2c9c453
--- /dev/null
+++ b/server/core/backend/store/defer_test.go
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package store
+
+import (
+       "encoding/json"
+       "fmt"
+       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+       pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+       "github.com/coreos/etcd/mvcc/mvccpb"
+       "testing"
+       "time"
+)
+
+func TestInstanceEventDeferHandler_OnCondition(t *testing.T) {
+       iedh := &InstanceEventDeferHandler{
+               Percent: 0,
+       }
+
+       if iedh.OnCondition(nil, nil) {
+               fmt.Printf(`TestInstanceEventDeferHandler_OnCondition with 0%% 
failed`)
+               t.FailNow()
+       }
+
+       iedh.Percent = 0.01
+       if !iedh.OnCondition(nil, nil) {
+               fmt.Printf(`TestInstanceEventDeferHandler_OnCondition with 1%% 
failed`)
+               t.FailNow()
+       }
+}
+
+func TestInstanceEventDeferHandler_HandleChan(t *testing.T) {
+       inst := &pb.MicroServiceInstance{
+               HealthCheck: &pb.HealthCheck{
+                       Interval: 4,
+                       Times:    0,
+               },
+       }
+       b, _ := json.Marshal(inst)
+       kv1 := &mvccpb.KeyValue{
+               Key:   util.StringToBytesWithNoCopy("/1"),
+               Value: b,
+       }
+       kv2 := &mvccpb.KeyValue{
+               Key:   util.StringToBytesWithNoCopy("/2"),
+               Value: b,
+       }
+       kv3 := &mvccpb.KeyValue{
+               Key:   util.StringToBytesWithNoCopy("/3"),
+               Value: b,
+       }
+
+       cache := NewKvCache(nil, 1)
+       cache.store["/1"] = kv1
+       cache.store["/2"] = kv2
+       cache.store["/3"] = kv3
+
+       evts1 := []*Event{
+               {
+                       Type:   pb.EVT_CREATE,
+                       Object: kv1,
+               },
+               {
+                       Type:   pb.EVT_UPDATE,
+                       Object: kv1,
+               },
+       }
+       evts2 := []*Event{
+               {
+                       Type:   pb.EVT_DELETE,
+                       Object: kv2,
+               },
+               {
+                       Type:   pb.EVT_DELETE,
+                       Object: kv3,
+               },
+       }
+       evts3 := []*Event{
+               {
+                       Type:   pb.EVT_CREATE,
+                       Object: kv2,
+               },
+       }
+
+       iedh := &InstanceEventDeferHandler{
+               Percent: 0.01,
+       }
+
+       iedh.OnCondition(cache, evts1)
+       iedh.OnCondition(cache, evts2)
+       iedh.OnCondition(cache, evts3)
+
+       getEvents(t, iedh)
+
+       iedh.Percent = 0.8
+       iedh.OnCondition(cache, evts1)
+       iedh.OnCondition(cache, evts2)
+       iedh.OnCondition(cache, evts3)
+
+       getEvents(t, iedh)
+}
+
+func getEvents(t *testing.T, iedh *InstanceEventDeferHandler) {
+       fmt.Println(time.Now())
+       c := time.After(3 * time.Second)
+       var evt3 *Event
+       for {
+               select {
+               case evt := <-iedh.HandleChan():
+                       fmt.Println(time.Now(), evt)
+                       if string(evt.Object.(*mvccpb.KeyValue).Key) == "/3" {
+                               evt3 = evt
+                               if iedh.Percent == 0.01 && evt.Type == 
pb.EVT_DELETE {
+                                       
fmt.Printf(`TestInstanceEventDeferHandler_HandleChan with 1%% failed`)
+                                       t.FailNow()
+                               }
+                       }
+                       continue
+               case <-c:
+                       if iedh.Percent == 0.8 && evt3 == nil {
+                               
fmt.Printf(`TestInstanceEventDeferHandler_HandleChan with 80%% failed`)
+                               t.FailNow()
+                       }
+               }
+               break
+       }
+}
diff --git a/server/core/backend/store/indexer.go 
b/server/core/backend/store/indexer.go
index 528922d..7236050 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -264,11 +264,6 @@ func (i *Indexer) deletePrefixKey(prefix, key string) {
        if !ok {
                return
        }
-       // remove child
-       for k := range i.prefixIndex[key] {
-               i.deletePrefixKey(key, k)
-       }
-
        delete(m, key)
 
        // remove parent which has no child
diff --git a/server/core/backend/store/listwatch.go 
b/server/core/backend/store/listwatch.go
index 9e210bc..189f909 100644
--- a/server/core/backend/store/listwatch.go
+++ b/server/core/backend/store/listwatch.go
@@ -32,7 +32,7 @@ const EVENT_BUS_MAX_SIZE = 1000
 type Event struct {
        Revision int64
        Type     proto.EventType
-       Key      string
+       Prefix   string
        Object   interface{}
 }
 
@@ -93,7 +93,7 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f 
func(evt []*Event)) error
 
                                evts := make([]*Event, len(resp.Kvs))
                                for i, kv := range resp.Kvs {
-                                       evt := &Event{Key: lw.Key, Revision: 
kv.ModRevision}
+                                       evt := &Event{Prefix: lw.Key, Revision: 
kv.ModRevision}
                                        switch {
                                        case resp.Action == registry.Put && 
kv.Version == 1:
                                                evt.Type, evt.Object = 
proto.EVT_CREATE, kv
@@ -148,9 +148,9 @@ func (w *Watcher) process() {
        }
 }
 
-func (w *Watcher) sendEvent(evt []*Event) {
+func (w *Watcher) sendEvent(evts []*Event) {
        defer util.RecoverAndReport()
-       w.bus <- evt
+       w.bus <- evts
 }
 
 func (w *Watcher) Stop() {
@@ -168,7 +168,7 @@ func (w *Watcher) Stop() {
 func errEvent(key string, err error) *Event {
        return &Event{
                Type:   proto.EVT_ERROR,
-               Key:    key,
+               Prefix: key,
                Object: err,
        }
 }
diff --git a/server/core/backend/store/opt.go b/server/core/backend/store/opt.go
index 252f244..86142b1 100644
--- a/server/core/backend/store/opt.go
+++ b/server/core/backend/store/opt.go
@@ -34,7 +34,7 @@ type KvCacherCfg struct {
        Timeout            time.Duration
        Period             time.Duration
        OnEvent            KvEventFunc
-       DeferHander        DeferHandler
+       DeferHandler       DeferHandler
 }
 
 func (cfg KvCacherCfg) String() string {
@@ -65,7 +65,7 @@ func WithEventFunc(f KvEventFunc) KvCacherCfgOption {
 }
 
 func WithDeferHandler(h DeferHandler) KvCacherCfgOption {
-       return func(cfg *KvCacherCfg) { cfg.DeferHander = h }
+       return func(cfg *KvCacherCfg) { cfg.DeferHandler = h }
 }
 
 func DefaultKvCacherConfig() KvCacherCfg {
diff --git a/server/core/backend/store/store.go 
b/server/core/backend/store/store.go
index d2f89e7..6a5f38c 100644
--- a/server/core/backend/store/store.go
+++ b/server/core/backend/store/store.go
@@ -124,13 +124,6 @@ func (s *KvStore) Initialize() {
 
 func (s *KvStore) dispatchEvent(t StoreType, evt *KvEvent) {
        s.indexers[t].OnCacheEvent(evt)
-       select {
-       case <-s.Ready():
-       default:
-               if evt.Action == pb.EVT_CREATE {
-                       evt.Action = pb.EVT_INIT
-               }
-       }
        EventProxy(t).OnEvent(evt)
 }
 
diff --git a/server/infra/quota/quota.go b/server/infra/quota/quota.go
index 84dc579..acff48e 100644
--- a/server/infra/quota/quota.go
+++ b/server/infra/quota/quota.go
@@ -18,8 +18,8 @@ package quota
 
 import (
        "fmt"
-       "golang.org/x/net/context"
        scerr 
"github.com/apache/incubator-servicecomb-service-center/server/error"
+       "golang.org/x/net/context"
 )
 
 type ApplyQuotaResult struct {
diff --git a/server/plugin/infra/registry/etcd/etcd.go 
b/server/plugin/infra/registry/etcd/etcd.go
index d848eb9..1e57ee1 100644
--- a/server/plugin/infra/registry/etcd/etcd.go
+++ b/server/plugin/infra/registry/etcd/etcd.go
@@ -501,49 +501,55 @@ func (c *EtcdClient) Watch(ctx context.Context, opts 
...registry.PluginOpOption)
                                return
                        case resp, ok = <-ws:
                                if !ok {
-                                       err := errors.New("channel is closed")
-                                       return err
+                                       err = errors.New("channel is closed")
+                                       return
                                }
                                // cause a rpc ResourceExhausted error if watch 
response body larger then 4MB
                                if err = resp.Err(); err != nil {
-                                       return err
+                                       return
                                }
 
-                               l := len(resp.Events)
-                               kvs := make([]*mvccpb.KeyValue, l)
-                               pIdx, prevAction := 0, mvccpb.PUT
-                               pResp := &registry.PluginResponse{Action: 
registry.Put, Succeeded: true}
-
-                               for _, evt := range resp.Events {
-                                       if prevAction != evt.Type {
-                                               prevAction = evt.Type
-
-                                               if pIdx > 0 {
-                                                       err = 
setResponseAndCallback(pResp, kvs[:pIdx], op.WatchCallback)
-                                                       if err != nil {
-                                                               return
-                                                       }
-                                                       pIdx = 0
-                                               }
-                                       }
+                               err = dispatch(resp.Events, op.WatchCallback)
+                               if err != nil {
+                                       return
+                               }
+                       }
+               }
+       }
+       return fmt.Errorf("no key has been watched")
+}
 
-                                       pResp.Revision = evt.Kv.ModRevision
-                                       pResp.Action = 
setKvsAndConvertAction(kvs, pIdx, evt)
+func dispatch(evts []*clientv3.Event, cb registry.WatchCallback) error {
+       l := len(evts)
+       kvs := make([]*mvccpb.KeyValue, l)
+       sIdx, eIdx, prevAction := 0, 0, mvccpb.PUT
+       pResp := &registry.PluginResponse{Action: registry.Put, Succeeded: true}
 
-                                       pIdx++
-                               }
+       for _, evt := range evts {
+               if prevAction != evt.Type {
+                       prevAction = evt.Type
 
-                               if pIdx > 0 {
-                                       err = setResponseAndCallback(pResp, 
kvs[:pIdx], op.WatchCallback)
-                                       if err != nil {
-                                               return
-                                       }
+                       if eIdx > 0 {
+                               err := setResponseAndCallback(pResp, 
kvs[sIdx:eIdx], cb)
+                               if err != nil {
+                                       return err
                                }
+                               sIdx = eIdx
                        }
                }
+
+               if pResp.Revision < evt.Kv.ModRevision {
+                       pResp.Revision = evt.Kv.ModRevision
+               }
+               pResp.Action = setKvsAndConvertAction(kvs, eIdx, evt)
+
+               eIdx++
        }
-       err = fmt.Errorf("no key has been watched")
-       return
+
+       if eIdx > 0 {
+               return setResponseAndCallback(pResp, kvs[sIdx:eIdx], cb)
+       }
+       return nil
 }
 
 func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt 
*clientv3.Event) registry.ActionType {
@@ -564,12 +570,7 @@ func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx 
int, evt *clientv3.Even
 func setResponseAndCallback(pResp *registry.PluginResponse, kvs 
[]*mvccpb.KeyValue, cb registry.WatchCallback) error {
        pResp.Count = int64(len(kvs))
        pResp.Kvs = kvs
-
-       err := cb("key information changed", pResp)
-       if err != nil {
-               return err
-       }
-       return nil
+       return cb("key information changed", pResp)
 }
 
 func NewRegistry() mgr.PluginInstance {
diff --git a/server/rest/controller/v4/microservice_controller.go 
b/server/rest/controller/v4/microservice_controller.go
index b0b177f..795c72c 100644
--- a/server/rest/controller/v4/microservice_controller.go
+++ b/server/rest/controller/v4/microservice_controller.go
@@ -102,7 +102,6 @@ func (this *MicroServiceService) Unregister(w 
http.ResponseWriter, r *http.Reque
 
 func (this *MicroServiceService) GetServices(w http.ResponseWriter, r 
*http.Request) {
        request := &pb.GetServicesRequest{}
-       util.Logger().Debugf("domain is %s", util.ParseDomain(r.Context()))
        resp, _ := core.ServiceAPI.GetServices(r.Context(), request)
        respInternal := resp.Response
        resp.Response = nil
diff --git a/server/service/event/dependency_event_handler.go 
b/server/service/event/dependency_event_handler.go
index 99808e1..d1bd311 100644
--- a/server/service/event/dependency_event_handler.go
+++ b/server/service/event/dependency_event_handler.go
@@ -91,9 +91,9 @@ func (h *DependencyEventHandler) loop() {
 }
 
 type DependencyEventHandlerResource struct {
-       dep    *pb.ConsumerDependency
-       kv     *mvccpb.KeyValue
-       domainProject  string
+       dep           *pb.ConsumerDependency
+       kv            *mvccpb.KeyValue
+       domainProject string
 }
 
 func NewDependencyEventHandlerResource(dep *pb.ConsumerDependency, kv 
*mvccpb.KeyValue, domainProject string) *DependencyEventHandlerResource {
@@ -150,7 +150,7 @@ func (h *DependencyEventHandler) Handle() error {
 
        dependencyRuleHandleResults := make(chan error, len(resourcesMap))
        for lockKey, resources := range resourcesMap {
-               go func(lockKey string, resources 
[]*DependencyEventHandlerResource){
+               go func(lockKey string, resources 
[]*DependencyEventHandlerResource) {
                        err := h.dependencyRuleHandle(ctx, lockKey, resources)
                        dependencyRuleHandleResults <- err
                }(lockKey, resources)
@@ -169,7 +169,7 @@ func (h *DependencyEventHandler) Handle() error {
        return lastErr
 }
 
-func (h *DependencyEventHandler)dependencyRuleHandle(ctx context.Context, 
lockKey string, resources []*DependencyEventHandlerResource) error{
+func (h *DependencyEventHandler) dependencyRuleHandle(ctx context.Context, 
lockKey string, resources []*DependencyEventHandlerResource) error {
        lock, err := serviceUtil.DependencyLock(lockKey)
        if err != nil {
                util.Logger().Errorf(err, "create dependency rule locker 
failed")
diff --git a/server/service/instances.go b/server/service/instances.go
index 1f066fb..2e12fcf 100644
--- a/server/service/instances.go
+++ b/server/service/instances.go
@@ -589,9 +589,13 @@ func (s *InstanceService) Find(ctx context.Context, in 
*pb.FindInstancesRequest)
                }, nil
        }
 
-       instances := make([]*pb.MicroServiceInstance, 0)
+       var instances []*pb.MicroServiceInstance
+       cloneCtx := ctx
+       if s, ok := ctx.Value("noCache").(string); !ok || s != "1" {
+               cloneCtx = util.SetContext(util.CloneContext(ctx), "cacheOnly", 
"1")
+       }
        for _, serviceId := range ids {
-               resp, err := s.GetInstances(ctx, &pb.GetInstancesRequest{
+               resp, err := s.GetInstances(cloneCtx, &pb.GetInstancesRequest{
                        ConsumerServiceId: in.ConsumerServiceId,
                        ProviderServiceId: serviceId,
                        Tags:              in.Tags,
diff --git a/server/service/instances_test.go b/server/service/instances_test.go
index 355a615..dfba6d3 100644
--- a/server/service/instances_test.go
+++ b/server/service/instances_test.go
@@ -909,7 +909,7 @@ var _ = Describe("'Instance' service", func() {
 
                                respFind, err = instanceResource.Find(
                                        util.SetTargetDomainProject(
-                                               
util.SetDomainProject(context.Background(), "user", "user"),
+                                               
util.SetDomainProject(util.CloneContext(getContext()), "user", "user"),
                                                "default", "default"),
                                        &pb.FindInstancesRequest{
                                                ConsumerServiceId: serviceId6,
diff --git a/server/service/microservices.go b/server/service/microservices.go
index 319c108..dcaa4be 100644
--- a/server/service/microservices.go
+++ b/server/service/microservices.go
@@ -18,6 +18,7 @@ package service
 
 import (
        "encoding/json"
+       "errors"
        "fmt"
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        "github.com/apache/incubator-servicecomb-service-center/server/core"
@@ -33,7 +34,6 @@ import (
        "golang.org/x/net/context"
        "strconv"
        "time"
-       "errors"
 )
 
 type MicroServiceService struct {
@@ -130,6 +130,7 @@ func (s *MicroServiceService) CreateServicePri(ctx 
context.Context, in *pb.Creat
        index := apt.GenerateServiceIndexKey(serviceKey)
        indexBytes := util.StringToBytesWithNoCopy(index)
        aliasBytes := 
util.StringToBytesWithNoCopy(apt.GenerateServiceAliasKey(serviceKey))
+
        opts := []registry.PluginOp{
                registry.OpPut(registry.WithStrKey(key), 
registry.WithValue(data)),
                registry.OpPut(registry.WithKey(indexBytes), 
registry.WithStrValue(serviceId)),
diff --git a/server/service/rule.go b/server/service/rule.go
index 287dbc2..4060907 100644
--- a/server/service/rule.go
+++ b/server/service/rule.go
@@ -64,7 +64,6 @@ func (s *MicroServiceService) AddRule(ctx context.Context, in 
*pb.AddServiceRule
                return response, nil
        }
 
-
        ruleType, _, err := serviceUtil.GetServiceRuleType(ctx, domainProject, 
in.ServiceId)
        util.Logger().Debugf("ruleType is %s", ruleType)
        if err != nil {
diff --git a/server/service/util/dependency.go 
b/server/service/util/dependency.go
index 5bfb978..68c6bf7 100644
--- a/server/service/util/dependency.go
+++ b/server/service/util/dependency.go
@@ -21,6 +21,7 @@ import (
        "errors"
        "fmt"
        "github.com/apache/incubator-servicecomb-service-center/pkg/cache"
+       "github.com/apache/incubator-servicecomb-service-center/pkg/etcdsync"
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        apt "github.com/apache/incubator-servicecomb-service-center/server/core"
        
"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
@@ -28,11 +29,10 @@ import (
        pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
        scerr 
"github.com/apache/incubator-servicecomb-service-center/server/error"
        
"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
+       "github.com/apache/incubator-servicecomb-service-center/server/mux"
        "golang.org/x/net/context"
        "strings"
        "time"
-       "github.com/apache/incubator-servicecomb-service-center/pkg/etcdsync"
-       "github.com/apache/incubator-servicecomb-service-center/server/mux"
 )
 
 var consumerCache *cache.Cache
@@ -1059,5 +1059,5 @@ func DependencyLock(lockKey string) (*etcdsync.DLock, 
error) {
 }
 
 func NewDependencyLockKey(domainProject, env string) string {
-       return util.StringJoin([]string{"","env-lock", domainProject, env}, "/")
-}
\ No newline at end of file
+       return util.StringJoin([]string{"", "env-lock", domainProject, env}, 
"/")
+}

-- 
To stop receiving notification emails like this one, please contact
asifdxtr...@apache.org.

Reply via email to