This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch pool in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit b222469194acfd236eb5677618b488c9f51b2598 Author: Gao Hongtao <[email protected]> AuthorDate: Sun Aug 4 11:44:15 2024 +0800 Fix duplicated measure data in a single part Signed-off-by: Gao Hongtao <[email protected]> --- CHANGES.md | 2 + banyand/liaison/grpc/node.go | 10 ++ banyand/liaison/grpc/server.go | 1 - banyand/liaison/http/server.go | 2 - banyand/measure/datapoints.go | 18 +- banyand/measure/part.go | 9 +- banyand/observability/metrics_system.go | 16 +- banyand/observability/service.go | 7 + banyand/observability/system.go | 162 ------------------ banyand/queue/sub/server.go | 1 - pkg/meter/native/collection.go | 2 + pkg/node/interface.go | 13 ++ pkg/node/maglev.go | 13 ++ pkg/node/round_robin.go | 69 ++++++-- pkg/node/round_robin_test.go | 22 +++ pkg/test/measure/testdata/groups/exception.json | 18 ++ pkg/test/measure/testdata/measures/duplicated.json | 42 +++++ test/cases/init.go | 1 + test/cases/measure/data/data.go | 2 + test/cases/measure/data/input/duplicated_part.yaml | 25 +++ test/cases/measure/data/testdata/duplicated.json | 182 +++++++++++++++++++++ test/cases/measure/data/want/duplicated_part.yaml | 38 +++++ test/cases/measure/measure.go | 1 + 23 files changed, 469 insertions(+), 187 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 50884405..8355e291 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -34,8 +34,10 @@ Release Notes. - Fix a bug that the Stream module didn't support duplicated in index-based filtering and sorting - Fix the bug that segment's reference count is increased twice when the controller try to create an existing segment. - Fix a bug where a distributed query would return an empty result if the "limit" was set much lower than the "offset". +- Fix duplicated measure data in a single part. ### Documentation + - Introduce new doc menu structure. - Add installation on Docker and Kubernetes. - Add quick-start guide. diff --git a/banyand/liaison/grpc/node.go b/banyand/liaison/grpc/node.go index b43a3bdb..9f307fc2 100644 --- a/banyand/liaison/grpc/node.go +++ b/banyand/liaison/grpc/node.go @@ -18,6 +18,7 @@ package grpc import ( + "fmt" "sync" "github.com/pkg/errors" @@ -37,6 +38,7 @@ var ( // together with the shardID calculated from the incoming data. type NodeRegistry interface { Locate(group, name string, shardID uint32) (string, error) + fmt.Stringer } type clusterNodeService struct { @@ -94,8 +96,16 @@ func (n *clusterNodeService) OnDelete(metadata schema.Metadata) { } } +func (n *clusterNodeService) String() string { + return n.sel.String() +} + type localNodeService struct{} +func (l localNodeService) String() string { + return "local" +} + // NewLocalNodeRegistry creates a local(fake) node registry. func NewLocalNodeRegistry() NodeRegistry { return localNodeService{} diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index a4d48bb5..2cc0dd73 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -191,7 +191,6 @@ func (s *server) Validate() error { if s.enableIngestionAccessLog && s.accessLogRootPath == "" { return errAccessLogRootPath } - observability.UpdateAddress("grpc", s.addr) if !s.tls { return nil } diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go index f1b10053..f7b797b2 100644 --- a/banyand/liaison/http/server.go +++ b/banyand/liaison/http/server.go @@ -40,7 +40,6 @@ import ( measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" - "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/ui" @@ -102,7 +101,6 @@ func (p *server) Validate() error { if p.listenAddr == ":" { return errNoAddr } - observability.UpdateAddress("http", p.listenAddr) if p.grpcCert != "" { creds, errTLS := credentials.NewClientTLSFromFile(p.grpcCert, "") if errTLS != nil { diff --git a/banyand/measure/datapoints.go b/banyand/measure/datapoints.go index 03b07093..156d4b45 100644 --- a/banyand/measure/datapoints.go +++ b/banyand/measure/datapoints.go @@ -123,6 +123,19 @@ type dataPoints struct { fields []nameValues } +func (d *dataPoints) skip(i int) { + if len(d.timestamps) <= i { + return + } + d.seriesIDs = append(d.seriesIDs[:i], d.seriesIDs[i+1:]...) + d.timestamps = append(d.timestamps[:i], d.timestamps[i+1:]...) + d.versions = append(d.versions[:i], d.versions[i+1:]...) + d.tagFamilies = append(d.tagFamilies[:i], d.tagFamilies[i+1:]...) + if len(d.fields) > 0 { + d.fields = append(d.fields[:i], d.fields[i+1:]...) + } +} + func (d *dataPoints) Len() int { return len(d.seriesIDs) } @@ -131,7 +144,10 @@ func (d *dataPoints) Less(i, j int) bool { if d.seriesIDs[i] != d.seriesIDs[j] { return d.seriesIDs[i] < d.seriesIDs[j] } - return d.timestamps[i] < d.timestamps[j] + if d.timestamps[i] != d.timestamps[j] { + return d.timestamps[i] < d.timestamps[j] + } + return d.versions[i] > d.versions[j] } func (d *dataPoints) Swap(i, j int) { diff --git a/banyand/measure/part.go b/banyand/measure/part.go index c6383814..dd4c11ae 100644 --- a/banyand/measure/part.go +++ b/banyand/measure/part.go @@ -150,11 +150,18 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) { var sidPrev common.SeriesID uncompressedBlockSizeBytes := uint64(0) var indexPrev int - for i := range dps.timestamps { + var prevTS int64 + for i := 0; i < len(dps.timestamps); i++ { sid := dps.seriesIDs[i] if sidPrev == 0 { sidPrev = sid } + if prevTS == dps.timestamps[i] { + dps.skip(i) + i-- + continue + } + prevTS = dps.timestamps[i] if uncompressedBlockSizeBytes >= maxUncompressedBlockSize || (i-indexPrev) > maxBlockLength || sid != sidPrev { diff --git a/banyand/observability/metrics_system.go b/banyand/observability/metrics_system.go index ae391b73..7717db5d 100644 --- a/banyand/observability/metrics_system.go +++ b/banyand/observability/metrics_system.go @@ -55,8 +55,22 @@ var ( upTimeGauge meter.Gauge diskStateGauge meter.Gauge initMetricsOnce sync.Once + diskMap = sync.Map{} ) +// UpdatePath updates a path to monitoring its disk usage. +func UpdatePath(path string) { + diskMap.Store(path, nil) +} + +func getPath() (paths []string) { + diskMap.Range(func(key, _ any) bool { + paths = append(paths, key.(string)) + return true + }) + return paths +} + func init() { MetricsCollector.Register("cpu", collectCPU) MetricsCollector.Register("memory", collectMemory) @@ -169,7 +183,7 @@ func collectUpTime() { } func collectDisk() { - for path := range getPath() { + for _, path := range getPath() { usage, err := disk.Usage(path) if err != nil { if _, err = os.Stat(path); err != nil { diff --git a/banyand/observability/service.go b/banyand/observability/service.go index 8e41c5ed..68bd30e3 100644 --- a/banyand/observability/service.go +++ b/banyand/observability/service.go @@ -127,6 +127,7 @@ func (p *metricService) PreRun(ctx context.Context) error { NativeMeterProvider = newNativeMeterProvider(ctx, p.metadata, nodeInfo) } initMetrics(p.modes) + metricsMux.HandleFunc("/_route", p.routeTableHandler) return nil } @@ -180,6 +181,12 @@ func (p *metricService) GracefulStop() { p.closer.CloseThenWait() } +func (p *metricService) routeTableHandler(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(p.nodeSelector.String())) +} + func containsMode(modes []string, mode string) bool { for _, item := range modes { if item == mode { diff --git a/banyand/observability/system.go b/banyand/observability/system.go deleted file mode 100644 index 20aa68b6..00000000 --- a/banyand/observability/system.go +++ /dev/null @@ -1,162 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package observability - -import ( - "encoding/json" - "net/http" - "os" - "sync" - - "github.com/shirou/gopsutil/v3/cpu" - "github.com/shirou/gopsutil/v3/disk" - "github.com/shirou/gopsutil/v3/host" - "github.com/shirou/gopsutil/v3/mem" - - "github.com/apache/skywalking-banyandb/pkg/logger" -) - -var systemInfoInstance *SystemInfo - -func init() { - systemInfoInstance = &SystemInfo{ - Addresses: make(map[string]string), - DiskUsages: make(map[string]*DiskUsage), - } -} - -// UpdateAddress updates the address of the given name. -func UpdateAddress(name, address string) { - systemInfoInstance.Lock() - defer systemInfoInstance.Unlock() - systemInfoInstance.Addresses[name] = address -} - -func getAddresses() map[string]string { - systemInfoInstance.RLock() - defer systemInfoInstance.RUnlock() - return systemInfoInstance.Addresses -} - -// UpdatePath updates a path to monitoring its disk usage. -func UpdatePath(path string) { - systemInfoInstance.Lock() - defer systemInfoInstance.Unlock() - systemInfoInstance.DiskUsages[path] = nil -} - -func getPath() map[string]*DiskUsage { - systemInfoInstance.RLock() - defer systemInfoInstance.RUnlock() - return systemInfoInstance.DiskUsages -} - -// SystemInfo represents the system information of a node. -type SystemInfo struct { - Addresses map[string]string `json:"addresses"` - DiskUsages map[string]*DiskUsage `json:"disk_usages"` - NodeID string `json:"node_id"` - Hostname string `json:"hostname"` - Roles []string `json:"roles"` - Uptime uint64 `json:"uptime"` - CPUUsage float64 `json:"cpu_usage"` - MemoryUsage float64 `json:"memory_usage"` - sync.RWMutex -} - -// DiskUsage represents the disk usage for a given path. -type DiskUsage struct { - Capacity uint64 `json:"capacity"` - Used uint64 `json:"used"` -} - -// ErrorResponse represents the error response. -type ErrorResponse struct { - Message string `json:"message"` - OriginalError string `json:"original_error,omitempty"` -} - -func init() { - metricsMux.HandleFunc("/system", systemInfoHandler) -} - -func systemInfoHandler(w http.ResponseWriter, _ *http.Request) { - hostname, _ := os.Hostname() - uptime, _ := getUptime() - cpuUsage, _ := getCPUUsage() - memoryUsage, _ := getMemoryUsage() - - systemInfo := &SystemInfo{ - NodeID: "1", - Roles: []string{"meta", "ingest", "query", "data"}, - Hostname: hostname, - Uptime: uptime, - CPUUsage: cpuUsage, - MemoryUsage: memoryUsage, - Addresses: getAddresses(), - DiskUsages: make(map[string]*DiskUsage), - } - for k := range getPath() { - usage, _ := getDiskUsage(k) - systemInfo.DiskUsages[k] = &usage - } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - if err := json.NewEncoder(w).Encode([]*SystemInfo{systemInfo}); err != nil { - w.WriteHeader(http.StatusInternalServerError) - errorResponse := &ErrorResponse{ - Message: "Error encoding JSON response", - OriginalError: err.Error(), - } - if err := json.NewEncoder(w).Encode(errorResponse); err != nil { - logger.GetLogger().Error().Err(err).Msg("Error encoding JSON response") - } - } -} - -func getUptime() (uint64, error) { - uptime, err := host.Uptime() - if err != nil { - return 0, err - } - return uptime, nil -} - -func getCPUUsage() (float64, error) { - percentages, err := cpu.Percent(0, false) - if err != nil { - return 0, err - } - return percentages[0], nil -} - -func getMemoryUsage() (float64, error) { - vm, err := mem.VirtualMemory() - if err != nil { - return 0, err - } - return vm.UsedPercent, nil -} - -func getDiskUsage(path string) (DiskUsage, error) { - usage, err := disk.Usage(path) - if err != nil { - return DiskUsage{}, err - } - return DiskUsage{Capacity: usage.Total, Used: usage.Used}, nil -} diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go index 2b880f9f..829e460a 100644 --- a/banyand/queue/sub/server.go +++ b/banyand/queue/sub/server.go @@ -112,7 +112,6 @@ func (s *server) Validate() error { if s.addr == ":" { return errNoAddr } - observability.UpdateAddress("grpc", s.addr) if !s.tls { return nil } diff --git a/pkg/meter/native/collection.go b/pkg/meter/native/collection.go index 9904e11a..209211fd 100644 --- a/pkg/meter/native/collection.go +++ b/pkg/meter/native/collection.go @@ -19,6 +19,7 @@ package native import ( + "fmt" "time" "google.golang.org/protobuf/types/known/timestamppb" @@ -34,6 +35,7 @@ import ( // NodeSelector has Locate method to select a nodeId. type NodeSelector interface { Locate(group, name string, shardID uint32) (string, error) + fmt.Stringer } type collector interface { diff --git a/pkg/node/interface.go b/pkg/node/interface.go index 0f41aab5..c4341b7d 100644 --- a/pkg/node/interface.go +++ b/pkg/node/interface.go @@ -20,6 +20,7 @@ package node import ( "context" + "fmt" "sync" "github.com/pkg/errors" @@ -42,6 +43,7 @@ type Selector interface { RemoveNode(node *databasev1.Node) Pick(group, name string, shardID uint32) (string, error) run.PreRunner + fmt.Stringer } // NewPickFirstSelector returns a simple selector that always returns the first node if exists. @@ -58,6 +60,17 @@ type pickFirstSelector struct { mu sync.RWMutex } +// String implements Selector. +func (p *pickFirstSelector) String() string { + n, err := p.Pick("", "", 0) + if err != nil { + return fmt.Sprintf("%v", err) + } + p.mu.Lock() + defer p.mu.Unlock() + return fmt.Sprintf("pick [%s] from %s", n, p.nodeIDs) +} + func (p *pickFirstSelector) PreRun(context.Context) error { return nil } diff --git a/pkg/node/maglev.go b/pkg/node/maglev.go index 34a855f6..3efcaab6 100644 --- a/pkg/node/maglev.go +++ b/pkg/node/maglev.go @@ -19,6 +19,7 @@ package node import ( "context" + "fmt" "sort" "strconv" "sync" @@ -38,6 +39,18 @@ type maglevSelector struct { mutex sync.RWMutex } +// String implements Selector. +func (m *maglevSelector) String() string { + var groups []string + m.routers.Range(func(key, _ any) bool { + groups = append(groups, key.(string)) + return true + }) + m.mutex.RLock() + defer m.mutex.Unlock() + return fmt.Sprintf("nodes:%s groups:%s", m.nodes, groups) +} + func (m *maglevSelector) Name() string { return "maglev-selector" } diff --git a/pkg/node/round_robin.go b/pkg/node/round_robin.go index 2815b19c..d0bb37e1 100644 --- a/pkg/node/round_robin.go +++ b/pkg/node/round_robin.go @@ -19,6 +19,7 @@ package node import ( "context" + "encoding/json" "fmt" "slices" "sort" @@ -32,22 +33,47 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/convert" ) type roundRobinSelector struct { schemaRegistry metadata.Repo closeCh chan struct{} - lookupTable sync.Map + lookupTable []key nodes []string mu sync.RWMutex } +func (r *roundRobinSelector) String() string { + r.mu.RLock() + defer r.mu.RUnlock() + result := make(map[string]string) + for _, entry := range r.lookupTable { + n, err := r.Pick(entry.group, "", entry.shardID) + key := fmt.Sprintf("%s-%d", entry.group, entry.shardID) + if err != nil { + result[key] = fmt.Sprintf("%v", err) + continue + } + result[key] = n + } + if len(result) < 1 { + return "" + } + jsonBytes, err := json.Marshal(result) + if err != nil { + return fmt.Sprintf("%v", err) + } + return convert.BytesToString(jsonBytes) +} + // NewRoundRobinSelector creates a new round-robin selector. func NewRoundRobinSelector(schemaRegistry metadata.Repo) Selector { rrs := &roundRobinSelector{ nodes: make([]string, 0), closeCh: make(chan struct{}), schemaRegistry: schemaRegistry, + lookupTable: make([]key, 0), } return rrs } @@ -69,9 +95,11 @@ func (r *roundRobinSelector) OnAddOrUpdate(schemaMetadata schema.Metadata) { if !ok || !validateGroup(group) { return } + r.mu.Lock() + defer r.mu.Unlock() for i := uint32(0); i < group.ResourceOpts.ShardNum; i++ { k := key{group: group.Metadata.Name, shardID: i} - r.lookupTable.Store(k, 0) + r.lookupTable = append(r.lookupTable, k) } r.sortEntries() } @@ -80,12 +108,18 @@ func (r *roundRobinSelector) OnDelete(schemaMetadata schema.Metadata) { if schemaMetadata.Kind != schema.KindGroup { return } + r.mu.Lock() + defer r.mu.Unlock() group := schemaMetadata.Spec.(*commonv1.Group) for i := uint32(0); i < group.ResourceOpts.ShardNum; i++ { k := key{group: group.Metadata.Name, shardID: i} - r.lookupTable.Delete(k) + for j := range r.lookupTable { + if r.lookupTable[j] == k { + r.lookupTable = append(r.lookupTable[:j], r.lookupTable[j+1:]...) + break + } + } } - r.sortEntries() } func (r *roundRobinSelector) OnInit(kinds []schema.Kind) (bool, []int64) { @@ -101,8 +135,10 @@ func (r *roundRobinSelector) OnInit(kinds []schema.Kind) (bool, []int64) { if err != nil { panic(fmt.Sprintf("failed to list group: %v", err)) } + r.mu.Lock() + defer r.mu.Unlock() var revision int64 - r.lookupTable = sync.Map{} + r.lookupTable = r.lookupTable[:0] for _, g := range gg { if !validateGroup(g) { continue @@ -112,7 +148,7 @@ func (r *roundRobinSelector) OnInit(kinds []schema.Kind) (bool, []int64) { } for i := uint32(0); i < g.ResourceOpts.ShardNum; i++ { k := key{group: g.Metadata.Name, shardID: i} - r.lookupTable.Store(k, 0) + r.lookupTable = append(r.lookupTable, k) } } r.sortEntries() @@ -144,29 +180,26 @@ func (r *roundRobinSelector) Pick(group, _ string, shardID uint32) (string, erro if len(r.nodes) == 0 { return "", errors.New("no nodes available") } - entry, ok := r.lookupTable.Load(k) - if ok { - return r.selectNode(entry), nil + i := sort.Search(len(r.lookupTable), func(i int) bool { + if r.lookupTable[i].group == group { + return r.lookupTable[i].shardID >= shardID + } + return r.lookupTable[i].group > group + }) + if i < len(r.lookupTable) && r.lookupTable[i] == k { + return r.selectNode(i), nil } return "", fmt.Errorf("%s-%d is a unknown shard", group, shardID) } func (r *roundRobinSelector) sortEntries() { - var keys []key - r.lookupTable.Range(func(k, _ any) bool { - keys = append(keys, k.(key)) - return true - }) - slices.SortFunc(keys, func(a, b key) int { + slices.SortFunc(r.lookupTable, func(a, b key) int { n := strings.Compare(a.group, b.group) if n != 0 { return n } return int(a.shardID) - int(b.shardID) }) - for i := range keys { - r.lookupTable.Store(keys[i], i) - } } func (r *roundRobinSelector) Close() { diff --git a/pkg/node/round_robin_test.go b/pkg/node/round_robin_test.go index 9e8037c5..223e1908 100644 --- a/pkg/node/round_robin_test.go +++ b/pkg/node/round_robin_test.go @@ -111,6 +111,28 @@ func TestCleanupGroup(t *testing.T) { assert.Error(t, err) } +func TestSortNodeEntries(t *testing.T) { + selector := &roundRobinSelector{ + nodes: make([]string, 0), + } + setupGroup(selector) + selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node3"}}) + selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}}) + selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}}) + assert.EqualValues(t, []string{"node1", "node2", "node3"}, selector.nodes) +} + +func TestStringer(t *testing.T) { + selector := NewRoundRobinSelector(nil) + assert.Empty(t, selector.String()) + setupGroup(selector) + assert.NotEmpty(t, selector.String()) + selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node3"}}) + selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}}) + selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}}) + assert.NotEmpty(t, selector.String()) +} + var groupSchema = schema.Metadata{ TypeMeta: schema.TypeMeta{ Kind: schema.KindGroup, diff --git a/pkg/test/measure/testdata/groups/exception.json b/pkg/test/measure/testdata/groups/exception.json new file mode 100644 index 00000000..fadbd5a5 --- /dev/null +++ b/pkg/test/measure/testdata/groups/exception.json @@ -0,0 +1,18 @@ +{ + "metadata": { + "name": "exception" + }, + "catalog": "CATALOG_MEASURE", + "resource_opts": { + "shard_num": 2, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 7 + } + }, + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/measure/testdata/measures/duplicated.json b/pkg/test/measure/testdata/measures/duplicated.json new file mode 100644 index 00000000..e25f0a37 --- /dev/null +++ b/pkg/test/measure/testdata/measures/duplicated.json @@ -0,0 +1,42 @@ +{ + "metadata": { + "name": "duplicated", + "group": "exception" + }, + "tag_families": [ + { + "name": "default", + "tags": [ + { + "name": "id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "entity_id", + "type": "TAG_TYPE_STRING" + } + ] + } + ], + "fields": [ + { + "name": "total", + "field_type": "FIELD_TYPE_INT", + "encoding_method": "ENCODING_METHOD_GORILLA", + "compression_method": "COMPRESSION_METHOD_ZSTD" + }, + { + "name": "value", + "field_type": "FIELD_TYPE_INT", + "encoding_method": "ENCODING_METHOD_GORILLA", + "compression_method": "COMPRESSION_METHOD_ZSTD" + } + ], + "entity": { + "tag_names": [ + "entity_id" + ] + }, + "interval": "1m", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/test/cases/init.go b/test/cases/init.go index a39e09f6..7341a96d 100644 --- a/test/cases/init.go +++ b/test/cases/init.go @@ -54,4 +54,5 @@ func Initialize(addr string, now time.Time) { casesmeasuredata.Write(conn, "service_latency_minute", "sw_metric", "service_latency_minute_data.json", now, interval) casesmeasuredata.Write(conn, "service_instance_latency_minute", "sw_metric", "service_instance_latency_minute_data.json", now, interval) casesmeasuredata.Write(conn, "service_instance_latency_minute", "sw_metric", "service_instance_latency_minute_data1.json", now.Add(1*time.Minute), interval) + casesmeasuredata.Write(conn, "duplicated", "exception", "duplicated.json", now, 0) } diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go index 3077c510..f50dc4e1 100644 --- a/test/cases/measure/data/data.go +++ b/test/cases/measure/data/data.go @@ -108,12 +108,14 @@ func loadData(md *commonv1.Metadata, measure measurev1.MeasureService_WriteClien content, err := dataFS.ReadFile("testdata/" + dataFile) gm.Expect(err).ShouldNot(gm.HaveOccurred()) gm.Expect(json.Unmarshal(content, &templates)).ShouldNot(gm.HaveOccurred()) + nano := baseTime.UnixNano() for i, template := range templates { rawDataPointValue, errMarshal := json.Marshal(template) gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred()) dataPointValue := &measurev1.DataPointValue{} gm.Expect(protojson.Unmarshal(rawDataPointValue, dataPointValue)).ShouldNot(gm.HaveOccurred()) dataPointValue.Timestamp = timestamppb.New(baseTime.Add(-time.Duration(len(templates)-i-1) * interval)) + dataPointValue.Version = nano + int64(i) gm.Expect(measure.Send(&measurev1.WriteRequest{Metadata: md, DataPoint: dataPointValue, MessageId: uint64(time.Now().UnixNano())})). Should(gm.Succeed()) } diff --git a/test/cases/measure/data/input/duplicated_part.yaml b/test/cases/measure/data/input/duplicated_part.yaml new file mode 100644 index 00000000..91900820 --- /dev/null +++ b/test/cases/measure/data/input/duplicated_part.yaml @@ -0,0 +1,25 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "duplicated" +groups: ["exception"] +tagProjection: + tagFamilies: + - name: "default" + tags: ["id", "entity_id"] +fieldProjection: + names: ["total", "value"] diff --git a/test/cases/measure/data/testdata/duplicated.json b/test/cases/measure/data/testdata/duplicated.json new file mode 100644 index 00000000..d67feaaf --- /dev/null +++ b/test/cases/measure/data/testdata/duplicated.json @@ -0,0 +1,182 @@ +[ + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc1" + } + }, + { + "str": { + "value": "entity_1" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 1 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc1" + } + }, + { + "str": { + "value": "entity_1" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 2 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc1" + } + }, + { + "str": { + "value": "entity_1" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 3 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc2" + } + }, + { + "str": { + "value": "entity_1" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 5 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc2" + } + }, + { + "str": { + "value": "entity_1" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 50 + } + }, + { + "int": { + "value": 4 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc3" + } + }, + { + "str": { + "value": "entity_1" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 300 + } + }, + { + "int": { + "value": 6 + } + } + ] + } +] diff --git a/test/cases/measure/data/want/duplicated_part.yaml b/test/cases/measure/data/want/duplicated_part.yaml new file mode 100644 index 00000000..10a57e59 --- /dev/null +++ b/test/cases/measure/data/want/duplicated_part.yaml @@ -0,0 +1,38 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +dataPoints: +- fields: + - name: total + value: + int: + value: "300" + - name: value + value: + int: + value: "6" + tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: svc3 + - key: entity_id + value: + str: + value: entity_1 diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go index 960401ca..06b24498 100644 --- a/test/cases/measure/measure.go +++ b/test/cases/measure/measure.go @@ -70,4 +70,5 @@ var _ = g.DescribeTable("Scanning Measures", verify, g.Entry("float64 value", helpers.Args{Input: "float", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("float64 aggregation:min", helpers.Args{Input: "float_agg_min", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("all_latency", helpers.Args{Input: "all_latency", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("duplicated in a part", helpers.Args{Input: "duplicated_part", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), )
