This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch test/multi-segments
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit a2a8bde5ca4ea9e28f002c5b9de2e75469ea6b1d
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Tue Sep 23 08:40:06 2025 +0800

    Fix sorting issue of timestamps in measure model segments and enhance
    memory management in merging process. Update related tests for improved
    coverage and accuracy.
---
 CHANGES.md                                      |   1 +
 banyand/measure/flusher.go                      |  85 ++++++++++---
 banyand/measure/part.go                         |   1 +
 banyand/measure/query_test.go                   |   4 +-
 banyand/measure/syncer.go                       |  23 ----
 banyand/measure/tstable.go                      |   8 ++
 banyand/measure/write_liaison.go                |   5 +-
 banyand/measure/write_standalone.go             |   3 -
 banyand/stream/flusher.go                       |  85 ++++++++++---
 banyand/stream/part.go                          |   2 +
 banyand/stream/tstable.go                       |   8 ++
 banyand/stream/write_liaison.go                 |   2 +-
 banyand/trace/flusher.go                        | 138 +++++++++++++++------
 banyand/trace/part.go                           |   2 +
 banyand/trace/tstable.go                        |   8 ++
 banyand/trace/write_liaison.go                  |   2 +-
 test/cases/measure/data/input/linked_or.yaml    |   1 -
 test/cases/measure/measure.go                   |   5 +-
 .../multi_segments/multi_segments_suite_test.go |   9 +-
 .../multi_segments/multi_segments_suite_test.go |   2 +-
 20 files changed, 279 insertions(+), 115 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ab84e0d0..79e28c3a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -59,6 +59,7 @@ Release Notes.
 - Fix returning empty result when using IN operatior on the array type tags.
 - Fix memory leaks and OOM issues in streaming processing by implementing deduplication logic in priority queues and improving sliding window memory management.
 - Fix etcd prefix matching any key that starts with this prefix.
+- Fix the sorting timestamps issue of the measure model when there are more than one segment.
 
 ### Document
 
diff --git a/banyand/measure/flusher.go b/banyand/measure/flusher.go
index 7c2b0bba..c0390f50 100644
--- a/banyand/measure/flusher.go
+++ b/banyand/measure/flusher.go
@@ -99,33 +99,80 @@ func (tst *tsTable) pauseFlusherToPileupMemParts(epoch uint64, flushWatcher watc
 }
 
 func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh chan *mergerIntroduction) (bool, error) {
+    var merged bool
+    var currentSegmentID int64
     var memParts []*partWrapper
     mergedIDs := make(map[uint64]struct{})
+
+    // Helper function to merge current segment's parts
+    mergeCurrentSegment := func() (bool, error) {
+        if len(memParts) < 2 {
+            return false, nil
+        }
+
+        // Create a copy of mergedIDs for this merge operation
+        currentMergedIDs := make(map[uint64]struct{}, len(mergedIDs))
+        for id := range mergedIDs {
+            currentMergedIDs[id] = struct{}{}
+        }
+
+        // merge memory must not be closed by the tsTable.close
+        closeCh := make(chan struct{})
+        newPart, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
+            currentMergedIDs, mergeCh, closeCh, "mem")
+        close(closeCh)
+        if err != nil {
+            if errors.Is(err, errClosed) {
+                return true, nil
+            }
+            return false, err
+        }
+        return newPart != nil, nil
+    }
+
+    // Process parts grouped by segmentID
     for i := range snp.parts {
-        if snp.parts[i].mp != nil {
-            memParts = append(memParts, snp.parts[i])
-            mergedIDs[snp.parts[i].ID()] = struct{}{}
+        if snp.parts[i].mp == nil {
             continue
         }
-    }
-    if len(memParts) < 2 {
-        return false, nil
-    }
-    // merge memory must not be closed by the tsTable.close
-    closeCh := make(chan struct{})
-    newPart, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
-        mergedIDs, mergeCh, closeCh, "mem")
-    close(closeCh)
-    if err != nil {
-        if errors.Is(err, errClosed) {
-            return true, nil
+
+        segID := snp.parts[i].mp.segmentID
+
+        // If this is a new segment, merge the previous segment first
+        if currentSegmentID != 0 && currentSegmentID != segID {
+            m, err := mergeCurrentSegment()
+            if err != nil {
+                return false, err
+            }
+            if m {
+                merged = true
+            }
+
+            // Reset for next segment
+            memParts = memParts[:0]
+            for id := range mergedIDs {
+                delete(mergedIDs, id)
+            }
         }
-        return false, err
+
+        // Add part to current segment
+        currentSegmentID = segID
+        memParts = append(memParts, snp.parts[i])
+        mergedIDs[snp.parts[i].ID()] = struct{}{}
     }
-    if newPart == nil {
-        return false, nil
+
+    // Merge the last segment if it has parts
+    if len(memParts) >= 2 {
+        m, err := mergeCurrentSegment()
+        if err != nil {
+            return false, err
+        }
+        if m {
+            merged = true
+        }
     }
-    return true, nil
+
+    return merged, nil
 }
 
 func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction) {
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index d7d639ea..e47bfbc6 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -109,6 +109,7 @@ type memPart struct {
     timestamps   bytes.Buffer
     fieldValues  bytes.Buffer
     partMetadata partMetadata
+    segmentID    int64
 }
 
 func (mp *memPart) mustCreateMemTagFamilyWriters(name string) (fs.Writer, fs.Writer) {
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 85d1af5f..17ca9c37 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -1699,9 +1699,7 @@ func TestSegResultHeap_Sorting(t *testing.T) {
             }
 
             // Add all results to heap
-            for _, sr := range tt.segResults {
-                heap.results = append(heap.results, sr)
-            }
+            heap.results = append(heap.results, tt.segResults...)
 
             // Initialize heap
             require.Equal(t, len(tt.segResults), heap.Len())
diff --git a/banyand/measure/syncer.go b/banyand/measure/syncer.go
index 791a97e0..b3b7f3a4 100644
--- a/banyand/measure/syncer.go
+++ b/banyand/measure/syncer.go
@@ -20,7 +20,6 @@ package measure
 import (
     "context"
     "fmt"
-    "strings"
     "time"
 
     "github.com/apache/skywalking-banyandb/api/data"
@@ -231,28 +230,6 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
         // Create streaming reader for the part.
         files, release := createPartFileReaders(part)
         releaseFuncs = append(releaseFuncs, release)
-        builder := strings.Builder{}
-        for i := range part.primaryBlockMetadata {
-            offset := part.primaryBlockMetadata[i].offset
-            size := part.primaryBlockMetadata[i].size
-            buf := make([]byte, size)
-            part.primary.Read(int64(offset), buf)
-            uncompressedBuf, err := zstd.Decompress(nil, buf)
-            if err != nil {
-                return fmt.Errorf("cannot decompress block metadata: %w", err)
-            }
-            blockMetadata, err := unmarshalBlockMetadata(nil, uncompressedBuf)
-            if err != nil {
-                return fmt.Errorf("cannot unmarshal block metadata: %w", err)
-            }
-            for _, block := range blockMetadata {
-                builder.WriteString(fmt.Sprintf("%v", block.seriesID))
-                builder.WriteString(",")
-            }
-        }
-        timeStart := time.Unix(0, part.partMetadata.MinTimestamp)
-        timeEnd := time.Unix(0, part.partMetadata.MaxTimestamp)
-        fmt.Printf("snp %v primary block metadata: %v total count: %v time range: %v-%v group: %v shard: %v - %v\n", curSnapshot.epoch, builder.String(), part.partMetadata.TotalCount, timeStart, timeEnd, tst.group, tst.shardID, time.Now().Format(time.StampNano))
         // Create streaming part sync data.
         streamingParts = append(streamingParts, queue.StreamingPartData{
             ID: part.partMetadata.ID,
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 2f71eddd..5ddab3ea 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -106,6 +106,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
         fileSystem.MustRMAll(filepath.Join(rootPath, needToDelete[i]))
     }
     if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
+        for _, id := range loadedSnapshots {
+            fileSystem.MustRMAll(filepath.Join(rootPath, snapshotName(id)))
+        }
         return &tst, uint64(time.Now().UnixNano())
     }
     sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -279,12 +282,17 @@ func (tst *tsTable) Close() error {
 }
 
 func (tst *tsTable) mustAddDataPoints(dps *dataPoints) {
+    tst.mustAddDataPointsWithSegmentID(dps, 0)
+}
+
+func (tst *tsTable) mustAddDataPointsWithSegmentID(dps *dataPoints, segmentID int64) {
     if len(dps.seriesIDs) == 0 {
         return
     }
 
     mp := generateMemPart()
     mp.mustInitFromDataPoints(dps)
+    mp.segmentID = segmentID
     tst.mustAddMemPart(mp)
 }
 
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index 6bae9dc4..e64cad88 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -108,10 +108,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
         for j := range g.tables {
             es := g.tables[j]
             if es.tsTable != nil && es.dataPoints != nil {
-                for i := range es.dataPoints.timestamps {
-                    fmt.Printf("series id: %v timestamp: %v time range: %v\n", es.dataPoints.seriesIDs[i], es.dataPoints.timestamps[i], es.timeRange)
-                }
-                es.tsTable.mustAddDataPoints(es.dataPoints)
+                es.tsTable.mustAddDataPointsWithSegmentID(es.dataPoints, es.timeRange.Start.UnixNano())
                 releaseDataPoints(es.dataPoints)
             }
             nodes := g.queue.GetNodes(es.shardID)
diff --git a/banyand/measure/write_standalone.go b/banyand/measure/write_standalone.go
index 759af0e2..dc7fd8b1 100644
--- a/banyand/measure/write_standalone.go
+++ b/banyand/measure/write_standalone.go
@@ -80,9 +80,6 @@ func processDataPoint(dpt *dataPointsInTable, req *measurev1.WriteRequest, write
     if err := series.Marshal(); err != nil {
         return 0, fmt.Errorf("cannot marshal series: %w", err)
     }
-    if req.Metadata.Name == "service_cpm_minute" {
-        fmt.Printf("entity values: %v time range: %v series id: %v\n", writeEvent.EntityValues, dpt.timeRange, series.ID)
-    }
 
     if stm.schema.IndexMode {
         fields := handleIndexMode(stm.schema, req, is.indexRuleLocators)
diff --git a/banyand/stream/flusher.go b/banyand/stream/flusher.go
index d8d4cc2f..04788af0 100644
--- a/banyand/stream/flusher.go
+++ b/banyand/stream/flusher.go
@@ -129,33 +129,80 @@ func (tst *tsTable) pauseFlusherToPileupMemParts(epoch uint64, flushWatcher watc
 }
 
 func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh chan *mergerIntroduction) (bool, error) {
+    var merged bool
+    var currentSegmentID int64
     var memParts []*partWrapper
     mergedIDs := make(map[uint64]struct{})
+
+    // Helper function to merge current segment's parts
+    mergeCurrentSegment := func() (bool, error) {
+        if len(memParts) < 2 {
+            return false, nil
+        }
+
+        // Create a copy of mergedIDs for this merge operation
+        currentMergedIDs := make(map[uint64]struct{}, len(mergedIDs))
+        for id := range mergedIDs {
+            currentMergedIDs[id] = struct{}{}
+        }
+
+        // merge memory must not be closed by the tsTable.close
+        closeCh := make(chan struct{})
+        newPart, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
+            currentMergedIDs, mergeCh, closeCh, "mem")
+        close(closeCh)
+        if err != nil {
+            if errors.Is(err, errClosed) {
+                return true, nil
+            }
+            return false, err
+        }
+        return newPart != nil, nil
+    }
+
+    // Process parts grouped by segmentID
     for i := range snp.parts {
-        if snp.parts[i].mp != nil {
-            memParts = append(memParts, snp.parts[i])
-            mergedIDs[snp.parts[i].ID()] = struct{}{}
+        if snp.parts[i].mp == nil {
             continue
         }
-    }
-    if len(memParts) < 2 {
-        return false, nil
-    }
-    // merge memory must not be closed by the tsTable.close
-    closeCh := make(chan struct{})
-    newPart, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
-        mergedIDs, mergeCh, closeCh, "mem")
-    close(closeCh)
-    if err != nil {
-        if errors.Is(err, errClosed) {
-            return true, nil
+
+        segID := snp.parts[i].mp.segmentID
+
+        // If this is a new segment, merge the previous segment first
+        if currentSegmentID != 0 && currentSegmentID != segID {
+            m, err := mergeCurrentSegment()
+            if err != nil {
+                return false, err
+            }
+            if m {
+                merged = true
+            }
+
+            // Reset for next segment
+            memParts = memParts[:0]
+            for id := range mergedIDs {
+                delete(mergedIDs, id)
+            }
         }
-        return false, err
+
+        // Add part to current segment
+        currentSegmentID = segID
+        memParts = append(memParts, snp.parts[i])
+        mergedIDs[snp.parts[i].ID()] = struct{}{}
     }
-    if newPart == nil {
-        return false, nil
+
+    // Merge the last segment if it has parts
+    if len(memParts) >= 2 {
+        m, err := mergeCurrentSegment()
+        if err != nil {
+            return false, err
+        }
+        if m {
+            merged = true
+        }
     }
-    return true, nil
+
+    return merged, nil
 }
 
 func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction) {
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index 07e5a22f..39cb2145 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -112,6 +112,7 @@ type memPart struct {
     primary      bytes.Buffer
     timestamps   bytes.Buffer
     partMetadata partMetadata
+    segmentID    int64
 }
 
 func (mp *memPart) mustCreateMemTagFamilyWriters(name string) (fs.Writer, fs.Writer, fs.Writer) {
@@ -140,6 +141,7 @@ func (mp *memPart) reset() {
     mp.meta.Reset()
     mp.primary.Reset()
     mp.timestamps.Reset()
+    mp.segmentID = 0
     if mp.tagFamilies != nil {
         for k, tf := range mp.tagFamilies {
             tf.Reset()
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index bd521ce4..9384c4b4 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -261,6 +261,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
         fileSystem.MustRMAll(filepath.Join(rootPath, needToDelete[i]))
     }
     if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
+        for _, id := range loadedSnapshots {
+            fileSystem.MustRMAll(filepath.Join(rootPath, snapshotName(id)))
+        }
         return &tst, uint64(time.Now().UnixNano()), nil
     }
     sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -336,12 +339,17 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
 }
 
 func (tst *tsTable) mustAddElements(es *elements) {
+    tst.mustAddElementsWithSegmentID(es, 0)
+}
+
+func (tst *tsTable) mustAddElementsWithSegmentID(es *elements, segmentID int64) {
     if len(es.seriesIDs) == 0 {
         return
     }
 
     mp := generateMemPart()
     mp.mustInitFromElements(es)
+    mp.segmentID = segmentID
     tst.mustAddMemPart(mp)
 }
 
diff --git a/banyand/stream/write_liaison.go b/banyand/stream/write_liaison.go
index c78d1a37..be566f09 100644
--- a/banyand/stream/write_liaison.go
+++ b/banyand/stream/write_liaison.go
@@ -181,7 +181,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
         g := groups[i]
         for j := range g.tables {
             es := g.tables[j]
-            es.tsTable.mustAddElements(es.elements)
+            es.tsTable.mustAddElementsWithSegmentID(es.elements, es.timeRange.Start.UnixNano())
             releaseElements(es.elements)
             // Get nodes for this shard
             nodes := g.queue.GetNodes(es.shardID)
diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go
index 56f81c32..727cad6c 100644
--- a/banyand/trace/flusher.go
+++ b/banyand/trace/flusher.go
@@ -48,33 +48,20 @@ func (tst *tsTable) flusherLoop(flushCh chan *flusherIntroduction, mergeCh chan
                 tst.incTotalFlushLatency(time.Since(start).Seconds())
             }()
             curSnapshot := tst.currentSnapshot()
-            if curSnapshot != nil {
-                flusherWatchers = tst.pauseFlusherToPileupMemParts(epoch, flusherWatcher, flusherWatchers)
-                curSnapshot.decRef()
-                curSnapshot = nil
-            }
-            tst.RLock()
-            if tst.snapshot != nil && tst.snapshot.epoch > epoch {
-                curSnapshot = tst.snapshot
-                curSnapshot.incRef()
-            }
-            tst.RUnlock()
+            var ok bool
+            var merged bool
+            curSnapshot, flusherWatchers, ok, merged = tst.pauseFlusherToPileupMemPartsWithMerge(curSnapshot, flusherWatcher, flusherWatchers, epoch, mergeCh)
             if curSnapshot != nil {
                 defer curSnapshot.decRef()
-                merged, err := tst.mergeMemParts(curSnapshot, mergeCh)
-                if err != nil {
-                    tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d", curSnapshot.epoch)
-                    tst.incTotalFlushLoopErr(1)
+                if !ok {
                     return false
                 }
                 if !merged {
                     tst.flush(curSnapshot, flushCh)
                 }
-                epoch = curSnapshot.epoch
-                // Notify merger to start a new round of merge.
-                // This round might have be triggered in pauseFlusherToPileupMemParts.
                 flusherWatchers.Notify(math.MaxUint64)
                 flusherWatchers = nil
+                epoch = curSnapshot.epoch
                 if tst.currentEpoch() != epoch {
                     tst.incTotalFlushLoopProgress(1)
                     return false
@@ -132,6 +119,36 @@ func (tst *tsTable) flusherLoopNoMerger(flushCh chan *flusherIntroduction, intro
     }
 }
 
+func (tst *tsTable) pauseFlusherToPileupMemPartsWithMerge(
+    curSnapshot *snapshot, flusherWatcher watcher.Channel, flusherWatchers watcher.Epochs,
+    epoch uint64, mergeCh chan *mergerIntroduction,
+) (*snapshot, watcher.Epochs, bool, bool) {
+    if tst.option.flushTimeout < 1 {
+        return curSnapshot, flusherWatchers, true, false
+    }
+    if curSnapshot != nil {
+        flusherWatchers = tst.pauseFlusherToPileupMemParts(epoch, flusherWatcher, flusherWatchers)
+        curSnapshot.decRef()
+        curSnapshot = nil
+    }
+    tst.RLock()
+    if tst.snapshot != nil && tst.snapshot.epoch > epoch {
+        curSnapshot = tst.snapshot
+        curSnapshot.incRef()
+    }
+    tst.RUnlock()
+    if curSnapshot == nil {
+        return nil, flusherWatchers, false, false
+    }
+    merged, err := tst.mergeMemParts(curSnapshot, mergeCh)
+    if err != nil {
+        tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d", curSnapshot.epoch)
+        tst.incTotalFlushLoopErr(1)
+        return curSnapshot, flusherWatchers, false, false
+    }
+    return curSnapshot, flusherWatchers, true, merged
+}
+
 // pauseFlusherToPileupMemParts takes a pause to wait for in-memory parts to pile up.
 // If there is no in-memory part, we can skip the pause.
 // When a merging is finished, we can skip the pause.
@@ -155,33 +172,80 @@ func (tst *tsTable) pauseFlusherToPileupMemParts(epoch uint64, flushWatcher watc
 }
 
 func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh chan *mergerIntroduction) (bool, error) {
+    var merged bool
+    var currentSegmentID int64
     var memParts []*partWrapper
     mergedIDs := make(map[partHandle]struct{})
+
+    // Helper function to merge current segment's parts
+    mergeCurrentSegment := func() (bool, error) {
+        if len(memParts) < 2 {
+            return false, nil
+        }
+
+        // Create a copy of mergedIDs for this merge operation
+        currentMergedIDs := make(map[partHandle]struct{}, len(mergedIDs))
+        for id := range mergedIDs {
+            currentMergedIDs[id] = struct{}{}
+        }
+
+        // merge memory must not be closed by the tsTable.close
+        closeCh := make(chan struct{})
+        newPart, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
+            currentMergedIDs, mergeCh, closeCh, "mem")
+        close(closeCh)
+        if err != nil {
+            if errors.Is(err, errClosed) {
+                return true, nil
+            }
+            return false, err
+        }
+        return newPart != nil, nil
+    }
+
+    // Process parts grouped by segmentID
     for i := range snp.parts {
-        if snp.parts[i].mp != nil {
-            memParts = append(memParts, snp.parts[i])
-            mergedIDs[partHandle{partID: snp.parts[i].ID(), partType: PartTypeCore}] = struct{}{}
+        if snp.parts[i].mp == nil {
             continue
         }
-    }
-    if len(memParts) < 2 {
-        return false, nil
-    }
-    // merge memory must not be closed by the tsTable.close
-    closeCh := make(chan struct{})
-    newPart, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
-        mergedIDs, mergeCh, closeCh, "mem")
-    close(closeCh)
-    if err != nil {
-        if errors.Is(err, errClosed) {
-            return true, nil
+
+        segID := snp.parts[i].mp.segmentID
+
+        // If this is a new segment, merge the previous segment first
+        if currentSegmentID != 0 && currentSegmentID != segID {
+            m, err := mergeCurrentSegment()
+            if err != nil {
+                return false, err
+            }
+            if m {
+                merged = true
+            }
+
+            // Reset for next segment
+            memParts = memParts[:0]
+            for id := range mergedIDs {
+                delete(mergedIDs, id)
+            }
         }
-        return false, err
+
+        // Add part to current segment
+        currentSegmentID = segID
+        memParts = append(memParts, snp.parts[i])
+        mergedIDs[partHandle{partID: snp.parts[i].ID(), partType: PartTypeCore}] = struct{}{}
     }
-    if newPart == nil {
-        return false, nil
+
+    // Merge the last segment if it has parts
+    if len(memParts) >= 2 {
+        m, err := mergeCurrentSegment()
+        if err != nil {
+            return false, err
+        }
+        if m {
+            merged = true
+        }
     }
-    return true, nil
+
+    return merged, nil
 }
 
 func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction) {
diff --git a/banyand/trace/part.go b/banyand/trace/part.go
index d8ba721d..b76caab9 100644
--- a/banyand/trace/part.go
+++ b/banyand/trace/part.go
@@ -113,6 +113,7 @@ type memPart struct {
     meta         bytes.Buffer
     primary      bytes.Buffer
     partMetadata partMetadata
+    segmentID    int64
 }
 
 func (mp *memPart) mustCreateMemTagWriters(name string) (fs.Writer, fs.Writer) {
@@ -143,6 +144,7 @@ func (mp *memPart) reset() {
     mp.meta.Reset()
     mp.primary.Reset()
     mp.spans.Reset()
+    mp.segmentID = 0
     if mp.tags != nil {
         for k, t := range mp.tags {
             t.Reset()
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 8e3be10f..645dde18 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -248,6 +248,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
         fileSystem.MustRMAll(filepath.Join(rootPath, needToDelete[i]))
     }
     if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
+        for _, id := range loadedSnapshots {
+            fileSystem.MustRMAll(filepath.Join(rootPath, snapshotName(id)))
+        }
         return &tst, uint64(time.Now().UnixNano())
     }
     sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -412,12 +415,17 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
 }
 
 func (tst *tsTable) mustAddTraces(ts *traces) {
+    tst.mustAddTracesWithSegmentID(ts, 0)
+}
+
+func (tst *tsTable) mustAddTracesWithSegmentID(ts *traces, segmentID int64) {
     if len(ts.traceIDs) == 0 {
         return
     }
 
     mp := generateMemPart()
     mp.mustInitFromTraces(ts)
+    mp.segmentID = segmentID
     tst.mustAddMemPart(mp)
 }
 
diff --git a/banyand/trace/write_liaison.go b/banyand/trace/write_liaison.go
index 58a37f60..094f1fb7 100644
--- a/banyand/trace/write_liaison.go
+++ b/banyand/trace/write_liaison.go
@@ -191,7 +191,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
         g := groups[i]
         for j := range g.tables {
             es := g.tables[j]
-            es.tsTable.mustAddTraces(es.traces)
+            es.tsTable.mustAddTracesWithSegmentID(es.traces, es.timeRange.Start.UnixNano())
             releaseTraces(es.traces)
 
             for sidxName, sidxReqs := range es.sidxReqsMap {
diff --git a/test/cases/measure/data/input/linked_or.yaml b/test/cases/measure/data/input/linked_or.yaml
index 898a02a2..d2662a84 100644
--- a/test/cases/measure/data/input/linked_or.yaml
+++ b/test/cases/measure/data/input/linked_or.yaml
@@ -16,7 +16,6 @@
 # under the License.
 
 name: "service_cpm_minute"
-trace: true
 groups: ["sw_metric"]
 tagProjection:
   tagFamilies:
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index 12a693ca..274dd3cb 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -67,7 +67,7 @@ var _ = g.DescribeTable("Scanning Measures", verify,
     g.Entry("filter by entity id and service id", helpers.Args{Input: "entity_service", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
     g.Entry("without field", helpers.Args{Input: "no_field", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
     g.Entry("invalid logical expression", helpers.Args{Input: "err_invalid_le", Duration: 25 * time.Minute, Offset: -20 * time.Minute, WantErr: true}),
-    g.FEntry("linked or expressions", helpers.Args{Input: "linked_or", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+    g.Entry("linked or expressions", helpers.Args{Input: "linked_or", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
     g.Entry("In and not In expressions", helpers.Args{Input: "in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
     g.Entry("float64 value", helpers.Args{Input: "float", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
     g.Entry("float64 aggregation:min", helpers.Args{Input: "float_agg_min", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
@@ -77,7 +77,8 @@ var _ = g.DescribeTable("Scanning Measures", verify,
     g.Entry("all of index mode", helpers.Args{Input: "index_mode_all", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
     g.Entry("all of index mode in a larger time range",
         helpers.Args{Input: "index_mode_all", Want: "index_mode_all_xl", Duration: 96 * time.Hour, Offset: -72 * time.Hour, DisOrder: true}),
-    g.Entry("all in all segments of index mode", helpers.Args{Input: "index_mode_all", Want: "index_mode_all_segs", Duration: 96 * time.Hour, Offset: -72 * time.Hour, DisOrder: true}),
+    g.Entry("all in all segments of index mode",
+        helpers.Args{Input: "index_mode_all", Want: "index_mode_all_segs", Duration: 96 * time.Hour, Offset: -72 * time.Hour, DisOrder: true}),
g.Entry("order by desc of index mode", helpers.Args{Input: "index_mode_order_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("range of index mode", helpers.Args{Input: "index_mode_range", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}), g.Entry("none of index mode", helpers.Args{Input: "index_mode_none", WantEmpty: true, Duration: 25 * time.Minute, Offset: -20 * time.Minute}), diff --git a/test/integration/distributed/multi_segments/multi_segments_suite_test.go b/test/integration/distributed/multi_segments/multi_segments_suite_test.go index bb1af63d..5a8fa9fe 100644 --- a/test/integration/distributed/multi_segments/multi_segments_suite_test.go +++ b/test/integration/distributed/multi_segments/multi_segments_suite_test.go @@ -42,11 +42,13 @@ import ( test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure" "github.com/apache/skywalking-banyandb/pkg/test/setup" test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream" + test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace" "github.com/apache/skywalking-banyandb/pkg/timestamp" test_cases "github.com/apache/skywalking-banyandb/test/cases" casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" casesstream "github.com/apache/skywalking-banyandb/test/cases/stream" casestopn "github.com/apache/skywalking-banyandb/test/cases/topn" + casestrace "github.com/apache/skywalking-banyandb/test/cases/trace" ) func TestDistributedMultiSegments(t *testing.T) { @@ -95,6 +97,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { ctx := context.Background() test_stream.PreloadSchema(ctx, schemaRegistry) test_measure.PreloadSchema(ctx, schemaRegistry) + test_trace.PreloadSchema(ctx, schemaRegistry) By("Starting data node 0") closeDataNode0 := setup.DataNode(ep) @@ -106,7 +109,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { By("Initializing test cases") ns := timestamp.NowMilli().UnixNano() now = time.Unix(0, ns-ns%int64(time.Minute)) - baseTime = time.Date(now.Year(), now.Month(), now.Day(), 00, 02, 0, 0, now.Location()) + baseTime = time.Date(now.Year(), now.Month(), now.Day(), 0o0, 0o2, 0, 0, now.Location()) test_cases.Initialize(liaisonAddr, baseTime) deferFunc = func() { @@ -134,6 +137,10 @@ var _ = SynchronizedBeforeSuite(func() []byte { Connection: connection, BaseTime: baseTime, } + casestrace.SharedContext = helpers.SharedContext{ + Connection: connection, + BaseTime: baseTime, + } Expect(err).NotTo(HaveOccurred()) }) diff --git a/test/integration/standalone/multi_segments/multi_segments_suite_test.go b/test/integration/standalone/multi_segments/multi_segments_suite_test.go index b3ea9011..5c8e1c61 100644 --- a/test/integration/standalone/multi_segments/multi_segments_suite_test.go +++ b/test/integration/standalone/multi_segments/multi_segments_suite_test.go @@ -66,7 +66,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { addr, _, deferFunc = setup.Standalone() ns := timestamp.NowMilli().UnixNano() now = time.Unix(0, ns-ns%int64(time.Minute)) - baseTime = time.Date(now.Year(), now.Month(), now.Day(), 00, 02, 0, 0, now.Location()) + baseTime = time.Date(now.Year(), now.Month(), now.Day(), 0o0, 0o2, 0, 0, now.Location()) test_cases.Initialize(addr, baseTime) return []byte(addr) }, func(address []byte) {