This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch pool
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit b222469194acfd236eb5677618b488c9f51b2598
Author: Gao Hongtao <[email protected]>
AuthorDate: Sun Aug 4 11:44:15 2024 +0800

    Fix duplicated measure data in a single part
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                         |   2 +
 banyand/liaison/grpc/node.go                       |  10 ++
 banyand/liaison/grpc/server.go                     |   1 -
 banyand/liaison/http/server.go                     |   2 -
 banyand/measure/datapoints.go                      |  18 +-
 banyand/measure/part.go                            |   9 +-
 banyand/observability/metrics_system.go            |  16 +-
 banyand/observability/service.go                   |   7 +
 banyand/observability/system.go                    | 162 ------------------
 banyand/queue/sub/server.go                        |   1 -
 pkg/meter/native/collection.go                     |   2 +
 pkg/node/interface.go                              |  13 ++
 pkg/node/maglev.go                                 |  13 ++
 pkg/node/round_robin.go                            |  69 ++++++--
 pkg/node/round_robin_test.go                       |  22 +++
 pkg/test/measure/testdata/groups/exception.json    |  18 ++
 pkg/test/measure/testdata/measures/duplicated.json |  42 +++++
 test/cases/init.go                                 |   1 +
 test/cases/measure/data/data.go                    |   2 +
 test/cases/measure/data/input/duplicated_part.yaml |  25 +++
 test/cases/measure/data/testdata/duplicated.json   | 182 +++++++++++++++++++++
 test/cases/measure/data/want/duplicated_part.yaml  |  38 +++++
 test/cases/measure/measure.go                      |   1 +
 23 files changed, 469 insertions(+), 187 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 50884405..8355e291 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -34,8 +34,10 @@ Release Notes.
 - Fix a bug that the Stream module didn't support duplicated in index-based 
filtering and sorting
 - Fix the bug that segment's reference count is increased twice when the 
controller try to create an existing segment.
 - Fix a bug where a distributed query would return an empty result if the 
"limit" was set much lower than the "offset".
+- Fix duplicated measure data points (same series and timestamp) in a single part; only the latest version is kept.
 
 ### Documentation
+
 - Introduce new doc menu structure.
 - Add installation on Docker and Kubernetes.
 - Add quick-start guide.
diff --git a/banyand/liaison/grpc/node.go b/banyand/liaison/grpc/node.go
index b43a3bdb..9f307fc2 100644
--- a/banyand/liaison/grpc/node.go
+++ b/banyand/liaison/grpc/node.go
@@ -18,6 +18,7 @@
 package grpc
 
 import (
+       "fmt"
        "sync"
 
        "github.com/pkg/errors"
@@ -37,6 +38,8 @@ var (
 // together with the shardID calculated from the incoming data.
 type NodeRegistry interface {
        Locate(group, name string, shardID uint32) (string, error)
+       // fmt.Stringer gives a human-readable description of the registry.
+       fmt.Stringer
 }
 
 type clusterNodeService struct {
@@ -94,8 +96,18 @@ func (n *clusterNodeService) OnDelete(metadata 
schema.Metadata) {
        }
 }
 
+// String implements fmt.Stringer by delegating to n.sel.
+func (n *clusterNodeService) String() string {
+       return n.sel.String()
+}
+
 type localNodeService struct{}
 
+// String implements fmt.Stringer; the local (fake) registry always reports "local".
+func (l localNodeService) String() string {
+       return "local"
+}
+
 // NewLocalNodeRegistry creates a local(fake) node registry.
 func NewLocalNodeRegistry() NodeRegistry {
        return localNodeService{}
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index a4d48bb5..2cc0dd73 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -191,7 +191,6 @@ func (s *server) Validate() error {
        if s.enableIngestionAccessLog && s.accessLogRootPath == "" {
                return errAccessLogRootPath
        }
-       observability.UpdateAddress("grpc", s.addr)
        if !s.tls {
                return nil
        }
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index f1b10053..f7b797b2 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -40,7 +40,6 @@ import (
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
-       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/ui"
@@ -102,7 +101,6 @@ func (p *server) Validate() error {
        if p.listenAddr == ":" {
                return errNoAddr
        }
-       observability.UpdateAddress("http", p.listenAddr)
        if p.grpcCert != "" {
                creds, errTLS := credentials.NewClientTLSFromFile(p.grpcCert, 
"")
                if errTLS != nil {
diff --git a/banyand/measure/datapoints.go b/banyand/measure/datapoints.go
index 03b07093..156d4b45 100644
--- a/banyand/measure/datapoints.go
+++ b/banyand/measure/datapoints.go
@@ -123,6 +123,24 @@ type dataPoints struct {
        fields      []nameValues
 }
 
+// skip removes the i-th data point from every column (series IDs,
+// timestamps, versions, tag families and, if present, fields) so the columns
+// stay aligned; later points shift down by one index. It does nothing when
+// i >= len(d.timestamps).
+func (d *dataPoints) skip(i int) {
+       if len(d.timestamps) <= i {
+               return
+       }
+       d.seriesIDs = append(d.seriesIDs[:i], d.seriesIDs[i+1:]...)
+       d.timestamps = append(d.timestamps[:i], d.timestamps[i+1:]...)
+       d.versions = append(d.versions[:i], d.versions[i+1:]...)
+       d.tagFamilies = append(d.tagFamilies[:i], d.tagFamilies[i+1:]...)
+       // fields may be empty; only trim it when it holds entries.
+       if len(d.fields) > 0 {
+               d.fields = append(d.fields[:i], d.fields[i+1:]...)
+       }
+}
+
 func (d *dataPoints) Len() int {
        return len(d.seriesIDs)
 }
@@ -131,7 +144,12 @@ func (d *dataPoints) Less(i, j int) bool {
        if d.seriesIDs[i] != d.seriesIDs[j] {
                return d.seriesIDs[i] < d.seriesIDs[j]
        }
-       return d.timestamps[i] < d.timestamps[j]
+       if d.timestamps[i] != d.timestamps[j] {
+               return d.timestamps[i] < d.timestamps[j]
+       }
+       // Same series and timestamp: higher versions sort first, so the newest
+       // write is the one mustInitFromDataPoints keeps when dropping duplicates.
+       return d.versions[i] > d.versions[j]
 }
 
 func (d *dataPoints) Swap(i, j int) {
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index c6383814..dd4c11ae 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -150,11 +150,22 @@ func (mp *memPart) mustInitFromDataPoints(dps 
*dataPoints) {
        var sidPrev common.SeriesID
        uncompressedBlockSizeBytes := uint64(0)
        var indexPrev int
-       for i := range dps.timestamps {
+       for i := 0; i < len(dps.timestamps); i++ {
                sid := dps.seriesIDs[i]
                if sidPrev == 0 {
                        sidPrev = sid
                }
+               // Drop duplicates. dps is expected to be sorted by dataPoints.Less
+               // (series ID, timestamp, descending version), so the first point of each
+               // (series ID, timestamp) pair is the newest version. Both the series ID
+               // and the timestamp must match the previous kept point: comparing the
+               // timestamp alone would drop another series' point sharing that
+               // timestamp. skip shifts later points down, so index i is revisited.
+               if i > 0 && sid == dps.seriesIDs[i-1] && dps.timestamps[i] == dps.timestamps[i-1] {
+                       dps.skip(i)
+                       i--
+                       continue
+               }
 
                if uncompressedBlockSizeBytes >= maxUncompressedBlockSize ||
                        (i-indexPrev) > maxBlockLength || sid != sidPrev {
diff --git a/banyand/observability/metrics_system.go 
b/banyand/observability/metrics_system.go
index ae391b73..7717db5d 100644
--- a/banyand/observability/metrics_system.go
+++ b/banyand/observability/metrics_system.go
@@ -55,8 +55,24 @@ var (
        upTimeGauge     meter.Gauge
        diskStateGauge  meter.Gauge
        initMetricsOnce sync.Once
+       // diskMap is used as a set: keys are the monitored paths, values are unused.
+       diskMap         = sync.Map{}
 )
 
+// UpdatePath registers path so that its disk usage is monitored.
+func UpdatePath(path string) {
+       diskMap.Store(path, nil)
+}
+
+// getPath returns the registered paths in unspecified (sync.Map.Range) order.
+func getPath() (paths []string) {
+       diskMap.Range(func(key, _ any) bool {
+               paths = append(paths, key.(string))
+               return true
+       })
+       return paths
+}
+
 func init() {
        MetricsCollector.Register("cpu", collectCPU)
        MetricsCollector.Register("memory", collectMemory)
@@ -169,7 +183,8 @@ func collectUpTime() {
 }
 
 func collectDisk() {
-       for path := range getPath() {
+       // getPath returns a slice, so range over its values rather than its indices.
+       for _, path := range getPath() {
                usage, err := disk.Usage(path)
                if err != nil {
                        if _, err = os.Stat(path); err != nil {
diff --git a/banyand/observability/service.go b/banyand/observability/service.go
index 8e41c5ed..68bd30e3 100644
--- a/banyand/observability/service.go
+++ b/banyand/observability/service.go
@@ -127,6 +127,8 @@ func (p *metricService) PreRun(ctx context.Context) error {
                NativeMeterProvider = newNativeMeterProvider(ctx, p.metadata, 
nodeInfo)
        }
        initMetrics(p.modes)
+       // Debug endpoint exposing p.nodeSelector.String() via routeTableHandler.
+       metricsMux.HandleFunc("/_route", p.routeTableHandler)
        return nil
 }
 
@@ -180,6 +181,18 @@ func (p *metricService) GracefulStop() {
        p.closer.CloseThenWait()
 }
 
+// routeTableHandler writes p.nodeSelector.String() as the response body with
+// status 200.
+// NOTE(review): Content-Type is always application/json, yet some selectors'
+// String output is plain text (e.g. maglev's "nodes:... groups:..."); confirm
+// which selector is wired in here.
+func (p *metricService) routeTableHandler(w http.ResponseWriter, _ 
*http.Request) {
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusOK)
+       // The write error is ignored: the status line has already been sent.
+       _, _ = w.Write([]byte(p.nodeSelector.String()))
+}
+
 func containsMode(modes []string, mode string) bool {
        for _, item := range modes {
                if item == mode {
diff --git a/banyand/observability/system.go b/banyand/observability/system.go
deleted file mode 100644
index 20aa68b6..00000000
--- a/banyand/observability/system.go
+++ /dev/null
@@ -1,162 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package observability
-
-import (
-       "encoding/json"
-       "net/http"
-       "os"
-       "sync"
-
-       "github.com/shirou/gopsutil/v3/cpu"
-       "github.com/shirou/gopsutil/v3/disk"
-       "github.com/shirou/gopsutil/v3/host"
-       "github.com/shirou/gopsutil/v3/mem"
-
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-)
-
-var systemInfoInstance *SystemInfo
-
-func init() {
-       systemInfoInstance = &SystemInfo{
-               Addresses:  make(map[string]string),
-               DiskUsages: make(map[string]*DiskUsage),
-       }
-}
-
-// UpdateAddress updates the address of the given name.
-func UpdateAddress(name, address string) {
-       systemInfoInstance.Lock()
-       defer systemInfoInstance.Unlock()
-       systemInfoInstance.Addresses[name] = address
-}
-
-func getAddresses() map[string]string {
-       systemInfoInstance.RLock()
-       defer systemInfoInstance.RUnlock()
-       return systemInfoInstance.Addresses
-}
-
-// UpdatePath updates a path to monitoring its disk usage.
-func UpdatePath(path string) {
-       systemInfoInstance.Lock()
-       defer systemInfoInstance.Unlock()
-       systemInfoInstance.DiskUsages[path] = nil
-}
-
-func getPath() map[string]*DiskUsage {
-       systemInfoInstance.RLock()
-       defer systemInfoInstance.RUnlock()
-       return systemInfoInstance.DiskUsages
-}
-
-// SystemInfo represents the system information of a node.
-type SystemInfo struct {
-       Addresses   map[string]string     `json:"addresses"`
-       DiskUsages  map[string]*DiskUsage `json:"disk_usages"`
-       NodeID      string                `json:"node_id"`
-       Hostname    string                `json:"hostname"`
-       Roles       []string              `json:"roles"`
-       Uptime      uint64                `json:"uptime"`
-       CPUUsage    float64               `json:"cpu_usage"`
-       MemoryUsage float64               `json:"memory_usage"`
-       sync.RWMutex
-}
-
-// DiskUsage represents the disk usage for a given path.
-type DiskUsage struct {
-       Capacity uint64 `json:"capacity"`
-       Used     uint64 `json:"used"`
-}
-
-// ErrorResponse represents the error response.
-type ErrorResponse struct {
-       Message       string `json:"message"`
-       OriginalError string `json:"original_error,omitempty"`
-}
-
-func init() {
-       metricsMux.HandleFunc("/system", systemInfoHandler)
-}
-
-func systemInfoHandler(w http.ResponseWriter, _ *http.Request) {
-       hostname, _ := os.Hostname()
-       uptime, _ := getUptime()
-       cpuUsage, _ := getCPUUsage()
-       memoryUsage, _ := getMemoryUsage()
-
-       systemInfo := &SystemInfo{
-               NodeID:      "1",
-               Roles:       []string{"meta", "ingest", "query", "data"},
-               Hostname:    hostname,
-               Uptime:      uptime,
-               CPUUsage:    cpuUsage,
-               MemoryUsage: memoryUsage,
-               Addresses:   getAddresses(),
-               DiskUsages:  make(map[string]*DiskUsage),
-       }
-       for k := range getPath() {
-               usage, _ := getDiskUsage(k)
-               systemInfo.DiskUsages[k] = &usage
-       }
-       w.Header().Set("Content-Type", "application/json")
-       w.WriteHeader(http.StatusOK)
-       if err := json.NewEncoder(w).Encode([]*SystemInfo{systemInfo}); err != 
nil {
-               w.WriteHeader(http.StatusInternalServerError)
-               errorResponse := &ErrorResponse{
-                       Message:       "Error encoding JSON response",
-                       OriginalError: err.Error(),
-               }
-               if err := json.NewEncoder(w).Encode(errorResponse); err != nil {
-                       logger.GetLogger().Error().Err(err).Msg("Error encoding 
JSON response")
-               }
-       }
-}
-
-func getUptime() (uint64, error) {
-       uptime, err := host.Uptime()
-       if err != nil {
-               return 0, err
-       }
-       return uptime, nil
-}
-
-func getCPUUsage() (float64, error) {
-       percentages, err := cpu.Percent(0, false)
-       if err != nil {
-               return 0, err
-       }
-       return percentages[0], nil
-}
-
-func getMemoryUsage() (float64, error) {
-       vm, err := mem.VirtualMemory()
-       if err != nil {
-               return 0, err
-       }
-       return vm.UsedPercent, nil
-}
-
-func getDiskUsage(path string) (DiskUsage, error) {
-       usage, err := disk.Usage(path)
-       if err != nil {
-               return DiskUsage{}, err
-       }
-       return DiskUsage{Capacity: usage.Total, Used: usage.Used}, nil
-}
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 2b880f9f..829e460a 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -112,7 +112,6 @@ func (s *server) Validate() error {
        if s.addr == ":" {
                return errNoAddr
        }
-       observability.UpdateAddress("grpc", s.addr)
        if !s.tls {
                return nil
        }
diff --git a/pkg/meter/native/collection.go b/pkg/meter/native/collection.go
index 9904e11a..209211fd 100644
--- a/pkg/meter/native/collection.go
+++ b/pkg/meter/native/collection.go
@@ -19,6 +19,7 @@
 package native
 
 import (
+       "fmt"
        "time"
 
        "google.golang.org/protobuf/types/known/timestamppb"
@@ -34,6 +35,8 @@ import (
 // NodeSelector has Locate method to select a nodeId.
 type NodeSelector interface {
        Locate(group, name string, shardID uint32) (string, error)
+       // fmt.Stringer gives a human-readable description of the selector.
+       fmt.Stringer
 }
 
 type collector interface {
diff --git a/pkg/node/interface.go b/pkg/node/interface.go
index 0f41aab5..c4341b7d 100644
--- a/pkg/node/interface.go
+++ b/pkg/node/interface.go
@@ -20,6 +20,7 @@ package node
 
 import (
        "context"
+       "fmt"
        "sync"
 
        "github.com/pkg/errors"
@@ -42,6 +43,8 @@ type Selector interface {
        RemoveNode(node *databasev1.Node)
        Pick(group, name string, shardID uint32) (string, error)
        run.PreRunner
+       // fmt.Stringer gives a human-readable view of the selector's state.
+       fmt.Stringer
 }
 
 // NewPickFirstSelector returns a simple selector that always returns the 
first node if exists.
@@ -58,6 +60,19 @@ type pickFirstSelector struct {
        mu        sync.RWMutex
 }
 
+// String implements Selector. It reports the node Pick returns together with
+// all known node IDs, or the Pick error text when no node can be picked.
+func (p *pickFirstSelector) String() string {
+       n, err := p.Pick("", "", 0)
+       if err != nil {
+               return err.Error()
+       }
+       // A read lock is enough: nodeIDs is only read here.
+       p.mu.RLock()
+       defer p.mu.RUnlock()
+       return fmt.Sprintf("pick [%s] from %s", n, p.nodeIDs)
+}
+
 func (p *pickFirstSelector) PreRun(context.Context) error {
        return nil
 }
diff --git a/pkg/node/maglev.go b/pkg/node/maglev.go
index 34a855f6..3efcaab6 100644
--- a/pkg/node/maglev.go
+++ b/pkg/node/maglev.go
@@ -19,6 +19,7 @@ package node
 
 import (
        "context"
+       "fmt"
        "sort"
        "strconv"
        "sync"
@@ -38,6 +39,22 @@ type maglevSelector struct {
        mutex   sync.RWMutex
 }
 
+// String implements Selector. It reports the registered nodes and the keys of
+// m.routers (group names), sorted so the output is stable across calls.
+func (m *maglevSelector) String() string {
+       var groups []string
+       m.routers.Range(func(key, _ any) bool {
+               groups = append(groups, key.(string))
+               return true
+       })
+       sort.Strings(groups)
+       m.mutex.RLock()
+       // RLock must be paired with RUnlock; calling Unlock on a mutex that is
+       // only read-locked is a run-time error (see sync.RWMutex.Unlock).
+       defer m.mutex.RUnlock()
+       return fmt.Sprintf("nodes:%s groups:%s", m.nodes, groups)
+}
+
 func (m *maglevSelector) Name() string {
        return "maglev-selector"
 }
diff --git a/pkg/node/round_robin.go b/pkg/node/round_robin.go
index 2815b19c..d0bb37e1 100644
--- a/pkg/node/round_robin.go
+++ b/pkg/node/round_robin.go
@@ -19,6 +19,7 @@ package node
 
 import (
        "context"
+       "encoding/json"
        "fmt"
        "slices"
        "sort"
@@ -32,22 +33,54 @@ import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
 )
 
 type roundRobinSelector struct {
        schemaRegistry metadata.Repo
        closeCh        chan struct{}
-       lookupTable    sync.Map
+       lookupTable    []key
        nodes          []string
        mu             sync.RWMutex
 }
 
+// String renders the routing table as a JSON object that maps
+// "<group>-<shardID>" to the node Pick selects for that shard, or to the
+// Pick error text. It returns "" when no shard is registered.
+func (r *roundRobinSelector) String() string {
+       // Copy the table and release the read lock before calling Pick. Pick
+       // presumably takes r.mu itself, and a sync.RWMutex must not be
+       // read-locked recursively: a writer queued in between would deadlock.
+       r.mu.RLock()
+       entries := slices.Clone(r.lookupTable)
+       r.mu.RUnlock()
+       if len(entries) == 0 {
+               return ""
+       }
+       result := make(map[string]string, len(entries))
+       for _, entry := range entries {
+               name := fmt.Sprintf("%s-%d", entry.group, entry.shardID)
+               n, err := r.Pick(entry.group, "", entry.shardID)
+               if err != nil {
+                       result[name] = err.Error()
+                       continue
+               }
+               result[name] = n
+       }
+       jsonBytes, err := json.Marshal(result)
+       if err != nil {
+               return err.Error()
+       }
+       return convert.BytesToString(jsonBytes)
+}
+
 // NewRoundRobinSelector creates a new round-robin selector.
 func NewRoundRobinSelector(schemaRegistry metadata.Repo) Selector {
        rrs := &roundRobinSelector{
                nodes:          make([]string, 0),
                closeCh:        make(chan struct{}),
                schemaRegistry: schemaRegistry,
+               lookupTable:    make([]key, 0),
        }
        return rrs
 }
@@ -69,9 +95,17 @@ func (r *roundRobinSelector) OnAddOrUpdate(schemaMetadata 
schema.Metadata) {
        if !ok || !validateGroup(group) {
                return
        }
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       // OnAddOrUpdate also fires for groups already in the table, possibly
+       // with a different shard number. Drop the group's existing entries first
+       // so the table never holds duplicate or stale shards for it.
+       r.lookupTable = slices.DeleteFunc(r.lookupTable, func(k key) bool {
+               return k.group == group.Metadata.Name
+       })
        for i := uint32(0); i < group.ResourceOpts.ShardNum; i++ {
                k := key{group: group.Metadata.Name, shardID: i}
-               r.lookupTable.Store(k, 0)
+               r.lookupTable = append(r.lookupTable, k)
        }
        r.sortEntries()
 }
@@ -80,12 +108,18 @@ func (r *roundRobinSelector) OnDelete(schemaMetadata 
schema.Metadata) {
        if schemaMetadata.Kind != schema.KindGroup {
                return
        }
-       group := schemaMetadata.Spec.(*commonv1.Group)
-       for i := uint32(0); i < group.ResourceOpts.ShardNum; i++ {
-               k := key{group: group.Metadata.Name, shardID: i}
-               r.lookupTable.Delete(k)
-       }
-       r.sortEntries()
+       group, ok := schemaMetadata.Spec.(*commonv1.Group)
+       if !ok {
+               return
+       }
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       // Remove every entry of the group regardless of its current shard
+       // number, so shards left over from an earlier configuration go too.
+       // Deleting in place keeps the remaining entries sorted.
+       r.lookupTable = slices.DeleteFunc(r.lookupTable, func(k key) bool {
+               return k.group == group.Metadata.Name
+       })
 }
 
 func (r *roundRobinSelector) OnInit(kinds []schema.Kind) (bool, []int64) {
@@ -101,8 +135,12 @@ func (r *roundRobinSelector) OnInit(kinds []schema.Kind) 
(bool, []int64) {
        if err != nil {
                panic(fmt.Sprintf("failed to list group: %v", err))
        }
+       r.mu.Lock()
+       defer r.mu.Unlock()
        var revision int64
-       r.lookupTable = sync.Map{}
+       // Rebuild the table from the listed groups; truncating to length 0
+       // reuses the existing backing array.
+       r.lookupTable = r.lookupTable[:0]
        for _, g := range gg {
                if !validateGroup(g) {
                        continue
@@ -112,7 +148,7 @@ func (r *roundRobinSelector) OnInit(kinds []schema.Kind) 
(bool, []int64) {
                }
                for i := uint32(0); i < g.ResourceOpts.ShardNum; i++ {
                        k := key{group: g.Metadata.Name, shardID: i}
-                       r.lookupTable.Store(k, 0)
+                       r.lookupTable = append(r.lookupTable, k)
                }
        }
        r.sortEntries()
@@ -144,29 +180,29 @@ func (r *roundRobinSelector) Pick(group, _ string, 
shardID uint32) (string, erro
        if len(r.nodes) == 0 {
                return "", errors.New("no nodes available")
        }
-       entry, ok := r.lookupTable.Load(k)
-       if ok {
-               return r.selectNode(entry), nil
+       // lookupTable is kept ordered by sortEntries (group, then shard ID), so
+       // the key is located by binary search; its index is passed to selectNode.
+       i := sort.Search(len(r.lookupTable), func(i int) bool {
+               if r.lookupTable[i].group == group {
+                       return r.lookupTable[i].shardID >= shardID
+               }
+               return r.lookupTable[i].group > group
+       })
+       if i < len(r.lookupTable) && r.lookupTable[i] == k {
+               return r.selectNode(i), nil
        }
        return "", fmt.Errorf("%s-%d is a unknown shard", group, shardID)
 }
 
 func (r *roundRobinSelector) sortEntries() {
-       var keys []key
-       r.lookupTable.Range(func(k, _ any) bool {
-               keys = append(keys, k.(key))
-               return true
-       })
-       slices.SortFunc(keys, func(a, b key) int {
+       // Order by group name, then shard ID; Pick's binary search depends on it.
+       slices.SortFunc(r.lookupTable, func(a, b key) int {
                n := strings.Compare(a.group, b.group)
                if n != 0 {
                        return n
                }
                return int(a.shardID) - int(b.shardID)
        })
-       for i := range keys {
-               r.lookupTable.Store(keys[i], i)
-       }
 }
 
 func (r *roundRobinSelector) Close() {
diff --git a/pkg/node/round_robin_test.go b/pkg/node/round_robin_test.go
index 9e8037c5..223e1908 100644
--- a/pkg/node/round_robin_test.go
+++ b/pkg/node/round_robin_test.go
@@ -111,6 +111,31 @@ func TestCleanupGroup(t *testing.T) {
        assert.Error(t, err)
 }
 
+// TestSortNodeEntries checks that AddNode keeps selector.nodes sorted by name.
+func TestSortNodeEntries(t *testing.T) {
+       selector := &roundRobinSelector{
+               nodes: make([]string, 0),
+       }
+       setupGroup(selector)
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node3"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node1"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node2"}})
+       assert.EqualValues(t, []string{"node1", "node2", "node3"}, 
selector.nodes)
+}
+
+// TestStringer checks that String is empty before any group is registered and
+// non-empty afterwards, both without nodes (error entries) and with nodes.
+func TestStringer(t *testing.T) {
+       selector := NewRoundRobinSelector(nil)
+       assert.Empty(t, selector.String())
+       setupGroup(selector)
+       assert.NotEmpty(t, selector.String())
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node3"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node1"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node2"}})
+       assert.NotEmpty(t, selector.String())
+}
+
 var groupSchema = schema.Metadata{
        TypeMeta: schema.TypeMeta{
                Kind: schema.KindGroup,
diff --git a/pkg/test/measure/testdata/groups/exception.json 
b/pkg/test/measure/testdata/groups/exception.json
new file mode 100644
index 00000000..fadbd5a5
--- /dev/null
+++ b/pkg/test/measure/testdata/groups/exception.json
@@ -0,0 +1,18 @@
+{
+  "metadata": {
+    "name": "exception"
+  },
+  "catalog": "CATALOG_MEASURE",
+  "resource_opts": {
+    "shard_num": 2,
+    "segment_interval": {
+      "unit": "UNIT_DAY",
+      "num": 1
+    },
+    "ttl": {
+      "unit": "UNIT_DAY",
+      "num": 7
+    }
+  },
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/measures/duplicated.json 
b/pkg/test/measure/testdata/measures/duplicated.json
new file mode 100644
index 00000000..e25f0a37
--- /dev/null
+++ b/pkg/test/measure/testdata/measures/duplicated.json
@@ -0,0 +1,42 @@
+{
+  "metadata": {
+    "name": "duplicated",
+    "group": "exception"
+  },
+  "tag_families": [
+    {
+      "name": "default",
+      "tags": [
+        {
+          "name": "id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "entity_id",
+          "type": "TAG_TYPE_STRING"
+        }
+      ]
+    }
+  ],
+  "fields": [
+    {
+      "name": "total",
+      "field_type": "FIELD_TYPE_INT",
+      "encoding_method": "ENCODING_METHOD_GORILLA",
+      "compression_method": "COMPRESSION_METHOD_ZSTD"
+    },
+    {
+      "name": "value",
+      "field_type": "FIELD_TYPE_INT",
+      "encoding_method": "ENCODING_METHOD_GORILLA",
+      "compression_method": "COMPRESSION_METHOD_ZSTD"
+    }
+  ],
+  "entity": {
+    "tag_names": [
+      "entity_id"
+    ]
+  },
+  "interval": "1m",
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/test/cases/init.go b/test/cases/init.go
index a39e09f6..7341a96d 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -54,4 +54,6 @@ func Initialize(addr string, now time.Time) {
        casesmeasuredata.Write(conn, "service_latency_minute", "sw_metric", 
"service_latency_minute_data.json", now, interval)
        casesmeasuredata.Write(conn, "service_instance_latency_minute", 
"sw_metric", "service_instance_latency_minute_data.json", now, interval)
        casesmeasuredata.Write(conn, "service_instance_latency_minute", 
"sw_metric", "service_instance_latency_minute_data1.json", 
now.Add(1*time.Minute), interval)
+       // Interval 0 gives every point in duplicated.json the same timestamp.
+       casesmeasuredata.Write(conn, "duplicated", "exception", 
"duplicated.json", now, 0)
 }
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index 3077c510..f50dc4e1 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -108,12 +108,16 @@ func loadData(md *commonv1.Metadata, measure 
measurev1.MeasureService_WriteClien
        content, err := dataFS.ReadFile("testdata/" + dataFile)
        gm.Expect(err).ShouldNot(gm.HaveOccurred())
        gm.Expect(json.Unmarshal(content, 
&templates)).ShouldNot(gm.HaveOccurred())
+       // Versions grow with the template index (nano + i), so among points that
+       // share a series and timestamp the one listed last in the file wins.
+       nano := baseTime.UnixNano()
        for i, template := range templates {
                rawDataPointValue, errMarshal := json.Marshal(template)
                gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
                dataPointValue := &measurev1.DataPointValue{}
                gm.Expect(protojson.Unmarshal(rawDataPointValue, 
dataPointValue)).ShouldNot(gm.HaveOccurred())
                dataPointValue.Timestamp = 
timestamppb.New(baseTime.Add(-time.Duration(len(templates)-i-1) * interval))
+               dataPointValue.Version = nano + int64(i)
                gm.Expect(measure.Send(&measurev1.WriteRequest{Metadata: md, 
DataPoint: dataPointValue, MessageId: uint64(time.Now().UnixNano())})).
                        Should(gm.Succeed())
        }
diff --git a/test/cases/measure/data/input/duplicated_part.yaml 
b/test/cases/measure/data/input/duplicated_part.yaml
new file mode 100644
index 00000000..91900820
--- /dev/null
+++ b/test/cases/measure/data/input/duplicated_part.yaml
@@ -0,0 +1,25 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "duplicated"
+groups: ["exception"]
+tagProjection:
+  tagFamilies:
+  - name: "default"
+    tags: ["id", "entity_id"]
+fieldProjection:
+  names: ["total", "value"]
diff --git a/test/cases/measure/data/testdata/duplicated.json 
b/test/cases/measure/data/testdata/duplicated.json
new file mode 100644
index 00000000..d67feaaf
--- /dev/null
+++ b/test/cases/measure/data/testdata/duplicated.json
@@ -0,0 +1,182 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "svc1"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 1
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "svc1"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 2
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "svc1"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 3
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "svc2"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 5
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "svc2"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 50
+        }
+      },
+      {
+        "int": {
+          "value": 4
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "svc3"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 300
+        }
+      },
+      {
+        "int": {
+          "value": 6
+        }
+      }
+    ]
+  }
+]
diff --git a/test/cases/measure/data/want/duplicated_part.yaml 
b/test/cases/measure/data/want/duplicated_part.yaml
new file mode 100644
index 00000000..10a57e59
--- /dev/null
+++ b/test/cases/measure/data/want/duplicated_part.yaml
@@ -0,0 +1,38 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- fields:
+  - name: total
+    value:
+      int:
+        value: "300"
+  - name: value
+    value:
+      int:
+        value: "6"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        str:
+          value: svc3
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index 960401ca..06b24498 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -70,4 +70,6 @@ var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("float64 value", helpers.Args{Input: "float", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
        g.Entry("float64 aggregation:min", helpers.Args{Input: "float_agg_min", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("all_latency", helpers.Args{Input: "all_latency", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
+       // All points share one series and timestamp; only the highest version (svc3) is expected.
+       g.Entry("duplicated in a part", helpers.Args{Input: "duplicated_part", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 )

Reply via email to