This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch stream-load
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 40e9d27678b210db036a315c4e5add3da61d7b69
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Apr 20 14:53:18 2024 +0000

    Fix issues found by tracing test
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/liaison/grpc/stream.go                |   1 +
 banyand/measure/measure.go                    |   4 +-
 banyand/measure/write.go                      |  88 +++++-------
 banyand/stream/index.go                       |   9 +-
 banyand/stream/stream.go                      |   6 +-
 banyand/stream/write.go                       | 103 ++++++--------
 pkg/index/inverted/inverted.go                |  16 +--
 pkg/partition/index.go                        |  37 +++--
 pkg/test/query/metric.go                      | 193 ++++++++++++++++++++++++++
 pkg/test/query/trace.go                       |   4 +-
 test/docker/base-compose.yml                  |  18 +--
 test/stress/trace/Makefile                    |  10 +-
 test/stress/trace/docker-compose-cluster.yaml |   2 +-
 test/stress/trace/docker-compose-single.yaml  |  16 +--
 test/stress/trace/trace_suite_test.go         |  22 ++-
 15 files changed, 352 insertions(+), 177 deletions(-)

diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 37635a4b..5775dcf2 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -131,6 +131,7 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                if errWritePub != nil {
                        s.sampled.Error().Err(errWritePub).RawJSON("written", 
logger.Proto(writeEntity)).Str("nodeID", nodeID).Msg("failed to send a message")
                        reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, 
s.sampled)
+                       continue
                }
                reply(nil, modelv1.Status_STATUS_SUCCEED, 
writeEntity.GetMessageId(), stream, s.sampled)
        }
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index fb038378..43515964 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -58,7 +58,7 @@ type measure struct {
        name              string
        group             string
        indexRules        []*databasev1.IndexRule
-       indexRuleLocators []*partition.IndexRuleLocator
+       indexRuleLocators partition.IndexRuleLocator
        topNAggregations  []*databasev1.TopNAggregation
        interval          time.Duration
        shardNum          uint32
@@ -103,7 +103,7 @@ func (s *measure) parseSpec() (err error) {
        if s.schema.Interval != "" {
                s.interval, err = timestamp.ParseDuration(s.schema.Interval)
        }
-       s.indexRuleLocators = 
partition.ParseIndexRuleLocators(s.schema.GetTagFamilies(), s.indexRules)
+       s.indexRuleLocators = 
partition.ParseIndexRuleLocators(s.schema.GetEntity(), 
s.schema.GetTagFamilies(), s.indexRules)
 
        return err
 }
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index d6292bb3..4ec45466 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -32,7 +32,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -126,11 +125,11 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
        }
        dpt.dataPoints.fields = append(dpt.dataPoints.fields, field)
        tagFamilies := make([]nameValues, 0, len(stm.schema.TagFamilies))
-       tagFamiliesForIndexWrite := make([]nameValues, 
len(stm.schema.TagFamilies))
-       entityMap := make(map[string]bool)
-       for _, entity := range stm.GetSchema().GetEntity().GetTagNames() {
-               entityMap[entity] = true
+       if len(stm.indexRuleLocators.TagFamilyTRule) != 
len(stm.GetSchema().GetTagFamilies()) {
+               logger.Panicf("metadata crashed, tag family rule length %d, tag 
family length %d",
+                       len(stm.indexRuleLocators.TagFamilyTRule), 
len(stm.GetSchema().GetTagFamilies()))
        }
+       var fields []index.Field
        for i := range stm.GetSchema().GetTagFamilies() {
                var tagFamily *modelv1.TagFamilyForWrite
                if len(req.DataPoint.TagFamilies) <= i {
@@ -138,6 +137,7 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
                } else {
                        tagFamily = req.DataPoint.TagFamilies[i]
                }
+               tfr := stm.indexRuleLocators.TagFamilyTRule[i]
                tagFamilySpec := stm.GetSchema().GetTagFamilies()[i]
                tf := nameValues{
                        name: tagFamilySpec.Name,
@@ -150,15 +150,37 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
                                tagValue = tagFamily.Tags[j]
                        }
 
-                       nameValue := encodeTagValue(
-                               tagFamilySpec.Tags[j].Name,
-                               tagFamilySpec.Tags[j].Type,
+                       t := tagFamilySpec.Tags[j]
+                       encodeTagValue := encodeTagValue(
+                               t.Name,
+                               t.Type,
                                tagValue)
-                       tagFamiliesForIndexWrite[i].values = 
append(tagFamiliesForIndexWrite[i].values, nameValue)
-                       if tagFamilySpec.Tags[j].IndexedOnly || 
entityMap[tagFamilySpec.Tags[j].Name] {
+                       if r, ok := tfr[t.Name]; ok {
+                               if encodeTagValue.value != nil {
+                                       fields = append(fields, index.Field{
+                                               Key: index.FieldKey{
+                                                       IndexRuleID: 
r.GetMetadata().GetId(),
+                                                       Analyzer:    r.Analyzer,
+                                               },
+                                               Term: encodeTagValue.value,
+                                       })
+                               } else {
+                                       for _, val := range 
encodeTagValue.valueArr {
+                                               fields = append(fields, 
index.Field{
+                                                       Key: index.FieldKey{
+                                                               IndexRuleID: 
r.GetMetadata().GetId(),
+                                                               Analyzer:    
r.Analyzer,
+                                                       },
+                                                       Term: val,
+                                               })
+                                       }
+                               }
+                       }
+                       _, isEntity := stm.indexRuleLocators.EntitySet[t.Name]
+                       if tagFamilySpec.Tags[j].IndexedOnly || isEntity {
                                continue
                        }
-                       tf.values = append(tf.values, nameValue)
+                       tf.values = append(tf.values, encodeTagValue)
                }
                if len(tf.values) > 0 {
                        tagFamilies = append(tagFamilies, tf)
@@ -176,33 +198,7 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
                        EntityValues: writeEvent.EntityValues,
                })
        }
-       var fields []index.Field
-       for _, ruleIndex := range stm.indexRuleLocators {
-               nv := getIndexValue(ruleIndex, tagFamiliesForIndexWrite)
-               if nv == nil {
-                       continue
-               }
-               if nv.value != nil {
-                       fields = append(fields, index.Field{
-                               Key: index.FieldKey{
-                                       IndexRuleID: 
ruleIndex.Rule.GetMetadata().GetId(),
-                                       Analyzer:    ruleIndex.Rule.Analyzer,
-                               },
-                               Term: nv.value,
-                       })
-                       continue
-               }
-               for _, val := range nv.valueArr {
-                       rule := ruleIndex.Rule
-                       fields = append(fields, index.Field{
-                               Key: index.FieldKey{
-                                       IndexRuleID: rule.GetMetadata().GetId(),
-                                       Analyzer:    rule.Analyzer,
-                               },
-                               Term: val,
-                       })
-               }
-       }
+
        dpg.docs = append(dpg.docs, index.Document{
                DocID:        uint64(series.ID),
                EntityValues: series.Buffer,
@@ -328,19 +324,3 @@ func encodeTagValue(name string, tagType 
databasev1.TagType, tagValue *modelv1.T
        }
        return nv
 }
-
-func getIndexValue(ruleIndex *partition.IndexRuleLocator, tagFamilies 
[]nameValues) *nameValue {
-       if len(ruleIndex.TagIndices) != 1 {
-               logger.Panicf("the index rule %s(%v) didn't support composited 
tags",
-                       ruleIndex.Rule.Metadata.Name, ruleIndex.Rule.Tags)
-       }
-       tIndex := ruleIndex.TagIndices[0]
-       if tIndex.FamilyOffset >= len(tagFamilies) {
-               return nil
-       }
-       tf := tagFamilies[tIndex.FamilyOffset]
-       if tIndex.TagOffset >= len(tf.values) {
-               return nil
-       }
-       return tf.values[tIndex.TagOffset]
-}
diff --git a/banyand/stream/index.go b/banyand/stream/index.go
index 20c02de4..437d3f10 100644
--- a/banyand/stream/index.go
+++ b/banyand/stream/index.go
@@ -61,16 +61,9 @@ func (e *elementIndex) Iterator(fieldKey index.FieldKey, 
termRange index.RangeOp
 }
 
 func (e *elementIndex) Write(docs index.Documents) error {
-       applied := make(chan struct{})
-       err := e.store.Batch(index.Batch{
+       return e.store.Batch(index.Batch{
                Documents: docs,
-               Applied:   applied,
        })
-       if err != nil {
-               return err
-       }
-       <-applied
-       return nil
 }
 
 func (e *elementIndex) Search(_ context.Context, seriesList pbv1.SeriesList, 
filter index.Filter) ([]elementRef, error) {
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index ba5ce115..0ba53a9d 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -40,7 +40,7 @@ const (
        maxUncompressedBlockSize        = 2 * 1024 * 1024
        maxUncompressedPrimaryBlockSize = 128 * 1024
 
-       defaultFlushTimeout = 5 * time.Second
+       defaultFlushTimeout = time.Second
 )
 
 type option struct {
@@ -74,7 +74,7 @@ type stream struct {
        name              string
        group             string
        indexRules        []*databasev1.IndexRule
-       indexRuleLocators []*partition.IndexRuleLocator
+       indexRuleLocators partition.IndexRuleLocator
        shardNum          uint32
 }
 
@@ -92,7 +92,7 @@ func (s *stream) Close() error {
 
 func (s *stream) parseSpec() {
        s.name, s.group = s.schema.GetMetadata().GetName(), 
s.schema.GetMetadata().GetGroup()
-       s.indexRuleLocators = 
partition.ParseIndexRuleLocators(s.schema.GetTagFamilies(), s.indexRules)
+       s.indexRuleLocators = 
partition.ParseIndexRuleLocators(s.schema.GetEntity(), 
s.schema.GetTagFamilies(), s.indexRules)
 }
 
 type streamSpec struct {
diff --git a/banyand/stream/write.go b/banyand/stream/write.go
index 216a340f..27bc366e 100644
--- a/banyand/stream/write.go
+++ b/banyand/stream/write.go
@@ -31,7 +31,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -112,12 +111,12 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
        et.elements.seriesIDs = append(et.elements.seriesIDs, series.ID)
 
        tagFamilies := make([]tagValues, len(stm.schema.TagFamilies))
-       tagFamiliesForIndexWrite := make([]tagValues, 
len(stm.schema.TagFamilies))
-       entityMap := make(map[string]bool)
        et.elements.tagFamilies = append(et.elements.tagFamilies, tagFamilies)
-       for _, entity := range stm.GetSchema().GetEntity().GetTagNames() {
-               entityMap[entity] = true
+       if len(stm.indexRuleLocators.TagFamilyTRule) != 
len(stm.GetSchema().GetTagFamilies()) {
+               logger.Panicf("metadata crashed, tag family rule length %d, tag 
family length %d",
+                       len(stm.indexRuleLocators.TagFamilyTRule), 
len(stm.GetSchema().GetTagFamilies()))
        }
+       var fields []index.Field
        for i := range stm.GetSchema().GetTagFamilies() {
                var tagFamily *modelv1.TagFamilyForWrite
                if len(req.Element.TagFamilies) <= i {
@@ -125,6 +124,7 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
                } else {
                        tagFamily = req.Element.TagFamilies[i]
                }
+               tfr := stm.indexRuleLocators.TagFamilyTRule[i]
                tagFamilySpec := stm.GetSchema().GetTagFamilies()[i]
                tagFamilies[i].tag = tagFamilySpec.Name
                for j := range tagFamilySpec.Tags {
@@ -135,47 +135,42 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
                                tagValue = tagFamily.Tags[j]
                        }
 
+                       t := tagFamilySpec.Tags[j]
                        encodeTagValue := encodeTagValue(
-                               tagFamilySpec.Tags[j].Name,
-                               tagFamilySpec.Tags[j].Type,
+                               t.Name,
+                               t.Type,
                                tagValue)
-                       tagFamiliesForIndexWrite[i].values = 
append(tagFamiliesForIndexWrite[i].values, encodeTagValue)
-                       if tagFamilySpec.Tags[j].IndexedOnly || 
entityMap[tagFamilySpec.Tags[j].Name] {
+                       if r, ok := tfr[t.Name]; ok {
+                               if encodeTagValue.value != nil {
+                                       fields = append(fields, index.Field{
+                                               Key: index.FieldKey{
+                                                       IndexRuleID: 
r.GetMetadata().GetId(),
+                                                       Analyzer:    r.Analyzer,
+                                                       SeriesID:    series.ID,
+                                               },
+                                               Term: encodeTagValue.value,
+                                       })
+                               } else {
+                                       for _, val := range 
encodeTagValue.valueArr {
+                                               fields = append(fields, 
index.Field{
+                                                       Key: index.FieldKey{
+                                                               IndexRuleID: 
r.GetMetadata().GetId(),
+                                                               Analyzer:    
r.Analyzer,
+                                                               SeriesID:    
series.ID,
+                                                       },
+                                                       Term: val,
+                                               })
+                                       }
+                               }
+                       }
+                       _, isEntity := 
stm.indexRuleLocators.EntitySet[tagFamilySpec.Tags[j].Name]
+                       if tagFamilySpec.Tags[j].IndexedOnly || isEntity {
                                continue
                        }
                        tagFamilies[i].values = append(tagFamilies[i].values, 
encodeTagValue)
                }
        }
 
-       var fields []index.Field
-       for _, indexRule := range stm.indexRuleLocators {
-               tv := getIndexValue(indexRule, tagFamiliesForIndexWrite)
-               if tv == nil {
-                       continue
-               }
-               if tv.value != nil {
-                       fields = append(fields, index.Field{
-                               Key: index.FieldKey{
-                                       IndexRuleID: 
indexRule.Rule.GetMetadata().GetId(),
-                                       Analyzer:    indexRule.Rule.Analyzer,
-                                       SeriesID:    series.ID,
-                               },
-                               Term: tv.value,
-                       })
-                       continue
-               }
-               for _, val := range tv.valueArr {
-                       fields = append(fields, index.Field{
-                               Key: index.FieldKey{
-                                       IndexRuleID: 
indexRule.Rule.GetMetadata().GetId(),
-                                       Analyzer:    indexRule.Rule.Analyzer,
-                                       SeriesID:    series.ID,
-                               },
-                               Term: val,
-                       })
-               }
-       }
-
        et.docs = append(et.docs, index.Document{
                DocID:  ts,
                Fields: fields,
@@ -226,14 +221,20 @@ func (w *writeCallback) Rev(message bus.Message) (resp 
bus.Message) {
                for j := range g.tables {
                        es := g.tables[j]
                        es.tsTable.Table().mustAddElements(&es.elements)
-                       index := es.tsTable.Table().Index()
-                       if err := index.Write(es.docs); err != nil {
-                               w.l.Error().Err(err).Msg("cannot write element 
index")
+                       if len(es.docs) > 0 {
+                               go func() {
+                                       index := es.tsTable.Table().Index()
+                                       if err := index.Write(es.docs); err != 
nil {
+                                               
w.l.Error().Err(err).Msg("cannot write element index")
+                                       }
+                               }()
                        }
                        es.tsTable.DecRef()
                }
-               if err := g.tsdb.IndexDB().Write(g.docs); err != nil {
-                       w.l.Error().Err(err).Msg("cannot write series index")
+               if len(g.docs) > 0 {
+                       if err := g.tsdb.IndexDB().Write(g.docs); err != nil {
+                               w.l.Error().Err(err).Msg("cannot write series 
index")
+                       }
                }
        }
        return
@@ -280,19 +281,3 @@ func encodeTagValue(name string, tagType 
databasev1.TagType, tagVal *modelv1.Tag
        }
        return tv
 }
-
-func getIndexValue(ruleIndex *partition.IndexRuleLocator, tagFamilies 
[]tagValues) *tagValue {
-       if len(ruleIndex.TagIndices) != 1 {
-               logger.Panicf("the index rule %s(%v) didn't support composited 
tags",
-                       ruleIndex.Rule.Metadata.Name, ruleIndex.Rule.Tags)
-       }
-       tIndex := ruleIndex.TagIndices[0]
-       if tIndex.FamilyOffset >= len(tagFamilies) {
-               return nil
-       }
-       tf := tagFamilies[tIndex.FamilyOffset]
-       if tIndex.TagOffset >= len(tf.values) {
-               return nil
-       }
-       return tf.values[tIndex.TagOffset]
-}
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 8707befd..176d6b77 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -317,20 +317,17 @@ func (s *store) run() {
                }()
                size := 0
                batch := bluge.NewBatch()
-               flush := func(applied chan struct{}) {
+               flush := func() {
                        if size < 1 {
                                return
                        }
                        if err := s.writer.Batch(batch); err != nil {
                                s.l.Error().Err(err).Msg("write to the inverted 
index")
                        }
-                       if applied != nil {
-                               close(applied)
-                       }
                        batch.Reset()
                        size = 0
                }
-               defer flush(nil)
+               defer flush()
                var docIDBuffer bytes.Buffer
                for {
                        timer := time.NewTimer(s.batchInterval)
@@ -345,7 +342,7 @@ func (s *store) run() {
                                }
                                switch d := event.(type) {
                                case flushEvent:
-                                       flush(nil)
+                                       flush()
                                        close(d.onComplete)
                                case index.Document, index.Batch:
                                        var docs []index.Document
@@ -394,11 +391,14 @@ func (s *store) run() {
                                                batch.Update(doc.ID(), doc)
                                        }
                                        if isBatch || size >= batchSize {
-                                               flush(applied)
+                                               flush()
+                                               if applied != nil {
+                                                       close(applied)
+                                               }
                                        }
                                }
                        case <-timer.C:
-                               flush(nil)
+                               flush()
                        }
                        timer.Stop()
                }
diff --git a/pkg/partition/index.go b/pkg/partition/index.go
index f645178a..1df11225 100644
--- a/pkg/partition/index.go
+++ b/pkg/partition/index.go
@@ -19,27 +19,38 @@ package partition
 
 import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
-// IndexRuleLocator combines several TagLocators that help find the index 
value.
+// IndexRuleLocator is a helper struct to locate the index rule by tag name.
 type IndexRuleLocator struct {
-       Rule       *databasev1.IndexRule
-       TagIndices []TagLocator
+       EntitySet      map[string]struct{}
+       TagFamilyTRule []map[string]*databasev1.IndexRule
 }
 
 // ParseIndexRuleLocators returns a IndexRuleLocator based on the tag family 
spec and index rules.
-func ParseIndexRuleLocators(families []*databasev1.TagFamilySpec, indexRules 
[]*databasev1.IndexRule) (locators []*IndexRuleLocator) {
-       for _, rule := range indexRules {
-               tagIndices := make([]TagLocator, 0, len(rule.GetTags()))
-               for _, tagInIndex := range rule.GetTags() {
-                       fIndex, tIndex, tag := pbv1.FindTagByName(families, 
tagInIndex)
-                       if tag != nil {
-                               tagIndices = append(tagIndices, 
TagLocator{FamilyOffset: fIndex, TagOffset: tIndex})
+func ParseIndexRuleLocators(entity *databasev1.Entity, families 
[]*databasev1.TagFamilySpec, indexRules []*databasev1.IndexRule) (locators 
IndexRuleLocator) {
+       locators.EntitySet = make(map[string]struct{}, len(entity.TagNames))
+       for i := range entity.TagNames {
+               locators.EntitySet[entity.TagNames[i]] = struct{}{}
+       }
+       findIndexRuleByTagName := func(tagName string) *databasev1.IndexRule {
+               for i := range indexRules {
+                       for j := range indexRules[i].Tags {
+                               if indexRules[i].Tags[j] == tagName {
+                                       return indexRules[i]
+                               }
                        }
                }
-               if len(tagIndices) > 0 {
-                       locators = append(locators, &IndexRuleLocator{Rule: 
rule, TagIndices: tagIndices})
+               return nil
+       }
+       for i := range families {
+               ttr := make(map[string]*databasev1.IndexRule)
+               locators.TagFamilyTRule = append(locators.TagFamilyTRule, ttr)
+               for j := range families[i].Tags {
+                       ir := findIndexRuleByTagName(families[i].Tags[j].Name)
+                       if ir != nil {
+                               ttr[families[i].Tags[j].Name] = ir
+                       }
                }
        }
        return locators
diff --git a/pkg/test/query/metric.go b/pkg/test/query/metric.go
new file mode 100644
index 00000000..a890a118
--- /dev/null
+++ b/pkg/test/query/metric.go
@@ -0,0 +1,193 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package query
+
+import (
+       "flag"
+       "fmt"
+       "os"
+       "path"
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-cli/pkg/graphql/metrics"
+       "github.com/apache/skywalking-cli/pkg/graphql/utils"
+       "github.com/urfave/cli/v2"
+       api "skywalking.apache.org/repo/goapi/query"
+)
+
+var (
+       isNormal   = true
+       serviceTpl = "g%d::service_0"
+)
+
+// ServiceList verifies the service list.
+func ServiceList(basePath string, timeout time.Duration, groupNum int, fs 
*flag.FlagSet) {
+       basePath = path.Join(basePath, "svc-list")
+       err := os.MkdirAll(basePath, 0o755)
+       if err != nil {
+               panic(err)
+       }
+       stopCh := make(chan struct{})
+       go func() {
+               time.Sleep(timeout)
+               close(stopCh)
+       }()
+       metricNames := []string{"service_sla", "service_cpm", 
"service_resp_time", "service_apdex"}
+       size := len(metricNames) * groupNum
+       header := make([]string, size)
+       for k, mName := range metricNames {
+               for i := 0; i < groupNum; i++ {
+                       offset := k*groupNum + i
+                       header[offset] = fmt.Sprintf("%s-%d", mName, i)
+               }
+       }
+       var wg sync.WaitGroup
+       wg.Add(1)
+       go func() {
+               defer wg.Done()
+               collect(basePath, func() ([]float64, error) {
+                       data := make([]float64, size)
+                       var subWg sync.WaitGroup
+                       for k, mName := range metricNames {
+                               for i := 0; i < groupNum; i++ {
+                                       offset := k*groupNum + i
+                                       svc := fmt.Sprintf(serviceTpl, i)
+                                       subWg.Add(1)
+                                       go func(mName, svc string) {
+                                               defer subWg.Done()
+                                               d, err := execute(mName, svc, 
"", fs)
+                                               if err != nil {
+                                                       fmt.Printf("query 
metric %s %s error: %v \n", mName, svc, err)
+                                               }
+                                               data[offset] = d.Seconds()
+                                       }(mName, svc)
+                               }
+                       }
+                       subWg.Wait()
+                       return data, nil
+               }, 500*time.Millisecond, stopCh)
+               analyze(header, basePath)
+       }()
+       wg.Wait()
+}
+
+func execute(exp, svc, instance string, fs *flag.FlagSet) (time.Duration, 
error) {
+       ctx := cli.NewContext(cli.NewApp(), fs, nil)
+       entity := &api.Entity{
+               ServiceName:         &svc,
+               Normal:              &isNormal,
+               ServiceInstanceName: &instance,
+       }
+       duration := api.Duration{
+               Start: time.Now().Add(-30 * 
time.Minute).Format(utils.StepFormats[api.StepMinute]),
+               End:   time.Now().Format(utils.StepFormats[api.StepMinute]),
+               Step:  api.StepMinute,
+       }
+       start := time.Now()
+       result, err := metrics.Execute(ctx, exp, entity, duration)
+       elapsed := time.Since(start)
+       if err != nil {
+               return 0, err
+       }
+       if len(result.Results) < 1 {
+               return 0, fmt.Errorf("no result")
+       }
+       return elapsed, nil
+}
+
+// TopN verifies the top N.
+func TopN(basePath string, timeout time.Duration, groupNum int, fs 
*flag.FlagSet) {
+       basePath = path.Join(basePath, "topn")
+       err := os.MkdirAll(basePath, 0o755)
+       if err != nil {
+               panic(err)
+       }
+       stopCh := make(chan struct{})
+       go func() {
+               time.Sleep(timeout)
+               close(stopCh)
+       }()
+       metricNames := []string{"service_instance_resp_time", 
"service_instance_cpm"}
+       size := len(metricNames) * groupNum
+       header := make([]string, size)
+       for k, mName := range metricNames {
+               for i := 0; i < groupNum; i++ {
+                       offset := k*groupNum + i
+                       header[offset] = fmt.Sprintf("%s-%d", mName, i)
+               }
+       }
+       var wg sync.WaitGroup
+       wg.Add(1)
+       go func() {
+               defer wg.Done()
+               collect(basePath, func() ([]float64, error) {
+                       data := make([]float64, size)
+                       var subWg sync.WaitGroup
+                       for k, mName := range metricNames {
+                               for i := 0; i < groupNum; i++ {
+                                       offset := k*groupNum + i
+                                       svc := fmt.Sprintf(serviceTpl, i)
+                                       subWg.Add(1)
+                                       go func(mName, svc string) {
+                                               defer subWg.Done()
+                                               d, err := sortMetrics(mName, 
svc, 5, api.OrderDes, fs)
+                                               if err != nil {
+                                                       data[offset] = -1
+                                                       fmt.Printf("query 
metric %s %s error: %v \n", mName, svc, err)
+                                               } else {
+                                                       data[offset] = 
d.Seconds()
+                                               }
+                                       }(mName, svc)
+                               }
+                       }
+                       subWg.Wait()
+                       return data, nil
+               }, 500*time.Millisecond, stopCh)
+               analyze(header, basePath)
+       }()
+       wg.Wait()
+}
+
+func sortMetrics(name, svc string, limit int, order api.Order, fs 
*flag.FlagSet) (time.Duration, error) {
+       ctx := cli.NewContext(cli.NewApp(), fs, nil)
+       duration := api.Duration{
+               Start: time.Now().Add(-30 * 
time.Minute).Format(utils.StepFormats[api.StepMinute]),
+               End:   time.Now().Format(utils.StepFormats[api.StepMinute]),
+               Step:  api.StepMinute,
+       }
+       scope := api.ScopeServiceInstance
+       cond := api.TopNCondition{
+               Name:          name,
+               ParentService: &svc,
+               Normal:        &isNormal,
+               Scope:         &scope,
+               TopN:          limit,
+               Order:         order,
+       }
+       start := time.Now()
+       result, err := metrics.SortMetrics(ctx, cond, duration)
+       elapsed := time.Since(start)
+       if err != nil {
+               return 0, err
+       }
+       if len(result) < 1 {
+               return 0, fmt.Errorf("no result")
+       }
+       return elapsed, nil
+}
diff --git a/pkg/test/query/trace.go b/pkg/test/query/trace.go
index 18da1bf8..241acaa6 100644
--- a/pkg/test/query/trace.go
+++ b/pkg/test/query/trace.go
@@ -48,7 +48,7 @@ func TraceListOrderByDuration(basePath string, timeout time.Duration, fs *flag.F
                        return nil, err
                }
                return []float64{d.Seconds()}, nil
-       }, 500*time.Millisecond, stopCh)
+       }, time.Second, stopCh)
        analyze([]string{"result"}, basePath)
 }
 
@@ -70,7 +70,7 @@ func TraceListOrderByTime(basePath string, timeout time.Duration, fs *flag.FlagS
                        return nil, err
                }
                return []float64{d.Seconds()}, nil
-       }, 500*time.Millisecond, stopCh)
+       }, time.Second, stopCh)
        analyze([]string{"result"}, basePath)
 }
 
diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml
index 63e962d5..0a0d345c 100644
--- a/test/docker/base-compose.yml
+++ b/test/docker/base-compose.yml
@@ -20,11 +20,11 @@ services:
       - 2121
       - 6060
     command: standalone
-    healthcheck:
-      test: ["CMD", "./bydbctl", "health", "--config=-", "--addr=http://banyandb:17913"]
-      interval: 5s
-      timeout: 10s
-      retries: 120
+    # healthcheck:
+    #   test: ["CMD", "./bydbctl", "health", "--config=-", "--addr=http://banyandb:17913"]
+    #   interval: 30s
+    #   timeout: 30s
+    #   retries: 120
 
   liaison:
     hostname: liaison
@@ -35,8 +35,8 @@ services:
     command: liaison --etcd-endpoints=http://etcd:2379 
     healthcheck:
       test: ["CMD", "./bydbctl", "health", "--addr=http://liaison:17913"]
-      interval: 5s
-      timeout: 10s
+      interval: 30s
+      timeout: 30s
       retries: 120
   
   data:
@@ -48,8 +48,8 @@ services:
     command: data --etcd-endpoints=http://etcd:2379 
     healthcheck:
       test: ["CMD", "./bydbctl", "health", "--grpc-addr=data:17912"]
-      interval: 5s
-      timeout: 10s
+      interval: 30s
+      timeout: 30s
       retries: 120
   
   agent:
diff --git a/test/stress/trace/Makefile b/test/stress/trace/Makefile
index 7c1d9ef4..102717dc 100644
--- a/test/stress/trace/Makefile
+++ b/test/stress/trace/Makefile
@@ -37,6 +37,10 @@ SIZE ?= 2
 
 QPS ?= 10
 
+GROUP ?= "default"
+
+GINKGO_FLAGS ?=
+
 .PHONY: clean
 clean:
        rm -rf /tmp/banyandb-stress-trace
@@ -49,11 +53,11 @@ down-%:
 
 .PHONY: test_traffic
 test_traffic:
-       curl -XPOST 'http://localhost:12800/mock-data/segments/tasks?size=$(SIZE)' -H'Content-Type: application/json' -d "@segment.tpl.json"
+       curl -XPOST 'http://localhost:12800/mock-data/segments/tasks?size=$(SIZE)&group=$(GROUP)' -H'Content-Type: application/json' -d "@segment.tpl.json"
 
 .PHONY: up_traffic
 up_traffic:
-       curl -XPOST 'http://localhost:12800/mock-data/segments/tasks?qps=$(QPS)' -H'Content-Type: application/json' -d "@segment.tpl.json"
+       curl -XPOST 'http://localhost:12800/mock-data/segments/tasks?qps=$(QPS)&group=$(GROUP)' -H'Content-Type: application/json' -d "@segment.tpl.json"
 
 .PHONY: ls_traffic
 ls_traffic:
@@ -65,5 +69,5 @@ rm_traffic:
 
 .PHONY: test_query
 test_query: $(GINKGO)
-       $(GINKGO) -v -timeout 1h TestQuery ./...
+       $(GINKGO) $(GINKGO_FLAGS) -p -v -timeout 2h TestQuery ./...
        
\ No newline at end of file
diff --git a/test/stress/trace/docker-compose-cluster.yaml b/test/stress/trace/docker-compose-cluster.yaml
index 8ca7e293..a478a64e 100644
--- a/test/stress/trace/docker-compose-cluster.yaml
+++ b/test/stress/trace/docker-compose-cluster.yaml
@@ -34,7 +34,7 @@ services:
       resources:
         limits:
           cpus: "4"
-          memory: 4G
+          memory: 8G
     networks:
       - cluster-test
 
diff --git a/test/stress/trace/docker-compose-single.yaml b/test/stress/trace/docker-compose-single.yaml
index 05cfff66..dc868d70 100644
--- a/test/stress/trace/docker-compose-single.yaml
+++ b/test/stress/trace/docker-compose-single.yaml
@@ -37,11 +37,11 @@ services:
     - 17913:17913
     - 6060:6060
     - 2121:2121
-    deploy:
-      resources:
-        limits:
-          cpus: "4"
-          memory: 4G
+    # deploy:
+    #   resources:
+    #     limits:
+    #       cpus: "4"
+    #       memory: 8G
     networks:
       - test
       - monitoring
@@ -63,9 +63,9 @@ services:
       - ./log4j2.xml:/skywalking/config/log4j2.xml
     networks:
       - test
-    depends_on:
-      banyandb:
-        condition: service_healthy
+    # depends_on:
+    #   banyandb:
+    #     condition: service_healthy
 
   prometheus:
     image: prom/prometheus:latest
diff --git a/test/stress/trace/trace_suite_test.go b/test/stress/trace/trace_suite_test.go
index bdac956e..f54642fb 100644
--- a/test/stress/trace/trace_suite_test.go
+++ b/test/stress/trace/trace_suite_test.go
@@ -37,7 +37,7 @@ func TestIntegrationLoad(t *testing.T) {
 }
 
 var _ = Describe("Query", func() {
-       const timeout = 30 * time.Minute
+       const timeout = time.Hour
        var (
                fs       *flag.FlagSet
                basePath string
@@ -45,14 +45,14 @@ var _ = Describe("Query", func() {
 
        BeforeEach(func() {
                // Check if the URL is reachable
-               resp, err := http.Get("http://localhost:12800/graphql")
-               if err != nil || resp.StatusCode != 200 {
+               _, err := http.Get("http://localhost:12800/graphql")
+               if err != nil {
                        // If the request fails or the status code is not 200, skip the test
                        Skip("http://localhost:12800/graphql is not reachable")
                }
                fs = flag.NewFlagSet("", flag.PanicOnError)
                fs.String("base-url", "http://localhost:12800/graphql", "")
-               fs.String("service-id", "c2VydmljZV8x.1", "")
+               fs.String("service-id", "ZzE6OnNlcnZpY2VfMQ==.1", "")
                _, basePath, _, _ = runtime.Caller(0)
                basePath = path.Dir(basePath)
        })
@@ -61,11 +61,19 @@ var _ = Describe("Query", func() {
                query.TraceListOrderByDuration(basePath, timeout, fs)
        })
 
-       It("TraceListOrderByTime", func() {
-               query.TraceListOrderByTime(basePath, timeout, fs)
-       })
+       // It("TraceListOrderByTime", func() {
+       //      query.TraceListOrderByTime(basePath, timeout, fs)
+       // })
 
        It("TraceByID", func() {
                query.TraceByID(basePath, timeout, fs)
        })
+
+       It("Metric", func() {
+               query.ServiceList(basePath, timeout, 6, fs)
+       })
+
+       It("TopN", func() {
+               query.TopN(basePath, timeout, 6, fs)
+       })
 })

Reply via email to