This is an automated email from the ASF dual-hosted git repository. asifdxtreme pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push: new cac383e SCB-343 Batch delete micro-services and enable tracing will cause SC crash (#282) cac383e is described below commit cac383e50f07f8ba4a526b2651f99d7eb47433b6 Author: little-cui <sure_0...@qq.com> AuthorDate: Tue Feb 13 12:36:35 2018 +0800 SCB-343 Batch delete micro-services and enable tracing will cause SC crash (#282) SCB-343 Batch delete micro-services and enable tracing will cause SC crash --- pkg/chain/handler.go | 5 +- pkg/etcdsync/mutex.go | 44 ++-- pkg/grace/grace.go | 45 +++-- pkg/httplimiter/httpratelimiter.go | 221 --------------------- pkg/ratelimiter/ratelimiter.go | 118 ----------- pkg/util/concurrent_map.go | 120 +++++++++++ pkg/util/concurrent_map_test.go | 161 +++++++++++++++ pkg/util/context.go | 29 +-- pkg/util/goroutines_test.go | 18 +- pkg/util/log_test.go | 13 +- pkg/util/net.go | 31 ++- pkg/util/net_test.go | 11 + pkg/{validate => util}/reflect.go | 18 +- .../util/reflect_test.go | 45 ++++- pkg/util/sys.go | 9 + pkg/util/tree_test.go | 4 +- pkg/util/uniqueue_test.go | 6 - pkg/util/util.go | 9 + pkg/util/util_test.go | 7 + pkg/validate/validate.go | 2 +- server/bootstrap/bootstrap.go | 2 - server/interceptor/ratelimiter/limiter.go | 89 --------- server/interceptor/ratelimiter/limiter_test.go | 83 -------- server/server.go | 5 +- server/service/instances.go | 3 +- server/service/microservices.go | 12 +- server/service/util/instance_util.go | 12 -- server/service/util/instance_util_test.go | 12 -- server/service/util/rule_util.go | 1 - 29 files changed, 466 insertions(+), 669 deletions(-) diff --git a/pkg/chain/handler.go b/pkg/chain/handler.go index 46d7491..be48299 100644 --- a/pkg/chain/handler.go +++ b/pkg/chain/handler.go @@ -18,13 +18,12 @@ package chain import ( "github.com/apache/incubator-servicecomb-service-center/pkg/util" - "github.com/apache/incubator-servicecomb-service-center/pkg/validate" "reflect" ) const CAP_SIZE = 10 -var handlersMap map[string][]Handler = make(map[string][]Handler, CAP_SIZE) +var handlersMap = make(map[string][]Handler, CAP_SIZE) type Handler interface { Handle(i *Invocation) @@ -38,7 +37,7 @@ func RegisterHandler(catalog string, h Handler) { handlers = append(handlers, h) handlersMap[catalog] = handlers - t := validate.LoadStruct(reflect.ValueOf(h).Elem().Interface()) + t := util.LoadStruct(reflect.ValueOf(h).Elem().Interface()) util.Logger().Infof("register handler[%s] %s/%s", catalog, t.Type.PkgPath(), t.Type.Name()) } diff --git a/pkg/etcdsync/mutex.go b/pkg/etcdsync/mutex.go index 3cb4bcf..f54ef6f 100644 --- a/pkg/etcdsync/mutex.go +++ b/pkg/etcdsync/mutex.go @@ -23,24 +23,22 @@ import ( "github.com/apache/incubator-servicecomb-service-center/server/infra/registry" "github.com/coreos/etcd/client" "golang.org/x/net/context" - "io" "os" "sync" "time" ) const ( - defaultTTL = 60 - defaultTry = 3 - ROOT_PATH = "/cse/etcdsync" + DEFAULT_LOCK_TTL = 60 + DEFAULT_RETRY_TIMES = 3 + ROOT_PATH = "/cse/etcdsync" ) type DLockFactory struct { - key string - ctx context.Context - ttl int64 - mutex *sync.Mutex - logger io.Writer + key string + ctx context.Context + ttl int64 + mutex *sync.Mutex } type DLock struct { @@ -49,31 +47,19 @@ type DLock struct { } var ( - globalMap map[string]*DLockFactory + globalMap = make(map[string]*DLockFactory) globalMux sync.Mutex IsDebug bool - hostname string - pid int + hostname string = util.HostName() + pid int = os.Getpid() ) -func init() { - globalMap = make(map[string]*DLockFactory) - IsDebug = false - - var err error - hostname, err = os.Hostname() - if err != nil { - hostname = "UNKNOWN" - } - pid = os.Getpid() -} - func NewLockFactory(key string, ttl int64) *DLockFactory { if len(key) == 0 { return nil } if ttl < 1 { - ttl = defaultTTL + ttl = DEFAULT_LOCK_TTL } return &DLockFactory{ @@ -92,7 +78,7 @@ func (m *DLockFactory) NewDLock(wait bool) (l *DLock, err error) { builder: m, id: fmt.Sprintf("%v-%v-%v", hostname, pid, time.Now().Format("20060102-15:04:05.999999999")), } - for try := 1; try <= defaultTry; try++ { + for try := 1; try <= DEFAULT_RETRY_TIMES; try++ { err = l.Lock(wait) if err == nil { return l, nil @@ -103,7 +89,7 @@ func (m *DLockFactory) NewDLock(wait bool) (l *DLock, err error) { break } - if try <= defaultTry { + if try <= DEFAULT_RETRY_TIMES { util.Logger().Warnf(err, "Try to lock key %s again, id=%s", m.key, l.id) } else { util.Logger().Errorf(err, "Lock key %s failed, id=%s", m.key, l.id) @@ -147,7 +133,7 @@ func (m *DLock) Lock(wait bool) error { util.Logger().Warnf(err, "Key %s is locked, waiting for other node releases it, id=%s", m.builder.key, m.id) - ctx, cancel := context.WithTimeout(m.builder.ctx, defaultTTL*time.Second) + ctx, cancel := context.WithTimeout(m.builder.ctx, DEFAULT_LOCK_TTL*time.Second) go func() { err := backend.Registry().Watch(ctx, registry.WithStrKey(m.builder.key), @@ -179,7 +165,7 @@ func (m *DLock) Unlock() (err error) { registry.DEL, registry.WithStrKey(m.builder.key)} - for i := 1; i <= defaultTry; i++ { + for i := 1; i <= DEFAULT_RETRY_TIMES; i++ { _, err = backend.Registry().Do(m.builder.ctx, opts...) if err == nil { if !IsDebug { diff --git a/pkg/grace/grace.go b/pkg/grace/grace.go index dc212f7..fef6caf 100644 --- a/pkg/grace/grace.go +++ b/pkg/grace/grace.go @@ -146,26 +146,14 @@ func fork() (err error) { for name, i := range filesOffsetMap { orderArgs[i] = name } - path := os.Args[0] - var args []string - if len(os.Args) > 1 { - for _, arg := range os.Args[1:] { - if arg == "-fork" { - break - } - args = append(args, arg) - } - } - args = append(args, "-fork") + + // add fork and file descriptions order flags + args := append(parseCommandLine(), "-fork") if len(filesOffsetMap) > 0 { args = append(args, fmt.Sprintf(`-filesorder=%s`, strings.Join(orderArgs, ","))) } - cmd := exec.Command(path, args...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - cmd.ExtraFiles = files - err = cmd.Start() - if err != nil { + + if err = newCommand(args...); err != nil { util.Logger().Errorf(err, "fork a process failed, %v", args) return } @@ -173,6 +161,29 @@ func fork() (err error) { return } +func parseCommandLine() (args []string) { + if len(os.Args) <= 1 { + return + } + // ignore process path + for _, arg := range os.Args[1:] { + if arg == "-fork" { + // ignore fork flags + break + } + args = append(args, arg) + } + return +} + +func newCommand(args ...string) error { + cmd := exec.Command(os.Args[0], args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.ExtraFiles = files + return cmd.Start() +} + func IsFork() bool { return isFork } diff --git a/pkg/httplimiter/httpratelimiter.go b/pkg/httplimiter/httpratelimiter.go deleted file mode 100644 index 3110056..0000000 --- a/pkg/httplimiter/httpratelimiter.go +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package httplimiter - -import ( - "fmt" - "github.com/apache/incubator-servicecomb-service-center/pkg/ratelimiter" - "net/http" - "strconv" - "strings" - "sync" - "time" -) - -type HTTPErrorMessage struct { - Message string - StatusCode int -} - -func (httpErrorMessage *HTTPErrorMessage) Error() string { - return fmt.Sprintf("%v: %v", httpErrorMessage.StatusCode, httpErrorMessage.Message) -} - -type HttpLimiter struct { - HttpMessage string - ContentType string - StatusCode int - RequestLimit int64 - TTL time.Duration - IPLookups []string - Methods []string - Headers map[string][]string - BasicAuthUsers []string - leakyBuckets map[string]*ratelimiter.LeakyBucket - sync.RWMutex -} - -func LimitBySegments(limiter *HttpLimiter, keys []string) *HTTPErrorMessage { - if limiter.LimitExceeded(strings.Join(keys, "|")) { - return &HTTPErrorMessage{Message: limiter.HttpMessage, StatusCode: limiter.StatusCode} - } - - return nil -} - -func LimitByRequest(httpLimiter *HttpLimiter, r *http.Request) *HTTPErrorMessage { - sliceKeys := BuildSegments(httpLimiter, r) - - for _, keys := range sliceKeys { - httpError := LimitBySegments(httpLimiter, keys) - if httpError != nil { - return httpError - } - } - - return nil -} - -func BuildSegments(httpLimiter *HttpLimiter, r *http.Request) [][]string { - remoteIP := getRemoteIP(httpLimiter.IPLookups, r) - urlPath := r.URL.Path - sliceKeys := make([][]string, 0) - - if remoteIP == "" { - return sliceKeys - } - - if httpLimiter.Methods != nil && httpLimiter.Headers != nil && httpLimiter.BasicAuthUsers != nil { - if checkExistence(httpLimiter.Methods, r.Method) { - for headerKey, headerValues := range httpLimiter.Headers { - if (headerValues == nil || len(headerValues) <= 0) && r.Header.Get(headerKey) != "" { - username, _, ok := r.BasicAuth() - if ok && checkExistence(httpLimiter.BasicAuthUsers, username) { - sliceKeys = append(sliceKeys, []string{remoteIP, urlPath, r.Method, headerKey, username}) - } - - } else if len(headerValues) > 0 && r.Header.Get(headerKey) != "" { - for _, headerValue := range headerValues { - username, _, ok := r.BasicAuth() - if ok && checkExistence(httpLimiter.BasicAuthUsers, username) { - sliceKeys = append(sliceKeys, []string{remoteIP, urlPath, r.Method, headerKey, headerValue, username}) - } - } - } - } - } - - } else if httpLimiter.Methods != nil && httpLimiter.Headers != nil { - if checkExistence(httpLimiter.Methods, r.Method) { - for headerKey, headerValues := range httpLimiter.Headers { - if (headerValues == nil || len(headerValues) <= 0) && r.Header.Get(headerKey) != "" { - sliceKeys = append(sliceKeys, []string{remoteIP, urlPath, r.Method, headerKey}) - - } else if len(headerValues) > 0 && r.Header.Get(headerKey) != "" { - for _, headerValue := range headerValues { - sliceKeys = append(sliceKeys, []string{remoteIP, urlPath, r.Method, headerKey, headerValue}) - } - } - } - } - - } else if httpLimiter.Methods != nil && httpLimiter.BasicAuthUsers != nil { - if checkExistence(httpLimiter.Methods, r.Method) { - username, _, ok := r.BasicAuth() - if ok && checkExistence(httpLimiter.BasicAuthUsers, username) { - sliceKeys = append(sliceKeys, []string{remoteIP, urlPath, r.Method, username}) - } - } - - } else if httpLimiter.Methods != nil { - if checkExistence(httpLimiter.Methods, r.Method) { - sliceKeys = append(sliceKeys, []string{remoteIP, urlPath, r.Method}) - } - - } else if httpLimiter.Headers != nil { - for headerKey, headerValues := range httpLimiter.Headers { - if (headerValues == nil || len(headerValues) <= 0) && r.Header.Get(headerKey) != "" { - sliceKeys = append(sliceKeys, []string{remoteIP, urlPath, headerKey}) - - } else if len(headerValues) > 0 && r.Header.Get(headerKey) != "" { - for _, headerValue := range headerValues { - sliceKeys = append(sliceKeys, []string{remoteIP, urlPath, headerKey, headerValue}) - } - } - } - - } else if httpLimiter.BasicAuthUsers != nil { - username, _, ok := r.BasicAuth() - if ok && checkExistence(httpLimiter.BasicAuthUsers, username) { - sliceKeys = append(sliceKeys, []string{remoteIP, urlPath, username}) - } - } else { - sliceKeys = append(sliceKeys, []string{remoteIP, urlPath}) - } - - return sliceKeys -} - -func SetResponseHeaders(limiter *HttpLimiter, w http.ResponseWriter) { - w.Header().Add("X-Rate-Limit-Limit", strconv.FormatInt(limiter.RequestLimit, 10)) - w.Header().Add("X-Rate-Limit-Duration", limiter.TTL.String()) -} - -func checkExistence(sliceString []string, needle string) bool { - for _, b := range sliceString { - if b == needle { - return true - } - } - return false -} - -func ipAddrFromRemoteAddr(s string) string { - idx := strings.LastIndex(s, ":") - if idx == -1 { - return s - } - return s[:idx] -} - -func getRemoteIP(ipLookups []string, r *http.Request) string { - realIP := r.Header.Get("X-Real-IP") - forwardedFor := r.Header.Get("X-Forwarded-For") - - for _, lookup := range ipLookups { - if lookup == "RemoteAddr" { - return ipAddrFromRemoteAddr(r.RemoteAddr) - } - if lookup == "X-Forwarded-For" && forwardedFor != "" { - parts := strings.Split(forwardedFor, ",") - for i, p := range parts { - parts[i] = strings.TrimSpace(p) - } - return parts[0] - } - if lookup == "X-Real-IP" && realIP != "" { - return realIP - } - } - - return "" -} - -func NewHttpLimiter(max int64, ttl time.Duration) *HttpLimiter { - limiter := &HttpLimiter{RequestLimit: max, TTL: ttl} - limiter.ContentType = "text/plain; charset=utf-8" - limiter.HttpMessage = "You have reached maximum request limit." - limiter.StatusCode = http.StatusTooManyRequests - limiter.leakyBuckets = make(map[string]*ratelimiter.LeakyBucket) - limiter.IPLookups = []string{"RemoteAddr", "X-Forwarded-For", "X-Real-IP"} - - return limiter -} - -func (rateLimiter *HttpLimiter) LimitExceeded(key string) bool { - rateLimiter.Lock() - if _, found := rateLimiter.leakyBuckets[key]; !found { - rateLimiter.leakyBuckets[key] = ratelimiter.NewLeakyBucket(rateLimiter.TTL, rateLimiter.RequestLimit, rateLimiter.RequestLimit) - } - _, isInLimits := rateLimiter.leakyBuckets[key].MaximumTakeDuration(1, 0) - rateLimiter.Unlock() - if isInLimits { - return false - } - return true -} diff --git a/pkg/ratelimiter/ratelimiter.go b/pkg/ratelimiter/ratelimiter.go deleted file mode 100644 index a46bc39..0000000 --- a/pkg/ratelimiter/ratelimiter.go +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ratelimiter - -import ( - "sync" - "time" -) - -type LeakyBucket struct { - startTime time.Time - capacity int64 - quantum int64 - interval time.Duration - mutex sync.Mutex - available int64 - availableTicker int64 -} - -func NewLeakyBucket(fillInterval time.Duration, capacity, quantum int64) *LeakyBucket { - if fillInterval <= 0 { - panic("leaky bucket fill interval is not > 0") - } - if capacity <= 0 { - panic("leaky bucket capacity is not > 0") - } - if quantum <= 0 { - panic("leaky bucket quantum is not > 0") - } - return &LeakyBucket{ - startTime: time.Now(), - capacity: capacity, - quantum: quantum, - available: capacity, - interval: fillInterval, - } -} - -func (leakyBucket *LeakyBucket) Wait(count int64) { - if d := leakyBucket.Take(count); d > 0 { - time.Sleep(d) - } -} - -func (leakyBucket *LeakyBucket) MaximumWaitDuration(count int64, maxWait time.Duration) bool { - d, ok := leakyBucket.MaximumTakeDuration(count, maxWait) - if d > 0 { - time.Sleep(d) - } - return ok -} - -const sleepForever time.Duration = 0x7fffffffffffffff - -func (leakyBucket *LeakyBucket) Take(count int64) time.Duration { - d, _ := leakyBucket.take(time.Now(), count, sleepForever) - return d -} - -func (leakyBucket *LeakyBucket) MaximumTakeDuration(count int64, maxWait time.Duration) (time.Duration, bool) { - return leakyBucket.take(time.Now(), count, maxWait) -} - -func (leakyBucket *LeakyBucket) Rate() float64 { - return 1e9 * float64(leakyBucket.quantum) / float64(leakyBucket.interval) -} - -func (leakyBucket *LeakyBucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) { - if count <= 0 { - return 0, true - } - leakyBucket.mutex.Lock() - defer leakyBucket.mutex.Unlock() - - currentTick := leakyBucket.adjust(now) - avail := leakyBucket.available - count - if avail >= 0 { - leakyBucket.available = avail - return 0, true - } - endTick := currentTick + (-avail+leakyBucket.quantum-1)/leakyBucket.quantum - endTime := leakyBucket.startTime.Add(time.Duration(endTick) * leakyBucket.interval) - waitTime := endTime.Sub(now) - if waitTime > maxWait { - return 0, false - } - leakyBucket.available = avail - return waitTime, true -} - -func (leakyBucket *LeakyBucket) adjust(now time.Time) (currentTick int64) { - currentTick = int64(now.Sub(leakyBucket.startTime) / leakyBucket.interval) - - if leakyBucket.available >= leakyBucket.capacity { - return - } - leakyBucket.available += (currentTick - leakyBucket.availableTicker) * leakyBucket.quantum - if leakyBucket.available > leakyBucket.capacity { - leakyBucket.available = leakyBucket.capacity - } - leakyBucket.availableTicker = currentTick - return -} diff --git a/pkg/util/concurrent_map.go b/pkg/util/concurrent_map.go new file mode 100644 index 0000000..b7ec039 --- /dev/null +++ b/pkg/util/concurrent_map.go @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package util + +import "sync" + +type MapItem struct { + Key interface{} + Value interface{} +} + +type ConcurrentMap struct { + items map[interface{}]interface{} + size int + mux sync.RWMutex + once sync.Once +} + +func (cm *ConcurrentMap) resize() { + cm.items = make(map[interface{}]interface{}, cm.size) +} + +func (cm *ConcurrentMap) init() { + cm.once.Do(cm.resize) +} + +func (cm *ConcurrentMap) Put(key, val interface{}) (old interface{}) { + cm.init() + cm.mux.Lock() + old, cm.items[key] = cm.items[key], val + cm.mux.Unlock() + return +} + +func (cm *ConcurrentMap) PutIfAbsent(key, val interface{}) (old interface{}) { + var b bool + cm.init() + cm.mux.Lock() + old, b = cm.items[key] + if !b { + cm.items[key] = val + } + cm.mux.Unlock() + return +} + +func (cm *ConcurrentMap) Get(key interface{}) (val interface{}, b bool) { + cm.init() + cm.mux.RLock() + val, b = cm.items[key] + cm.mux.RUnlock() + return +} + +func (cm *ConcurrentMap) Remove(key interface{}) (old interface{}) { + var b bool + cm.init() + cm.mux.Lock() + old, b = cm.items[key] + if b { + delete(cm.items, key) + } + cm.mux.Unlock() + return +} + +func (cm *ConcurrentMap) Clear() { + cm.mux.Lock() + cm.resize() + cm.mux.Unlock() +} + +func (cm *ConcurrentMap) Size() (s int) { + return len(cm.items) +} + +func (cm *ConcurrentMap) ForEach(f func(item MapItem) (next bool)) { + cm.mux.RLock() + s := len(cm.items) + if s == 0 { + cm.mux.RUnlock() + return + } + // avoid dead lock in function 'f' + ch := make(chan MapItem, s) + for k, v := range cm.items { + ch <- MapItem{k, v} + } + cm.mux.RUnlock() + + for { + select { + case item := <-ch: + if b := f(item); b { + continue + } + default: + } + break + } + close(ch) +} + +func NewConcurrentMap(size int) ConcurrentMap { + return ConcurrentMap{size: size} +} diff --git a/pkg/util/concurrent_map_test.go b/pkg/util/concurrent_map_test.go new file mode 100644 index 0000000..90fa711 --- /dev/null +++ b/pkg/util/concurrent_map_test.go @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package util + +import ( + "testing" +) + +func TestConcurrentMap(t *testing.T) { + cm := ConcurrentMap{} + s := cm.Size() + if s != 0 { + fail(t, "TestConcurrentMap Size failed.") + } + v, b := cm.Get("a") + if b || v != nil { + fail(t, "TestConcurrentMap Get a not exist item failed.") + } + v = cm.Put("a", "1") + if v != nil { + fail(t, "TestConcurrentMap Put a new item failed.") + } + v, b = cm.Get("a") + if !b || v.(string) != "1" { + fail(t, "TestConcurrentMap Get an exist item failed.") + } + v = cm.Put("a", "2") + if v.(string) != "1" { + fail(t, "TestConcurrentMap Put an item again failed.") + } + v = cm.PutIfAbsent("b", "1") + if v != nil { + fail(t, "TestConcurrentMap PutIfAbsent a not exist item failed.") + } + v = cm.PutIfAbsent("a", "3") + if v.(string) != "2" { + fail(t, "TestConcurrentMap PutIfAbsent an item failed.") + } + v, b = cm.Get("a") + if !b || v.(string) != "2" { + fail(t, "TestConcurrentMap Get an item after PutIfAbsent failed.") + } + v = cm.Remove("a") + if v.(string) != "2" { + fail(t, "TestConcurrentMap Remove an item failed.") + } + v, b = cm.Get("a") + if b || v != nil { + fail(t, "TestConcurrentMap Get an item after Remove failed.") + } + s = cm.Size() + if s != 1 { // only 'b' is left + fail(t, "TestConcurrentMap Size after Put failed.") + } + cm.Clear() + s = cm.Size() + if s != 0 { + fail(t, "TestConcurrentMap Size after Clear failed.") + } +} + +func TestConcurrentMap_ForEach(t *testing.T) { + l := 0 + cm := ConcurrentMap{} + cm.ForEach(func(item MapItem) bool { + l++ + return true + }) + if l != 0 { + fail(t, "TestConcurrentMap_ForEach failed.") + } + for i := 0; i < 1000; i++ { + cm.Put(i, i) + } + cm.ForEach(func(item MapItem) bool { + l++ + cm.Remove(item.Key) + return true + }) + if l != 1000 || cm.Size() != 0 { + fail(t, "TestConcurrentMap_ForEach does not empty failed.") + } +} + +func TestNewConcurrentMap(t *testing.T) { + cm := NewConcurrentMap(100) + if cm.size != 100 { + fail(t, "TestNewConcurrentMap failed.") + } +} + +func BenchmarkConcurrentMap_Get(b *testing.B) { + var v interface{} + cm := ConcurrentMap{} + cm.Put("a", "1") + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + v, _ = cm.Get("a") + } + }) + b.ReportAllocs() + // 20000000 88.7 ns/op 0 B/op 0 allocs/op +} + +func BenchmarkConcurrentMap_Put(b *testing.B) { + cm := &ConcurrentMap{} + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + cm.Put("a", "1") + } + }) + b.ReportAllocs() + // 3000000 420 ns/op 32 B/op 2 allocs/op +} + +func BenchmarkConcurrentMap_PutAndGet(b *testing.B) { + var v interface{} + cm := &ConcurrentMap{} + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + cm.Put("a", "1") + v, _ = cm.Get("a") + } + }) + b.ReportAllocs() + // 3000000 560 ns/op 32 B/op 2 allocs/op +} + +func BenchmarkConcurrentMap_ForEach(b *testing.B) { + cm := ConcurrentMap{} + for i := 0; i < 100; i++ { + cm.Put(i, i) + } + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + cm.ForEach(func(item MapItem) bool { + return true + }) + } + }) + b.ReportAllocs() + // 500000 3148 ns/op 3296 B/op 2 allocs/op +} diff --git a/pkg/util/context.go b/pkg/util/context.go index 41b41e4..4bc9b3a 100644 --- a/pkg/util/context.go +++ b/pkg/util/context.go @@ -24,7 +24,7 @@ import ( type StringContext struct { parentCtx context.Context - kv map[string]interface{} + kv ConcurrentMap } func (c *StringContext) Deadline() (deadline time.Time, ok bool) { @@ -44,11 +44,12 @@ func (c *StringContext) Value(key interface{}) interface{} { if !ok { return c.parentCtx.Value(key) } - return c.kv[k] + v, _ := c.kv.Get(k) + return v } func (c *StringContext) SetKV(key string, val interface{}) { - c.kv[key] = val + c.kv.Put(key, val) } func NewStringContext(ctx context.Context) *StringContext { @@ -56,7 +57,7 @@ func NewStringContext(ctx context.Context) *StringContext { if !ok { strCtx = &StringContext{ parentCtx: ctx, - kv: make(map[string]interface{}, 10), + kv: NewConcurrentMap(10), } } return strCtx @@ -69,19 +70,23 @@ func SetContext(ctx context.Context, key string, val interface{}) context.Contex } func CloneContext(ctx context.Context) context.Context { - strCtx := &StringContext{ - parentCtx: ctx, - kv: make(map[string]interface{}, 10), - } - old, ok := ctx.(*StringContext) if !ok { - return strCtx + return &StringContext{ + parentCtx: ctx, + kv: NewConcurrentMap(10), + } } - for k, v := range old.kv { - strCtx.kv[k] = v + strCtx := &StringContext{ + parentCtx: ctx, + kv: NewConcurrentMap(old.kv.Size()), } + + old.kv.ForEach(func(item MapItem) bool { + strCtx.kv.Put(item.Key, item.Value) + return true + }) return strCtx } diff --git a/pkg/util/goroutines_test.go b/pkg/util/goroutines_test.go index 592965c..cfc0919 100644 --- a/pkg/util/goroutines_test.go +++ b/pkg/util/goroutines_test.go @@ -33,15 +33,13 @@ func TestGoRoutine_Init(t *testing.T) { test.Init(stopCh1) c := test.StopCh() if c != stopCh1 { - fmt.Println("init GoRoutine failed.") - t.Fail() + fail(t, "init GoRoutine failed.") } test.Init(stopCh2) c = test.StopCh() if c == stopCh2 { - fmt.Println("init GoRoutine twice.") - t.Fail() + fail(t, "init GoRoutine twice.") } } @@ -53,8 +51,7 @@ func TestGoRoutine_Do(t *testing.T) { defer close(stopCh) select { case <-neverStopCh: - fmt.Println("neverStopCh should not be closed.") - t.Fail() + fail(t, "neverStopCh should not be closed.") case <-time.After(time.Second): } }) @@ -69,8 +66,7 @@ func TestGoRoutine_Do(t *testing.T) { select { case <-stopCh: case <-time.After(time.Second): - fmt.Println("time out to wait stopCh1 close.") - t.Fail() + fail(t, "time out to wait stopCh1 close.") } }) close(stopCh1) @@ -102,8 +98,7 @@ func TestGoRoutine_Wait(t *testing.T) { test.Wait() fmt.Println(resultArr) if len(resultArr) != MAX { - fmt.Println("fail to wait all goroutines finish.") - t.Fail() + fail(t, "fail to wait all goroutines finish.") } } @@ -114,8 +109,7 @@ func TestGoRoutine_Close(t *testing.T) { select { case <-stopCh: case <-time.After(time.Second): - fmt.Println("time out to wait stopCh close.") - t.Fail() + fail(t, "time out to wait stopCh close.") } }) test.Close(true) diff --git a/pkg/util/log_test.go b/pkg/util/log_test.go index 0fe8e7c..bd030d0 100644 --- a/pkg/util/log_test.go +++ b/pkg/util/log_test.go @@ -17,7 +17,6 @@ package util import ( - "fmt" "testing" ) @@ -25,25 +24,21 @@ func TestLogger(t *testing.T) { CustomLogger("Not Exist", "testDefaultLOGGER") l := Logger() if l != LOGGER { - fmt.Println("should equal to LOGGER") - t.FailNow() + fail(t, "should equal to LOGGER") } CustomLogger("TestLogger", "testFuncName") l = Logger() if l == LOGGER || l == nil { - fmt.Println("should create a new instance for 'TestLogger'") - t.FailNow() + fail(t, "should create a new instance for 'TestLogger'") } s := Logger() if l != s { - fmt.Println("should be the same logger") - t.FailNow() + fail(t, "should be the same logger") } CustomLogger("github.com/apache/incubator-servicecomb-service-center/pkg/util", "testPkgPath") l = Logger() if l == LOGGER || l == nil { - fmt.Println("should create a new instance for 'util'") - t.FailNow() + fail(t, "should create a new instance for 'util'") } // l.Infof("OK") } diff --git a/pkg/util/net.go b/pkg/util/net.go index 5791b4f..457f9b6 100644 --- a/pkg/util/net.go +++ b/pkg/util/net.go @@ -21,9 +21,15 @@ import ( "net" "net/http" "net/url" + "strconv" "strings" ) +type IpPort struct { + IP string + Port uint16 +} + func GetIPFromContext(ctx context.Context) string { v, ok := FromContext(ctx, "x-remote-ip").(string) if !ok { @@ -56,6 +62,15 @@ func ParseEndpoint(ep string) (string, error) { return u.Hostname(), nil } +func ParseIpPort(addr string) IpPort { + idx := strings.LastIndex(addr, ":") + if idx == -1 { + return IpPort{addr, 0} + } + p, _ := strconv.Atoi(addr[idx+1:]) + return IpPort{addr[:idx], uint16(p)} +} + func GetRealIP(r *http.Request) string { for _, h := range [2]string{"X-Forwarded-For", "X-Real-Ip"} { addresses := strings.Split(r.Header.Get(h), ",") @@ -75,22 +90,6 @@ func GetRealIP(r *http.Request) string { return "" } -func GetLocalIP() string { - addrs, err := net.InterfaceAddrs() - if err != nil { - return "" - } - for _, address := range addrs { - // check the address type and if it is not a loopback the display it - if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { - if ipnet.IP.To4() != nil { - return ipnet.IP.String() - } - } - } - return "" -} - func InetNtoIP(ipnr uint32) net.IP { return net.IPv4(byte(ipnr>>24), byte(ipnr>>16), byte(ipnr>>8), byte(ipnr)) } diff --git a/pkg/util/net_test.go b/pkg/util/net_test.go index b61b99d..c148406 100644 --- a/pkg/util/net_test.go +++ b/pkg/util/net_test.go @@ -56,3 +56,14 @@ func TestInetNtoa(t *testing.T) { fail(t, "InetNtoa(%d) error", n3) } } + +func TestParseIpPort(t *testing.T) { + ipPort := ParseIpPort("0.0.0.0") + if ipPort.IP != "0.0.0.0" || ipPort.Port != 0 { + fail(t, "ParseIpPort(0.0.0.0) error", n3) + } + ipPort = ParseIpPort("0.0.0.0:1") + if ipPort.IP != "0.0.0.0" || ipPort.Port != 1 { + fail(t, "ParseIpPort(0.0.0.0) error", n3) + } +} diff --git a/pkg/validate/reflect.go b/pkg/util/reflect.go similarity index 81% rename from pkg/validate/reflect.go rename to pkg/util/reflect.go index 18f69b6..a5770f0 100644 --- a/pkg/validate/reflect.go +++ b/pkg/util/reflect.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package validate +package util import ( "reflect" @@ -26,21 +26,21 @@ var reflector *Reflector func init() { reflector = &Reflector{ - types: make(map[*uintptr]*StructType, 100), + types: make(map[*uintptr]StructType, 100), } } type StructType struct { Type reflect.Type - Fields []*reflect.StructField + Fields []reflect.StructField } type Reflector struct { - types map[*uintptr]*StructType + types map[*uintptr]StructType mux sync.RWMutex } -func (r *Reflector) Load(obj interface{}) *StructType { +func (r *Reflector) Load(obj interface{}) StructType { r.mux.RLock() itab := *(**uintptr)(unsafe.Pointer(&obj)) t, ok := r.types[itab] @@ -55,16 +55,16 @@ func (r *Reflector) Load(obj interface{}) *StructType { r.mux.Unlock() return t } - t = &StructType{ + t = StructType{ Type: reflect.TypeOf(obj), } fl := t.Type.NumField() if fl > 0 { - t.Fields = make([]*reflect.StructField, fl) + t.Fields = make([]reflect.StructField, fl) for i := 0; i < fl; i++ { f := t.Type.Field(i) - t.Fields[i] = &f + t.Fields[i] = f } } r.types[itab] = t @@ -72,6 +72,6 @@ func (r *Reflector) Load(obj interface{}) *StructType { return t } -func LoadStruct(obj interface{}) *StructType { +func LoadStruct(obj interface{}) StructType { return reflector.Load(obj) } diff --git a/server/interceptor/ratelimiter/ratelimiter_suite_test.go b/pkg/util/reflect_test.go similarity index 52% rename from server/interceptor/ratelimiter/ratelimiter_suite_test.go rename to pkg/util/reflect_test.go index af2baf8..ae0dfa5 100644 --- a/server/interceptor/ratelimiter/ratelimiter_suite_test.go +++ b/pkg/util/reflect_test.go @@ -14,16 +14,47 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package ratelimiter_test +package util import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - + "fmt" "testing" ) -func TestNet(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "HttpLimiter Suite") +type testStru struct { + f1 int + f2 string + f3 testField + f4 *testField +} + +type testField struct { +} + +func TestLoadStruct(t *testing.T) { + obj1 := testStru{} + v := LoadStruct(obj1) + if v.Type.String() != "util.testStru" { + fail(t, "TestLoadStruct failed, %s != 'testStru'", v.Type.String()) + } + if len(v.Fields) != 4 { + fail(t, "TestLoadStruct failed, wrong count of fields") + } + for _, f := range v.Fields { + fmt.Println(f.Name, f.Type.String()) + } + + obj2 := testStru{} + v = LoadStruct(obj2) +} + +func BenchmarkLoadStruct(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + obj := testStru{} + LoadStruct(obj) + } + }) + b.ReportAllocs() + // 20000000 86.9 ns/op 32 B/op 1 allocs/op } diff --git a/pkg/util/sys.go b/pkg/util/sys.go index 8a9a23d..3873f87 100644 --- a/pkg/util/sys.go +++ b/pkg/util/sys.go @@ -37,3 +37,12 @@ func PathExist(path string) bool { _, err := os.Stat(path) return err == nil || os.IsExist(err) } + +func HostName() (hostname string) { + var err error + hostname, err = os.Hostname() + if err != nil { + hostname = "UNKNOWN" + } + return +} diff --git a/pkg/util/tree_test.go b/pkg/util/tree_test.go index 64e2339..79cb2c8 100644 --- a/pkg/util/tree_test.go +++ b/pkg/util/tree_test.go @@ -1,7 +1,6 @@ package util import ( - "fmt" "reflect" "testing" ) @@ -31,7 +30,6 @@ func TestTree(t *testing.T) { testTree.InOrderTraversal(testTree.GetRoot(), handle) if !reflect.DeepEqual(slice, targetSlice) { - fmt.Printf(`TestTree failed`) - t.FailNow() + fail(t, `TestTree failed`) } } diff --git a/pkg/util/uniqueue_test.go b/pkg/util/uniqueue_test.go index c69319d..6dca00d 100644 --- a/pkg/util/uniqueue_test.go +++ b/pkg/util/uniqueue_test.go @@ -25,12 +25,6 @@ import ( "time" ) -func fail(t *testing.T, format string, args ...interface{}) { - fmt.Printf(format, args...) - fmt.Println() - t.FailNow() -} - func TestNewUniQueue(t *testing.T) { _, err := newUniQueue(0) if err == nil { diff --git a/pkg/util/util.go b/pkg/util/util.go index 554af47..d63191a 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -201,3 +201,12 @@ func FormatFuncName(f string) string { func FuncName(f interface{}) string { return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() } + +func SliceHave(arr []string, str string) bool { + for _, item := range arr { + if item == str { + return true + } + } + return false +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 16f342f..3a54c36 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -17,9 +17,16 @@ package util import ( + "fmt" "testing" ) +func fail(t *testing.T, format string, args ...interface{}) { + fmt.Printf(format, args...) + fmt.Println() + t.FailNow() +} + func TestBytesToInt32(t *testing.T) { bs := []byte{0, 0, 0, 1} i := BytesToInt32(bs) diff --git a/pkg/validate/validate.go b/pkg/validate/validate.go index 65fcc20..3a36f31 100644 --- a/pkg/validate/validate.go +++ b/pkg/validate/validate.go @@ -208,7 +208,7 @@ func (v *Validator) Validate(s interface{}) error { return errors.New("not support validate type") } - st := LoadStruct(s) + st := util.LoadStruct(s) for i, l := 0, sv.NumField(); i < l; i++ { field := sv.Field(i) fieldName := st.Fields[i].Name diff --git a/server/bootstrap/bootstrap.go b/server/bootstrap/bootstrap.go index b92c45a..058d034 100644 --- a/server/bootstrap/bootstrap.go +++ b/server/bootstrap/bootstrap.go @@ -55,7 +55,6 @@ import ( "github.com/apache/incubator-servicecomb-service-center/server/interceptor" "github.com/apache/incubator-servicecomb-service-center/server/interceptor/access" "github.com/apache/incubator-servicecomb-service-center/server/interceptor/cors" - "github.com/apache/incubator-servicecomb-service-center/server/interceptor/ratelimiter" ) func init() { @@ -63,7 +62,6 @@ func init() { // intercept requests before routing. interceptor.RegisterInterceptFunc(access.Intercept) - interceptor.RegisterInterceptFunc(ratelimiter.Intercept) interceptor.RegisterInterceptFunc(cors.Intercept) // handle requests after routing. diff --git a/server/interceptor/ratelimiter/limiter.go b/server/interceptor/ratelimiter/limiter.go deleted file mode 100644 index 5845b78..0000000 --- a/server/interceptor/ratelimiter/limiter.go +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ratelimiter - -import ( - "errors" - "github.com/apache/incubator-servicecomb-service-center/pkg/httplimiter" - "github.com/apache/incubator-servicecomb-service-center/pkg/util" - "github.com/apache/incubator-servicecomb-service-center/server/core" - "net/http" - "strings" - "sync" - "time" -) - -type Limiter struct { - conns int64 - - httpLimiter *httplimiter.HttpLimiter -} - -var limiter *Limiter -var mux sync.Mutex - -func GetLimiter() *Limiter { - if limiter == nil { - mux.Lock() - if limiter == nil { - limiter = new(Limiter) - limiter.LoadConfig() - } - mux.Unlock() - } - - return limiter -} - -func (this *Limiter) LoadConfig() { - ttl := time.Second - switch core.ServerInfo.Config.LimitTTLUnit { - case "ms": - ttl = time.Millisecond - case "m": - ttl = time.Minute - case "h": - ttl = time.Hour - } - this.conns = core.ServerInfo.Config.LimitConnections - this.httpLimiter = httplimiter.NewHttpLimiter(this.conns, ttl) - iplookups := core.ServerInfo.Config.LimitIPLookup - this.httpLimiter.IPLookups = strings.Split(iplookups, ",") - - util.Logger().Warnf(nil, "Rate-limit Load config, ttl: %s, conns: %d, iplookups: %s", ttl, this.conns, iplookups) -} - -func (this *Limiter) Handle(w http.ResponseWriter, r *http.Request) error { - if this.conns <= 0 { - return nil - } - - httplimiter.SetResponseHeaders(this.httpLimiter, w) - httpError := httplimiter.LimitByRequest(this.httpLimiter, r) - if httpError != nil { - w.Header().Add("Content-Type", this.httpLimiter.ContentType) - w.WriteHeader(httpError.StatusCode) - w.Write(util.StringToBytesWithNoCopy(httpError.Message)) - util.Logger().Warnf(nil, "Reached maximum request limit for %s host and %s url", r.RemoteAddr, r.RequestURI) - return errors.New(httpError.Message) - } - return nil -} - -func Intercept(w http.ResponseWriter, r *http.Request) error { - return GetLimiter().Handle(w, r) -} diff --git a/server/interceptor/ratelimiter/limiter_test.go b/server/interceptor/ratelimiter/limiter_test.go deleted file mode 100644 index eb8a752..0000000 --- a/server/interceptor/ratelimiter/limiter_test.go +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ratelimiter - -import ( - "github.com/didip/tollbooth" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "net/http" - "net/http/httptest" - "time" -) - -var _ = Describe("HttpLimiter", func() { - var ( - limiter *Limiter - ) - - BeforeEach(func() { - limiter = new(Limiter) - limiter.LoadConfig() - }) - Describe("LoadConfig", func() { - Context("Normal", func() { - It("should be ok", func() { - Expect(limiter.conns).To(Equal(int64(0))) - res := []string{"RemoteAddr", "X-Forwarded-For", "X-Real-IP"} - for i, val := range limiter.httpLimiter.IPLookups { - Expect(val).To(Equal(res[i])) - } - }) - }) - }) - Describe("FuncHandler", func() { - var ts *httptest.Server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - err := limiter.Handle(w, r) - if err != nil { - return - } - w.WriteHeader(http.StatusOK) - w.Write([]byte("Testing...")) - })) - Context("Connections > 0", func() { - It("should not be router", func() { - limiter.conns = 1 - limiter.httpLimiter = tollbooth.NewLimiter(1, time.Second) - resp, err := http.Get(ts.URL) - Expect(err).To(BeNil()) - Expect(resp.StatusCode).To(Equal(http.StatusOK)) - - resp, err = http.Get(ts.URL) - Expect(err).To(BeNil()) - Expect(resp.StatusCode).ToNot(Equal(http.StatusOK)) - }) - }) - Context("Connections <= 0", func() { - It("should be router", func() { - limiter.conns = 0 - limiter.httpLimiter = tollbooth.NewLimiter(0, time.Second) - resp, err := http.Get(ts.URL) - Expect(err).To(BeNil()) - Expect(resp.StatusCode).To(Equal(http.StatusOK)) - resp, err = http.Get(ts.URL) - Expect(err).To(BeNil()) - Expect(resp.StatusCode).To(Equal(http.StatusOK)) - }) - }) - }) -}) diff --git a/server/server.go b/server/server.go index c71a98c..a546879 100644 --- a/server/server.go +++ b/server/server.go @@ -155,10 +155,9 @@ func (s *ServiceCenterServer) startApiServer() { restPort := beego.AppConfig.String("httpport") rpcIp := beego.AppConfig.DefaultString("rpcaddr", "") rpcPort := beego.AppConfig.DefaultString("rpcport", "") - cmpName := core.ServerInfo.Config.LoggerName - hostName := fmt.Sprintf("%s_%s", cmpName, strings.Replace(util.GetLocalIP(), ".", "_", -1)) - s.apiServer.HostName = hostName + s.apiServer.HostName = fmt.Sprintf("%s_%s", core.ServerInfo.Config.LoggerName, + strings.Replace(restIp, ".", "_", -1)) s.addEndpoint(REST, restIp, restPort) s.addEndpoint(RPC, rpcIp, rpcPort) s.apiServer.Start() diff --git a/server/service/instances.go b/server/service/instances.go index a452967..7008667 100644 --- a/server/service/instances.go +++ b/server/service/instances.go @@ -96,7 +96,8 @@ func (s *InstanceService) Register(ctx context.Context, in *pb.RegisterInstanceR }, err } if oldInstanceId != "" { - util.Logger().Infof("instance more exist.") + util.Logger().Infof("register instance successful, reuse service %s instance %s, operator %s", + instance.ServiceId, oldInstanceId, remoteIP) return &pb.RegisterInstanceResponse{ Response: pb.CreateResponse(pb.Response_SUCCESS, "instance more exist."), InstanceId: oldInstanceId, diff --git a/server/service/microservices.go b/server/service/microservices.go index 8514333..0e3e444 100644 --- a/server/service/microservices.go +++ b/server/service/microservices.go @@ -409,10 +409,16 @@ func (s *MicroServiceService) DeleteServices(ctx context.Context, request *pb.De } util.Logger().Infof("Batch DeleteServices serviceId = %v , result = %d, ", request.ServiceIds, responseCode) - return &pb.DelServicesResponse{ - Response: pb.CreateResponse(responseCode, "Delete services successfully."), + + resp := &pb.DelServicesResponse{ Services: delServiceRspInfo, - }, nil + } + if responseCode != pb.Response_SUCCESS { + resp.Response = pb.CreateResponse(responseCode, "Delete services failed.") + } else { + resp.Response = pb.CreateResponse(responseCode, "Delete services successfully.") + } + return resp, nil } func (s *MicroServiceService) GetOne(ctx context.Context, in *pb.GetServiceRequest) (*pb.GetServiceResponse, error) { diff --git a/server/service/util/instance_util.go b/server/service/util/instance_util.go index 117ce38..e40fdae 100644 --- a/server/service/util/instance_util.go +++ b/server/service/util/instance_util.go @@ -79,7 +79,6 @@ func GetAllInstancesOfOneService(ctx context.Context, domainProject string, serv instances := make([]*pb.MicroServiceInstance, 0, len(resp.Kvs)) for _, kvs := range resp.Kvs { - util.Logger().Debugf("start unmarshal service instance file: %s", util.BytesToStringWithNoCopy(kvs.Key)) instance := &pb.MicroServiceInstance{} err := json.Unmarshal(kvs.Value, instance) if err != nil { @@ -159,15 +158,6 @@ func ParseEndpointValue(value []byte) EndpointValue { return endpointValue } -func isContain(endpoints []string, endpoint string) bool { - for _, tmpEndpoint := range endpoints { - if tmpEndpoint == endpoint { - return true - } - } - return false -} - func DeleteServiceAllInstances(ctx context.Context, serviceId string) error { domainProject := util.ParseDomainProject(ctx) @@ -234,8 +224,6 @@ func QueryAllProvidersInstances(ctx context.Context, selfServiceId string) (resu util.Logger().Debugf("query provider service %s instances[%d] with revision %d.", providerId, len(kvs), rev) for _, kv := range kvs { - util.Logger().Debugf("start unmarshal service instance file with revision %d: %s", - rev, util.BytesToStringWithNoCopy(kv.Key)) instance := &pb.MicroServiceInstance{} err := json.Unmarshal(kv.Value, instance) if err != nil { diff --git a/server/service/util/instance_util_test.go b/server/service/util/instance_util_test.go index 5bba56c..5da480c 100644 --- a/server/service/util/instance_util_test.go +++ b/server/service/util/instance_util_test.go @@ -97,18 +97,6 @@ func TestCheckEndPoints(t *testing.T) { fmt.Printf(`CheckEndPoints failed`) t.FailNow() } - - b := isContain([]string{"a"}, "a") - if !b { - fmt.Printf(`isContain contain failed`) - t.FailNow() - } - - b = isContain([]string{}, "a") - if b { - fmt.Printf(`isContain empty failed`) - t.FailNow() - } } func TestDeleteServiceAllInstances(t *testing.T) { diff --git a/server/service/util/rule_util.go b/server/service/util/rule_util.go index b8b0774..2ecabc0 100644 --- a/server/service/util/rule_util.go +++ b/server/service/util/rule_util.go @@ -79,7 +79,6 @@ func GetRulesUtil(ctx context.Context, domainProject string, serviceId string) ( rules := []*pb.ServiceRule{} for _, kvs := range resp.Kvs { - util.Logger().Debugf("start unmarshal service rule file: %s", util.BytesToStringWithNoCopy(kvs.Key)) rule := &pb.ServiceRule{} err := json.Unmarshal(kvs.Value, rule) if err != nil { -- To stop receiving notification emails like this one, please contact asifdxtr...@apache.org.