This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch stream-load in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 40e9d27678b210db036a315c4e5add3da61d7b69 Author: Gao Hongtao <[email protected]> AuthorDate: Sat Apr 20 14:53:18 2024 +0000 Fix issues found by tracing test Signed-off-by: Gao Hongtao <[email protected]> --- banyand/liaison/grpc/stream.go | 1 + banyand/measure/measure.go | 4 +- banyand/measure/write.go | 88 +++++------- banyand/stream/index.go | 9 +- banyand/stream/stream.go | 6 +- banyand/stream/write.go | 103 ++++++-------- pkg/index/inverted/inverted.go | 16 +-- pkg/partition/index.go | 37 +++-- pkg/test/query/metric.go | 193 ++++++++++++++++++++++++++ pkg/test/query/trace.go | 4 +- test/docker/base-compose.yml | 18 +-- test/stress/trace/Makefile | 10 +- test/stress/trace/docker-compose-cluster.yaml | 2 +- test/stress/trace/docker-compose-single.yaml | 16 +-- test/stress/trace/trace_suite_test.go | 22 ++- 15 files changed, 352 insertions(+), 177 deletions(-) diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index 37635a4b..5775dcf2 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -131,6 +131,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { if errWritePub != nil { s.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Str("nodeID", nodeID).Msg("failed to send a message") reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.sampled) + continue } reply(nil, modelv1.Status_STATUS_SUCCEED, writeEntity.GetMessageId(), stream, s.sampled) } diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go index fb038378..43515964 100644 --- a/banyand/measure/measure.go +++ b/banyand/measure/measure.go @@ -58,7 +58,7 @@ type measure struct { name string group string indexRules []*databasev1.IndexRule - indexRuleLocators []*partition.IndexRuleLocator + indexRuleLocators partition.IndexRuleLocator topNAggregations []*databasev1.TopNAggregation interval time.Duration shardNum uint32 @@ -103,7 +103,7 @@ func (s *measure) parseSpec() (err error) { if s.schema.Interval != "" { s.interval, err = timestamp.ParseDuration(s.schema.Interval) } - s.indexRuleLocators = partition.ParseIndexRuleLocators(s.schema.GetTagFamilies(), s.indexRules) + s.indexRuleLocators = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules) return err } diff --git a/banyand/measure/write.go b/banyand/measure/write.go index d6292bb3..4ec45466 100644 --- a/banyand/measure/write.go +++ b/banyand/measure/write.go @@ -32,7 +32,6 @@ import ( "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/partition" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -126,11 +125,11 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me } dpt.dataPoints.fields = append(dpt.dataPoints.fields, field) tagFamilies := make([]nameValues, 0, len(stm.schema.TagFamilies)) - tagFamiliesForIndexWrite := make([]nameValues, len(stm.schema.TagFamilies)) - entityMap := make(map[string]bool) - for _, entity := range stm.GetSchema().GetEntity().GetTagNames() { - entityMap[entity] = true + if len(stm.indexRuleLocators.TagFamilyTRule) != len(stm.GetSchema().GetTagFamilies()) { + logger.Panicf("metadata crashed, tag family rule length %d, tag family length %d", + len(stm.indexRuleLocators.TagFamilyTRule), len(stm.GetSchema().GetTagFamilies())) } + var fields []index.Field for i := range stm.GetSchema().GetTagFamilies() { var tagFamily *modelv1.TagFamilyForWrite if len(req.DataPoint.TagFamilies) <= i { @@ -138,6 +137,7 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me } else { tagFamily = req.DataPoint.TagFamilies[i] } + tfr := stm.indexRuleLocators.TagFamilyTRule[i] tagFamilySpec := stm.GetSchema().GetTagFamilies()[i] tf := nameValues{ name: tagFamilySpec.Name, @@ -150,15 +150,37 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me tagValue = tagFamily.Tags[j] } - nameValue := encodeTagValue( - tagFamilySpec.Tags[j].Name, - tagFamilySpec.Tags[j].Type, + t := tagFamilySpec.Tags[j] + encodeTagValue := encodeTagValue( + t.Name, + t.Type, tagValue) - tagFamiliesForIndexWrite[i].values = append(tagFamiliesForIndexWrite[i].values, nameValue) - if tagFamilySpec.Tags[j].IndexedOnly || entityMap[tagFamilySpec.Tags[j].Name] { + if r, ok := tfr[t.Name]; ok { + if encodeTagValue.value != nil { + fields = append(fields, index.Field{ + Key: index.FieldKey{ + IndexRuleID: r.GetMetadata().GetId(), + Analyzer: r.Analyzer, + }, + Term: encodeTagValue.value, + }) + } else { + for _, val := range encodeTagValue.valueArr { + fields = append(fields, index.Field{ + Key: index.FieldKey{ + IndexRuleID: r.GetMetadata().GetId(), + Analyzer: r.Analyzer, + }, + Term: val, + }) + } + } + } + _, isEntity := stm.indexRuleLocators.EntitySet[t.Name] + if tagFamilySpec.Tags[j].IndexedOnly || isEntity { continue } - tf.values = append(tf.values, nameValue) + tf.values = append(tf.values, encodeTagValue) } if len(tf.values) > 0 { tagFamilies = append(tagFamilies, tf) @@ -176,33 +198,7 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me EntityValues: writeEvent.EntityValues, }) } - var fields []index.Field - for _, ruleIndex := range stm.indexRuleLocators { - nv := getIndexValue(ruleIndex, tagFamiliesForIndexWrite) - if nv == nil { - continue - } - if nv.value != nil { - fields = append(fields, index.Field{ - Key: index.FieldKey{ - IndexRuleID: ruleIndex.Rule.GetMetadata().GetId(), - Analyzer: ruleIndex.Rule.Analyzer, - }, - Term: nv.value, - }) - continue - } - for _, val := range nv.valueArr { - rule := ruleIndex.Rule - fields = append(fields, index.Field{ - Key: index.FieldKey{ - IndexRuleID: rule.GetMetadata().GetId(), - Analyzer: rule.Analyzer, - }, - Term: val, - }) - } - } + dpg.docs = append(dpg.docs, index.Document{ DocID: uint64(series.ID), EntityValues: series.Buffer, @@ -328,19 +324,3 @@ func encodeTagValue(name string, tagType databasev1.TagType, tagValue *modelv1.T } return nv } - -func getIndexValue(ruleIndex *partition.IndexRuleLocator, tagFamilies []nameValues) *nameValue { - if len(ruleIndex.TagIndices) != 1 { - logger.Panicf("the index rule %s(%v) didn't support composited tags", - ruleIndex.Rule.Metadata.Name, ruleIndex.Rule.Tags) - } - tIndex := ruleIndex.TagIndices[0] - if tIndex.FamilyOffset >= len(tagFamilies) { - return nil - } - tf := tagFamilies[tIndex.FamilyOffset] - if tIndex.TagOffset >= len(tf.values) { - return nil - } - return tf.values[tIndex.TagOffset] -} diff --git a/banyand/stream/index.go b/banyand/stream/index.go index 20c02de4..437d3f10 100644 --- a/banyand/stream/index.go +++ b/banyand/stream/index.go @@ -61,16 +61,9 @@ func (e *elementIndex) Iterator(fieldKey index.FieldKey, termRange index.RangeOp } func (e *elementIndex) Write(docs index.Documents) error { - applied := make(chan struct{}) - err := e.store.Batch(index.Batch{ + return e.store.Batch(index.Batch{ Documents: docs, - Applied: applied, }) - if err != nil { - return err - } - <-applied - return nil } func (e *elementIndex) Search(_ context.Context, seriesList pbv1.SeriesList, filter index.Filter) ([]elementRef, error) { diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index ba5ce115..0ba53a9d 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -40,7 +40,7 @@ const ( maxUncompressedBlockSize = 2 * 1024 * 1024 maxUncompressedPrimaryBlockSize = 128 * 1024 - defaultFlushTimeout = 5 * time.Second + defaultFlushTimeout = time.Second ) type option struct { @@ -74,7 +74,7 @@ type stream struct { name string group string indexRules []*databasev1.IndexRule - indexRuleLocators []*partition.IndexRuleLocator + indexRuleLocators partition.IndexRuleLocator shardNum uint32 } @@ -92,7 +92,7 @@ func (s *stream) Close() error { func (s *stream) parseSpec() { s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup() - s.indexRuleLocators = partition.ParseIndexRuleLocators(s.schema.GetTagFamilies(), s.indexRules) + s.indexRuleLocators = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules) } type streamSpec struct { diff --git a/banyand/stream/write.go b/banyand/stream/write.go index 216a340f..27bc366e 100644 --- a/banyand/stream/write.go +++ b/banyand/stream/write.go @@ -31,7 +31,6 @@ import ( "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/partition" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -112,12 +111,12 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre et.elements.seriesIDs = append(et.elements.seriesIDs, series.ID) tagFamilies := make([]tagValues, len(stm.schema.TagFamilies)) - tagFamiliesForIndexWrite := make([]tagValues, len(stm.schema.TagFamilies)) - entityMap := make(map[string]bool) et.elements.tagFamilies = append(et.elements.tagFamilies, tagFamilies) - for _, entity := range stm.GetSchema().GetEntity().GetTagNames() { - entityMap[entity] = true + if len(stm.indexRuleLocators.TagFamilyTRule) != len(stm.GetSchema().GetTagFamilies()) { + logger.Panicf("metadata crashed, tag family rule length %d, tag family length %d", + len(stm.indexRuleLocators.TagFamilyTRule), len(stm.GetSchema().GetTagFamilies())) } + var fields []index.Field for i := range stm.GetSchema().GetTagFamilies() { var tagFamily *modelv1.TagFamilyForWrite if len(req.Element.TagFamilies) <= i { @@ -125,6 +124,7 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre } else { tagFamily = req.Element.TagFamilies[i] } + tfr := stm.indexRuleLocators.TagFamilyTRule[i] tagFamilySpec := stm.GetSchema().GetTagFamilies()[i] tagFamilies[i].tag = tagFamilySpec.Name for j := range tagFamilySpec.Tags { @@ -135,47 +135,42 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre tagValue = tagFamily.Tags[j] } + t := tagFamilySpec.Tags[j] encodeTagValue := encodeTagValue( - tagFamilySpec.Tags[j].Name, - tagFamilySpec.Tags[j].Type, + t.Name, + t.Type, tagValue) - tagFamiliesForIndexWrite[i].values = append(tagFamiliesForIndexWrite[i].values, encodeTagValue) - if tagFamilySpec.Tags[j].IndexedOnly || entityMap[tagFamilySpec.Tags[j].Name] { + if r, ok := tfr[t.Name]; ok { + if encodeTagValue.value != nil { + fields = append(fields, index.Field{ + Key: index.FieldKey{ + IndexRuleID: r.GetMetadata().GetId(), + Analyzer: r.Analyzer, + SeriesID: series.ID, + }, + Term: encodeTagValue.value, + }) + } else { + for _, val := range encodeTagValue.valueArr { + fields = append(fields, index.Field{ + Key: index.FieldKey{ + IndexRuleID: r.GetMetadata().GetId(), + Analyzer: r.Analyzer, + SeriesID: series.ID, + }, + Term: val, + }) + } + } + } + _, isEntity := stm.indexRuleLocators.EntitySet[tagFamilySpec.Tags[j].Name] + if tagFamilySpec.Tags[j].IndexedOnly || isEntity { continue } tagFamilies[i].values = append(tagFamilies[i].values, encodeTagValue) } } - var fields []index.Field - for _, indexRule := range stm.indexRuleLocators { - tv := getIndexValue(indexRule, tagFamiliesForIndexWrite) - if tv == nil { - continue - } - if tv.value != nil { - fields = append(fields, index.Field{ - Key: index.FieldKey{ - IndexRuleID: indexRule.Rule.GetMetadata().GetId(), - Analyzer: indexRule.Rule.Analyzer, - SeriesID: series.ID, - }, - Term: tv.value, - }) - continue - } - for _, val := range tv.valueArr { - fields = append(fields, index.Field{ - Key: index.FieldKey{ - IndexRuleID: indexRule.Rule.GetMetadata().GetId(), - Analyzer: indexRule.Rule.Analyzer, - SeriesID: series.ID, - }, - Term: val, - }) - } - } - et.docs = append(et.docs, index.Document{ DocID: ts, Fields: fields, @@ -226,14 +221,20 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) { for j := range g.tables { es := g.tables[j] es.tsTable.Table().mustAddElements(&es.elements) - index := es.tsTable.Table().Index() - if err := index.Write(es.docs); err != nil { - w.l.Error().Err(err).Msg("cannot write element index") + if len(es.docs) > 0 { + go func() { + index := es.tsTable.Table().Index() + if err := index.Write(es.docs); err != nil { + w.l.Error().Err(err).Msg("cannot write element index") + } + }() } es.tsTable.DecRef() } - if err := g.tsdb.IndexDB().Write(g.docs); err != nil { - w.l.Error().Err(err).Msg("cannot write series index") + if len(g.docs) > 0 { + if err := g.tsdb.IndexDB().Write(g.docs); err != nil { + w.l.Error().Err(err).Msg("cannot write series index") + } } } return @@ -280,19 +281,3 @@ func encodeTagValue(name string, tagType databasev1.TagType, tagVal *modelv1.Tag } return tv } - -func getIndexValue(ruleIndex *partition.IndexRuleLocator, tagFamilies []tagValues) *tagValue { - if len(ruleIndex.TagIndices) != 1 { - logger.Panicf("the index rule %s(%v) didn't support composited tags", - ruleIndex.Rule.Metadata.Name, ruleIndex.Rule.Tags) - } - tIndex := ruleIndex.TagIndices[0] - if tIndex.FamilyOffset >= len(tagFamilies) { - return nil - } - tf := tagFamilies[tIndex.FamilyOffset] - if tIndex.TagOffset >= len(tf.values) { - return nil - } - return tf.values[tIndex.TagOffset] -} diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 8707befd..176d6b77 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -317,20 +317,17 @@ func (s *store) run() { }() size := 0 batch := bluge.NewBatch() - flush := func(applied chan struct{}) { + flush := func() { if size < 1 { return } if err := s.writer.Batch(batch); err != nil { s.l.Error().Err(err).Msg("write to the inverted index") } - if applied != nil { - close(applied) - } batch.Reset() size = 0 } - defer flush(nil) + defer flush() var docIDBuffer bytes.Buffer for { timer := time.NewTimer(s.batchInterval) @@ -345,7 +342,7 @@ func (s *store) run() { } switch d := event.(type) { case flushEvent: - flush(nil) + flush() close(d.onComplete) case index.Document, index.Batch: var docs []index.Document @@ -394,11 +391,14 @@ func (s *store) run() { batch.Update(doc.ID(), doc) } if isBatch || size >= batchSize { - flush(applied) + flush() + if applied != nil { + close(applied) + } } } case <-timer.C: - flush(nil) + flush() } timer.Stop() } diff --git a/pkg/partition/index.go b/pkg/partition/index.go index f645178a..1df11225 100644 --- a/pkg/partition/index.go +++ b/pkg/partition/index.go @@ -19,27 +19,38 @@ package partition import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" - pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) -// IndexRuleLocator combines several TagLocators that help find the index value. +// IndexRuleLocator is a helper struct to locate the index rule by tag name. type IndexRuleLocator struct { - Rule *databasev1.IndexRule - TagIndices []TagLocator + EntitySet map[string]struct{} + TagFamilyTRule []map[string]*databasev1.IndexRule } // ParseIndexRuleLocators returns a IndexRuleLocator based on the tag family spec and index rules. -func ParseIndexRuleLocators(families []*databasev1.TagFamilySpec, indexRules []*databasev1.IndexRule) (locators []*IndexRuleLocator) { - for _, rule := range indexRules { - tagIndices := make([]TagLocator, 0, len(rule.GetTags())) - for _, tagInIndex := range rule.GetTags() { - fIndex, tIndex, tag := pbv1.FindTagByName(families, tagInIndex) - if tag != nil { - tagIndices = append(tagIndices, TagLocator{FamilyOffset: fIndex, TagOffset: tIndex}) +func ParseIndexRuleLocators(entity *databasev1.Entity, families []*databasev1.TagFamilySpec, indexRules []*databasev1.IndexRule) (locators IndexRuleLocator) { + locators.EntitySet = make(map[string]struct{}, len(entity.TagNames)) + for i := range entity.TagNames { + locators.EntitySet[entity.TagNames[i]] = struct{}{} + } + findIndexRuleByTagName := func(tagName string) *databasev1.IndexRule { + for i := range indexRules { + for j := range indexRules[i].Tags { + if indexRules[i].Tags[j] == tagName { + return indexRules[i] + } } } - if len(tagIndices) > 0 { - locators = append(locators, &IndexRuleLocator{Rule: rule, TagIndices: tagIndices}) + return nil + } + for i := range families { + ttr := make(map[string]*databasev1.IndexRule) + locators.TagFamilyTRule = append(locators.TagFamilyTRule, ttr) + for j := range families[i].Tags { + ir := findIndexRuleByTagName(families[i].Tags[j].Name) + if ir != nil { + ttr[families[i].Tags[j].Name] = ir + } } } return locators diff --git a/pkg/test/query/metric.go b/pkg/test/query/metric.go new file mode 100644 index 00000000..a890a118 --- /dev/null +++ b/pkg/test/query/metric.go @@ -0,0 +1,193 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package query + +import ( + "flag" + "fmt" + "os" + "path" + "sync" + "time" + + "github.com/apache/skywalking-cli/pkg/graphql/metrics" + "github.com/apache/skywalking-cli/pkg/graphql/utils" + "github.com/urfave/cli/v2" + api "skywalking.apache.org/repo/goapi/query" +) + +var ( + isNormal = true + serviceTpl = "g%d::service_0" +) + +// ServiceList verifies the service list. +func ServiceList(basePath string, timeout time.Duration, groupNum int, fs *flag.FlagSet) { + basePath = path.Join(basePath, "svc-list") + err := os.MkdirAll(basePath, 0o755) + if err != nil { + panic(err) + } + stopCh := make(chan struct{}) + go func() { + time.Sleep(timeout) + close(stopCh) + }() + metricNames := []string{"service_sla", "service_cpm", "service_resp_time", "service_apdex"} + size := len(metricNames) * groupNum + header := make([]string, size) + for k, mName := range metricNames { + for i := 0; i < groupNum; i++ { + offset := k*groupNum + i + header[offset] = fmt.Sprintf("%s-%d", mName, i) + } + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + collect(basePath, func() ([]float64, error) { + data := make([]float64, size) + var subWg sync.WaitGroup + for k, mName := range metricNames { + for i := 0; i < groupNum; i++ { + offset := k*groupNum + i + svc := fmt.Sprintf(serviceTpl, i) + subWg.Add(1) + go func(mName, svc string) { + defer subWg.Done() + d, err := execute(mName, svc, "", fs) + if err != nil { + fmt.Printf("query metric %s %s error: %v \n", mName, svc, err) + } + data[offset] = d.Seconds() + }(mName, svc) + } + } + subWg.Wait() + return data, nil + }, 500*time.Millisecond, stopCh) + analyze(header, basePath) + }() + wg.Wait() +} + +func execute(exp, svc, instance string, fs *flag.FlagSet) (time.Duration, error) { + ctx := cli.NewContext(cli.NewApp(), fs, nil) + entity := &api.Entity{ + ServiceName: &svc, + Normal: &isNormal, + ServiceInstanceName: &instance, + } + duration := api.Duration{ + Start: time.Now().Add(-30 * time.Minute).Format(utils.StepFormats[api.StepMinute]), + End: time.Now().Format(utils.StepFormats[api.StepMinute]), + Step: api.StepMinute, + } + start := time.Now() + result, err := metrics.Execute(ctx, exp, entity, duration) + elapsed := time.Since(start) + if err != nil { + return 0, err + } + if len(result.Results) < 1 { + return 0, fmt.Errorf("no result") + } + return elapsed, nil +} + +// TopN verifies the top N. +func TopN(basePath string, timeout time.Duration, groupNum int, fs *flag.FlagSet) { + basePath = path.Join(basePath, "topn") + err := os.MkdirAll(basePath, 0o755) + if err != nil { + panic(err) + } + stopCh := make(chan struct{}) + go func() { + time.Sleep(timeout) + close(stopCh) + }() + metricNames := []string{"service_instance_resp_time", "service_instance_cpm"} + size := len(metricNames) * groupNum + header := make([]string, size) + for k, mName := range metricNames { + for i := 0; i < groupNum; i++ { + offset := k*groupNum + i + header[offset] = fmt.Sprintf("%s-%d", mName, i) + } + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + collect(basePath, func() ([]float64, error) { + data := make([]float64, size) + var subWg sync.WaitGroup + for k, mName := range metricNames { + for i := 0; i < groupNum; i++ { + offset := k*groupNum + i + svc := fmt.Sprintf(serviceTpl, i) + subWg.Add(1) + go func(mName, svc string) { + defer subWg.Done() + d, err := sortMetrics(mName, svc, 5, api.OrderDes, fs) + if err != nil { + data[offset] = -1 + fmt.Printf("query metric %s %s error: %v \n", mName, svc, err) + } else { + data[offset] = d.Seconds() + } + }(mName, svc) + } + } + subWg.Wait() + return data, nil + }, 500*time.Millisecond, stopCh) + analyze(header, basePath) + }() + wg.Wait() +} + +func sortMetrics(name, svc string, limit int, order api.Order, fs *flag.FlagSet) (time.Duration, error) { + ctx := cli.NewContext(cli.NewApp(), fs, nil) + duration := api.Duration{ + Start: time.Now().Add(-30 * time.Minute).Format(utils.StepFormats[api.StepMinute]), + End: time.Now().Format(utils.StepFormats[api.StepMinute]), + Step: api.StepMinute, + } + scope := api.ScopeServiceInstance + cond := api.TopNCondition{ + Name: name, + ParentService: &svc, + Normal: &isNormal, + Scope: &scope, + TopN: limit, + Order: order, + } + start := time.Now() + result, err := metrics.SortMetrics(ctx, cond, duration) + elapsed := time.Since(start) + if err != nil { + return 0, err + } + if len(result) < 1 { + return 0, fmt.Errorf("no result") + } + return elapsed, nil +} diff --git a/pkg/test/query/trace.go b/pkg/test/query/trace.go index 18da1bf8..241acaa6 100644 --- a/pkg/test/query/trace.go +++ b/pkg/test/query/trace.go @@ -48,7 +48,7 @@ func TraceListOrderByDuration(basePath string, timeout time.Duration, fs *flag.F return nil, err } return []float64{d.Seconds()}, nil - }, 500*time.Millisecond, stopCh) + }, time.Second, stopCh) analyze([]string{"result"}, basePath) } @@ -70,7 +70,7 @@ func TraceListOrderByTime(basePath string, timeout time.Duration, fs *flag.FlagS return nil, err } return []float64{d.Seconds()}, nil - }, 500*time.Millisecond, stopCh) + }, time.Second, stopCh) analyze([]string{"result"}, basePath) } diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml index 63e962d5..0a0d345c 100644 --- a/test/docker/base-compose.yml +++ b/test/docker/base-compose.yml @@ -20,11 +20,11 @@ services: - 2121 - 6060 command: standalone - healthcheck: - test: ["CMD", "./bydbctl", "health", "--config=-", "--addr=http://banyandb:17913"] - interval: 5s - timeout: 10s - retries: 120 + # healthcheck: + # test: ["CMD", "./bydbctl", "health", "--config=-", "--addr=http://banyandb:17913"] + # interval: 30s + # timeout: 30s + # retries: 120 liaison: hostname: liaison @@ -35,8 +35,8 @@ services: command: liaison --etcd-endpoints=http://etcd:2379 healthcheck: test: ["CMD", "./bydbctl", "health", "--addr=http://liaison:17913"] - interval: 5s - timeout: 10s + interval: 30s + timeout: 30s retries: 120 data: @@ -48,8 +48,8 @@ services: command: data --etcd-endpoints=http://etcd:2379 healthcheck: test: ["CMD", "./bydbctl", "health", "--grpc-addr=data:17912"] - interval: 5s - timeout: 10s + interval: 30s + timeout: 30s retries: 120 agent: diff --git a/test/stress/trace/Makefile b/test/stress/trace/Makefile index 7c1d9ef4..102717dc 100644 --- a/test/stress/trace/Makefile +++ b/test/stress/trace/Makefile @@ -37,6 +37,10 @@ SIZE ?= 2 QPS ?= 10 +GROUP ?= "default" + +GINKGO_FLAGS ?= + .PHONY: clean clean: rm -rf /tmp/banyandb-stress-trace @@ -49,11 +53,11 @@ down-%: .PHONY: test_traffic test_traffic: - curl -XPOST 'http://localhost:12800/mock-data/segments/tasks?size=$(SIZE)' -H'Content-Type: application/json' -d "@segment.tpl.json" + curl -XPOST 'http://localhost:12800/mock-data/segments/tasks?size=$(SIZE)&group=$(GROUP)' -H'Content-Type: application/json' -d "@segment.tpl.json" .PHONY: up_traffic up_traffic: - curl -XPOST 'http://localhost:12800/mock-data/segments/tasks?qps=$(QPS)' -H'Content-Type: application/json' -d "@segment.tpl.json" + curl -XPOST 'http://localhost:12800/mock-data/segments/tasks?qps=$(QPS)&group=$(GROUP)' -H'Content-Type: application/json' -d "@segment.tpl.json" .PHONY: ls_traffic ls_traffic: @@ -65,5 +69,5 @@ rm_traffic: .PHONY: test_query test_query: $(GINKGO) - $(GINKGO) -v -timeout 1h TestQuery ./... + $(GINKGO) $(GINKGO_FLAGS) -p -v -timeout 2h TestQuery ./... \ No newline at end of file diff --git a/test/stress/trace/docker-compose-cluster.yaml b/test/stress/trace/docker-compose-cluster.yaml index 8ca7e293..a478a64e 100644 --- a/test/stress/trace/docker-compose-cluster.yaml +++ b/test/stress/trace/docker-compose-cluster.yaml @@ -34,7 +34,7 @@ services: resources: limits: cpus: "4" - memory: 4G + memory: 8G networks: - cluster-test diff --git a/test/stress/trace/docker-compose-single.yaml b/test/stress/trace/docker-compose-single.yaml index 05cfff66..dc868d70 100644 --- a/test/stress/trace/docker-compose-single.yaml +++ b/test/stress/trace/docker-compose-single.yaml @@ -37,11 +37,11 @@ services: - 17913:17913 - 6060:6060 - 2121:2121 - deploy: - resources: - limits: - cpus: "4" - memory: 4G + # deploy: + # resources: + # limits: + # cpus: "4" + # memory: 8G networks: - test - monitoring @@ -63,9 +63,9 @@ services: - ./log4j2.xml:/skywalking/config/log4j2.xml networks: - test - depends_on: - banyandb: - condition: service_healthy + # depends_on: + # banyandb: + # condition: service_healthy prometheus: image: prom/prometheus:latest diff --git a/test/stress/trace/trace_suite_test.go b/test/stress/trace/trace_suite_test.go index bdac956e..f54642fb 100644 --- a/test/stress/trace/trace_suite_test.go +++ b/test/stress/trace/trace_suite_test.go @@ -37,7 +37,7 @@ func TestIntegrationLoad(t *testing.T) { } var _ = Describe("Query", func() { - const timeout = 30 * time.Minute + const timeout = time.Hour var ( fs *flag.FlagSet basePath string @@ -45,14 +45,14 @@ var _ = Describe("Query", func() { BeforeEach(func() { // Check if the URL is reachable - resp, err := http.Get("http://localhost:12800/graphql") - if err != nil || resp.StatusCode != 200 { + _, err := http.Get("http://localhost:12800/graphql") + if err != nil { // If the request fails or the status code is not 200, skip the test Skip("http://localhost:12800/graphql is not reachable") } fs = flag.NewFlagSet("", flag.PanicOnError) fs.String("base-url", "http://localhost:12800/graphql", "") - fs.String("service-id", "c2VydmljZV8x.1", "") + fs.String("service-id", "ZzE6OnNlcnZpY2VfMQ==.1", "") _, basePath, _, _ = runtime.Caller(0) basePath = path.Dir(basePath) }) @@ -61,11 +61,19 @@ var _ = Describe("Query", func() { query.TraceListOrderByDuration(basePath, timeout, fs) }) - It("TraceListOrderByTime", func() { - query.TraceListOrderByTime(basePath, timeout, fs) - }) + // It("TraceListOrderByTime", func() { + // query.TraceListOrderByTime(basePath, timeout, fs) + // }) It("TraceByID", func() { query.TraceByID(basePath, timeout, fs) }) + + It("Metric", func() { + query.ServiceList(basePath, timeout, 6, fs) + }) + + It("TopN", func() { + query.TopN(basePath, timeout, 6, fs) + }) })
