This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new c43573c5 Fix the OOM when Merging Trace (#787) c43573c5 is described below commit c43573c59155bf1d2d4982135526d36a29b99b33 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Wed Sep 24 22:11:25 2025 +0800 Fix the OOM when Merging Trace (#787) --- banyand/trace/block_metadata.go | 28 +++++++--------------- banyand/trace/block_metadata_test.go | 12 +++++----- banyand/trace/block_writer.go | 6 ++--- banyand/trace/merger.go | 17 ++++--------- banyand/trace/part.go | 5 ---- banyand/trace/part_iter.go | 5 ++-- banyand/trace/primary_metadata.go | 28 ++++++++-------------- banyand/trace/syncer.go | 14 +++++++++++ .../distributed/query/query_suite_test.go | 5 ---- 9 files changed, 48 insertions(+), 72 deletions(-) diff --git a/banyand/trace/block_metadata.go b/banyand/trace/block_metadata.go index 02402671..605462e5 100644 --- a/banyand/trace/block_metadata.go +++ b/banyand/trace/block_metadata.go @@ -18,14 +18,11 @@ package trace import ( - "bytes" "fmt" "sort" - "strings" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" - "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/model" @@ -136,12 +133,8 @@ func (bm *blockMetadata) reset() { } } -func (bm *blockMetadata) marshal(dst []byte, traceIDLen uint32) []byte { - dst = append(dst, bm.traceID...) - paddingLen := traceIDLen - uint32(len(bm.traceID)) - if paddingLen > 0 { - dst = append(dst, bytes.Repeat([]byte{0}, int(paddingLen))...) - } +func (bm *blockMetadata) marshal(dst []byte) []byte { + dst = encoding.EncodeBytes(dst, convert.StringToBytes(bm.traceID)) dst = encoding.VarUint64ToBytes(dst, bm.uncompressedSpanSizeBytes) dst = encoding.VarUint64ToBytes(dst, bm.count) dst = bm.spans.marshal(dst) @@ -160,16 +153,13 @@ func (bm *blockMetadata) marshal(dst []byte, traceIDLen uint32) []byte { return dst } -func (bm *blockMetadata) unmarshal(src []byte, tagType map[string]pbv1.ValueType, traceIDLen int) ([]byte, error) { - if len(src) < traceIDLen { - return nil, fmt.Errorf("cannot unmarshal blockMetadata from less than %d bytes", traceIDLen) - } - bm.traceID = strings.TrimRight(string(src[:traceIDLen]), "\x00") - if len(tagType) == 0 { - logger.GetLogger().Error().Msg("tagType is empty") +func (bm *blockMetadata) unmarshal(src []byte, tagType map[string]pbv1.ValueType) ([]byte, error) { + src, traceIDBytes, err := encoding.DecodeBytes(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal traceID: %w", err) } + bm.traceID = string(traceIDBytes) bm.tagType = tagType - src = src[traceIDLen:] src, n := encoding.BytesToVarUint64(src) bm.uncompressedSpanSizeBytes = n src, n = encoding.BytesToVarUint64(src) @@ -259,7 +249,7 @@ func (tm *timestampsMetadata) copyFrom(src *timestampsMetadata) { tm.max = src.max } -func unmarshalBlockMetadata(dst []blockMetadata, src []byte, tagType map[string]pbv1.ValueType, traceIDLen int) ([]blockMetadata, error) { +func unmarshalBlockMetadata(dst []blockMetadata, src []byte, tagType map[string]pbv1.ValueType) ([]blockMetadata, error) { dstOrig := dst var pre *blockMetadata for len(src) > 0 { @@ -269,7 +259,7 @@ func unmarshalBlockMetadata(dst []blockMetadata, src []byte, tagType map[string] dst = append(dst, blockMetadata{}) } bm := &dst[len(dst)-1] - tail, err := bm.unmarshal(src, tagType, traceIDLen) + tail, err := bm.unmarshal(src, tagType) if err != nil { return dstOrig, fmt.Errorf("cannot unmarshal blockMetadata entries: %w", err) } diff --git a/banyand/trace/block_metadata_test.go b/banyand/trace/block_metadata_test.go index 3084edf1..b6f60866 100644 --- a/banyand/trace/block_metadata_test.go +++ b/banyand/trace/block_metadata_test.go @@ -168,13 +168,13 @@ func Test_blockMetadata_marshal_unmarshal(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - marshaled := tc.original.marshal(nil, 6) + marshaled := tc.original.marshal(nil) unmarshaled := blockMetadata{ tags: make(map[string]*dataBlock), } - _, err := unmarshaled.unmarshal(marshaled, nil, 6) + _, err := unmarshaled.unmarshal(marshaled, nil) require.NoError(t, err) assert.Equal(t, tc.original.traceID, unmarshaled.traceID) @@ -242,11 +242,11 @@ func Test_unmarshalBlockMetadata(t *testing.T) { var marshaled []byte for _, bm := range original { - marshaled = bm.marshal(marshaled, 6) + marshaled = bm.marshal(marshaled) } tagType := make(map[string]pbv1.ValueType) - unmarshaled, err := unmarshalBlockMetadata(nil, marshaled, tagType, 6) + unmarshaled, err := unmarshalBlockMetadata(nil, marshaled, tagType) require.NoError(t, err) require.Equal(t, wanted, unmarshaled) }) @@ -277,11 +277,11 @@ func Test_unmarshalBlockMetadata(t *testing.T) { var marshaled []byte for _, bm := range original { - marshaled = bm.marshal(marshaled, 6) + marshaled = bm.marshal(marshaled) } tagType := make(map[string]pbv1.ValueType) - _, err := unmarshalBlockMetadata(nil, marshaled, tagType, 6) + _, err := unmarshalBlockMetadata(nil, marshaled, tagType) require.Error(t, err) }) } diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go index 6d896bfd..88b1c858 100644 --- a/banyand/trace/block_writer.go +++ b/banyand/trace/block_writer.go @@ -148,12 +148,10 @@ type blockWriter struct { totalMinTimestamp int64 totalMaxTimestamp int64 minTimestampLast int64 - traceIDLen uint32 } func (bw *blockWriter) reset() { bw.writers.reset() - bw.traceIDLen = 0 bw.traceIDs = bw.traceIDs[:0] if bw.tagType == nil { bw.tagType = make(tagType) @@ -249,7 +247,7 @@ func (bw *blockWriter) mustWriteBlock(tid string, b *block) { bw.totalCount += bm.count bw.totalBlocksCount++ - bw.primaryBlockData = bm.marshal(bw.primaryBlockData, bw.traceIDLen) + bw.primaryBlockData = bm.marshal(bw.primaryBlockData) releaseBlockMetadata(bm) if len(bw.primaryBlockData) > maxUncompressedPrimaryBlockSize { bw.mustFlushPrimaryBlock(bw.primaryBlockData) @@ -259,7 +257,7 @@ func (bw *blockWriter) mustWriteBlock(tid string, b *block) { func (bw *blockWriter) mustFlushPrimaryBlock(data []byte) { if len(data) > 0 { - bw.primaryBlockMetadata.mustWriteBlock(data, bw.traceIDLen, bw.traceIDs[0], &bw.writers) + bw.primaryBlockMetadata.mustWriteBlock(data, bw.traceIDs[0], &bw.writers) bw.metaData = bw.primaryBlockMetadata.marshal(bw.metaData) } bw.minTimestamp = 0 diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go index d96fad6f..a19400b8 100644 --- a/banyand/trace/merger.go +++ b/banyand/trace/merger.go @@ -259,13 +259,6 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{} br.init(pii) bw := generateBlockWriter() bw.mustInitForFilePart(fileSystem, dstPath, shouldCache) - for _, pw := range parts { - for _, pbm := range pw.p.primaryBlockMetadata { - if len(pbm.traceID) > int(bw.traceIDLen) { - bw.traceIDLen = uint32(len(pbm.traceID)) - } - } - } var minTimestamp, maxTimestamp int64 for i, pw := range parts { @@ -284,16 +277,16 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{} } pm, tf, tt, err := mergeBlocks(closeCh, bw, br) - if err != nil { - return nil, err - } - pm.MinTimestamp = minTimestamp - pm.MaxTimestamp = maxTimestamp releaseBlockWriter(bw) releaseBlockReader(br) for i := range pii { releasePartMergeIter(pii[i]) } + if err != nil { + return nil, err + } + pm.MinTimestamp = minTimestamp + pm.MaxTimestamp = maxTimestamp pm.mustWriteMetadata(fileSystem, dstPath) tf.mustWriteTraceIDFilter(fileSystem, dstPath) tt.mustWriteTagType(fileSystem, dstPath) diff --git a/banyand/trace/part.go b/banyand/trace/part.go index b76caab9..213a15ff 100644 --- a/banyand/trace/part.go +++ b/banyand/trace/part.go @@ -446,11 +446,6 @@ func (mp *memPart) mustInitFromTraces(ts *traces) { bsw := generateBlockWriter() bsw.MustInitForMemPart(mp) - for _, tid := range ts.traceIDs { - if len(tid) > int(bsw.traceIDLen) { - bsw.traceIDLen = uint32(len(tid)) - } - } var tidPrev string uncompressedSpansSizeBytes := uint64(0) diff --git a/banyand/trace/part_iter.go b/banyand/trace/part_iter.go index f6f069a1..9c584b0a 100644 --- a/banyand/trace/part_iter.go +++ b/banyand/trace/part_iter.go @@ -214,7 +214,7 @@ func (pi *partIter) readPrimaryBlock(bms []blockMetadata, mr *primaryBlockMetada if err != nil { return nil, fmt.Errorf("cannot decompress index block: %w", err) } - bms, err = unmarshalBlockMetadata(bms, pi.primaryBuf, pi.p.tagType, int(mr.traceIDLen)) + bms, err = unmarshalBlockMetadata(bms, pi.primaryBuf, pi.p.tagType) if err != nil { return nil, fmt.Errorf("cannot unmarshal index block: %w", err) } @@ -326,8 +326,7 @@ func (pmi *partMergeIter) loadPrimaryBuf() error { func (pmi *partMergeIter) loadBlockMetadata() error { pmi.block.reset() var err error - traceIDLen := pmi.primaryBlockMetadata[pmi.primaryMetadataIdx-1].traceIDLen - pmi.primaryBuf, err = pmi.block.bm.unmarshal(pmi.primaryBuf, pmi.tagType, int(traceIDLen)) + pmi.primaryBuf, err = pmi.block.bm.unmarshal(pmi.primaryBuf, pmi.tagType) if err != nil { pm := pmi.primaryBlockMetadata[pmi.primaryMetadataIdx-1] return fmt.Errorf("can't read block metadata from primary at %d: %w", pm.offset, err) diff --git a/banyand/trace/primary_metadata.go b/banyand/trace/primary_metadata.go index 5e7b16a5..304ae833 100644 --- a/banyand/trace/primary_metadata.go +++ b/banyand/trace/primary_metadata.go @@ -18,12 +18,11 @@ package trace import ( - "bytes" "fmt" "io" - "strings" "github.com/apache/skywalking-banyandb/pkg/compress/zstd" + "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -32,19 +31,16 @@ import ( type primaryBlockMetadata struct { traceID string dataBlock - traceIDLen uint32 } // reset resets pbm for subsequent re-use. func (pbm *primaryBlockMetadata) reset() { - pbm.traceIDLen = 0 pbm.traceID = "" pbm.offset = 0 pbm.size = 0 } -func (pbm *primaryBlockMetadata) mustWriteBlock(data []byte, traceIDLen uint32, traceID string, sw *writers) { - pbm.traceIDLen = traceIDLen +func (pbm *primaryBlockMetadata) mustWriteBlock(data []byte, traceID string, sw *writers) { pbm.traceID = traceID bb := bigValuePool.Generate() @@ -56,25 +52,21 @@ func (pbm *primaryBlockMetadata) mustWriteBlock(data []byte, traceIDLen uint32, } func (pbm *primaryBlockMetadata) marshal(dst []byte) []byte { - dst = encoding.Uint32ToBytes(dst, pbm.traceIDLen) - dst = append(dst, pbm.traceID...) - paddingLen := pbm.traceIDLen - uint32(len(pbm.traceID)) - if paddingLen > 0 { - dst = append(dst, bytes.Repeat([]byte{0}, int(paddingLen))...) - } + dst = encoding.EncodeBytes(dst, convert.StringToBytes(pbm.traceID)) dst = encoding.Uint64ToBytes(dst, pbm.offset) dst = encoding.Uint64ToBytes(dst, pbm.size) return dst } func (pbm *primaryBlockMetadata) unmarshal(src []byte) ([]byte, error) { - pbm.traceIDLen = encoding.BytesToUint32(src) - src = src[4:] - if len(src) < int(16+pbm.traceIDLen) { - return nil, fmt.Errorf("cannot unmarshal primaryBlockMetadata from %d bytes; expect at least %d bytes", len(src), 32+pbm.traceIDLen) + if len(src) < 4 { + return nil, fmt.Errorf("cannot unmarshal primaryBlockMetadata from %d bytes; expect at least 4 bytes for traceID length", len(src)) + } + src, traceIDBytes, err := encoding.DecodeBytes(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal traceID: %w", err) } - pbm.traceID = strings.TrimRight(string(src[:pbm.traceIDLen]), "\x00") - src = src[pbm.traceIDLen:] + pbm.traceID = string(traceIDBytes) pbm.offset = encoding.BytesToUint64(src) src = src[8:] pbm.size = encoding.BytesToUint64(src) diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go index a1ef98e0..ba95c470 100644 --- a/banyand/trace/syncer.go +++ b/banyand/trace/syncer.go @@ -189,6 +189,20 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu if err != nil { return err } + if len(partsToSync) == 0 && len(sidxPartsToSync) == 0 { + return nil + } + hasSidxParts := false + for _, sidxParts := range sidxPartsToSync { + if len(sidxParts) == 0 { + continue + } + hasSidxParts = true + break + } + if len(partsToSync) == 0 && !hasSidxParts { + return nil + } // Validate sync preconditions if err := tst.validateSyncPreconditions(partsToSync, sidxPartsToSync); err != nil { diff --git a/test/integration/distributed/query/query_suite_test.go b/test/integration/distributed/query/query_suite_test.go index 297445ba..1f67007d 100644 --- a/test/integration/distributed/query/query_suite_test.go +++ b/test/integration/distributed/query/query_suite_test.go @@ -49,7 +49,6 @@ import ( casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" casesstream "github.com/apache/skywalking-banyandb/test/cases/stream" casestopn "github.com/apache/skywalking-banyandb/test/cases/topn" - casestrace "github.com/apache/skywalking-banyandb/test/cases/trace" ) func TestQuery(t *testing.T) { @@ -131,10 +130,6 @@ var _ = SynchronizedBeforeSuite(func() []byte { Connection: connection, BaseTime: now, } - casestrace.SharedContext = helpers.SharedContext{ - Connection: connection, - BaseTime: now, - } Expect(err).NotTo(HaveOccurred()) })