This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new c43573c5 Fix the OOM when Merging Trace (#787)
c43573c5 is described below

commit c43573c59155bf1d2d4982135526d36a29b99b33
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Sep 24 22:11:25 2025 +0800

    Fix the OOM when Merging Trace (#787)
---
 banyand/trace/block_metadata.go                    | 28 +++++++---------------
 banyand/trace/block_metadata_test.go               | 12 +++++-----
 banyand/trace/block_writer.go                      |  6 ++---
 banyand/trace/merger.go                            | 17 ++++---------
 banyand/trace/part.go                              |  5 ----
 banyand/trace/part_iter.go                         |  5 ++--
 banyand/trace/primary_metadata.go                  | 28 ++++++++--------------
 banyand/trace/syncer.go                            | 14 +++++++++++
 .../distributed/query/query_suite_test.go          |  5 ----
 9 files changed, 48 insertions(+), 72 deletions(-)

diff --git a/banyand/trace/block_metadata.go b/banyand/trace/block_metadata.go
index 02402671..605462e5 100644
--- a/banyand/trace/block_metadata.go
+++ b/banyand/trace/block_metadata.go
@@ -18,14 +18,11 @@
 package trace
 
 import (
-       "bytes"
        "fmt"
        "sort"
-       "strings"
 
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -136,12 +133,8 @@ func (bm *blockMetadata) reset() {
        }
 }
 
-func (bm *blockMetadata) marshal(dst []byte, traceIDLen uint32) []byte {
-       dst = append(dst, bm.traceID...)
-       paddingLen := traceIDLen - uint32(len(bm.traceID))
-       if paddingLen > 0 {
-               dst = append(dst, bytes.Repeat([]byte{0}, int(paddingLen))...)
-       }
+func (bm *blockMetadata) marshal(dst []byte) []byte {
+       dst = encoding.EncodeBytes(dst, convert.StringToBytes(bm.traceID))
        dst = encoding.VarUint64ToBytes(dst, bm.uncompressedSpanSizeBytes)
        dst = encoding.VarUint64ToBytes(dst, bm.count)
        dst = bm.spans.marshal(dst)
@@ -160,16 +153,13 @@ func (bm *blockMetadata) marshal(dst []byte, traceIDLen uint32) []byte {
        return dst
 }
 
-func (bm *blockMetadata) unmarshal(src []byte, tagType 
map[string]pbv1.ValueType, traceIDLen int) ([]byte, error) {
-       if len(src) < traceIDLen {
-               return nil, fmt.Errorf("cannot unmarshal blockMetadata from 
less than %d bytes", traceIDLen)
-       }
-       bm.traceID = strings.TrimRight(string(src[:traceIDLen]), "\x00")
-       if len(tagType) == 0 {
-               logger.GetLogger().Error().Msg("tagType is empty")
+func (bm *blockMetadata) unmarshal(src []byte, tagType 
map[string]pbv1.ValueType) ([]byte, error) {
+       src, traceIDBytes, err := encoding.DecodeBytes(src)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal traceID: %w", err)
        }
+       bm.traceID = string(traceIDBytes)
        bm.tagType = tagType
-       src = src[traceIDLen:]
        src, n := encoding.BytesToVarUint64(src)
        bm.uncompressedSpanSizeBytes = n
        src, n = encoding.BytesToVarUint64(src)
@@ -259,7 +249,7 @@ func (tm *timestampsMetadata) copyFrom(src *timestampsMetadata) {
        tm.max = src.max
 }
 
-func unmarshalBlockMetadata(dst []blockMetadata, src []byte, tagType 
map[string]pbv1.ValueType, traceIDLen int) ([]blockMetadata, error) {
+func unmarshalBlockMetadata(dst []blockMetadata, src []byte, tagType 
map[string]pbv1.ValueType) ([]blockMetadata, error) {
        dstOrig := dst
        var pre *blockMetadata
        for len(src) > 0 {
@@ -269,7 +259,7 @@ func unmarshalBlockMetadata(dst []blockMetadata, src []byte, tagType map[string]
                        dst = append(dst, blockMetadata{})
                }
                bm := &dst[len(dst)-1]
-               tail, err := bm.unmarshal(src, tagType, traceIDLen)
+               tail, err := bm.unmarshal(src, tagType)
                if err != nil {
                        return dstOrig, fmt.Errorf("cannot unmarshal 
blockMetadata entries: %w", err)
                }
diff --git a/banyand/trace/block_metadata_test.go b/banyand/trace/block_metadata_test.go
index 3084edf1..b6f60866 100644
--- a/banyand/trace/block_metadata_test.go
+++ b/banyand/trace/block_metadata_test.go
@@ -168,13 +168,13 @@ func Test_blockMetadata_marshal_unmarshal(t *testing.T) {
 
        for _, tc := range testCases {
                t.Run(tc.name, func(t *testing.T) {
-                       marshaled := tc.original.marshal(nil, 6)
+                       marshaled := tc.original.marshal(nil)
 
                        unmarshaled := blockMetadata{
                                tags: make(map[string]*dataBlock),
                        }
 
-                       _, err := unmarshaled.unmarshal(marshaled, nil, 6)
+                       _, err := unmarshaled.unmarshal(marshaled, nil)
                        require.NoError(t, err)
 
                        assert.Equal(t, tc.original.traceID, 
unmarshaled.traceID)
@@ -242,11 +242,11 @@ func Test_unmarshalBlockMetadata(t *testing.T) {
 
                var marshaled []byte
                for _, bm := range original {
-                       marshaled = bm.marshal(marshaled, 6)
+                       marshaled = bm.marshal(marshaled)
                }
 
                tagType := make(map[string]pbv1.ValueType)
-               unmarshaled, err := unmarshalBlockMetadata(nil, marshaled, 
tagType, 6)
+               unmarshaled, err := unmarshalBlockMetadata(nil, marshaled, 
tagType)
                require.NoError(t, err)
                require.Equal(t, wanted, unmarshaled)
        })
@@ -277,11 +277,11 @@ func Test_unmarshalBlockMetadata(t *testing.T) {
 
                var marshaled []byte
                for _, bm := range original {
-                       marshaled = bm.marshal(marshaled, 6)
+                       marshaled = bm.marshal(marshaled)
                }
 
                tagType := make(map[string]pbv1.ValueType)
-               _, err := unmarshalBlockMetadata(nil, marshaled, tagType, 6)
+               _, err := unmarshalBlockMetadata(nil, marshaled, tagType)
                require.Error(t, err)
        })
 }
diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go
index 6d896bfd..88b1c858 100644
--- a/banyand/trace/block_writer.go
+++ b/banyand/trace/block_writer.go
@@ -148,12 +148,10 @@ type blockWriter struct {
        totalMinTimestamp              int64
        totalMaxTimestamp              int64
        minTimestampLast               int64
-       traceIDLen                     uint32
 }
 
 func (bw *blockWriter) reset() {
        bw.writers.reset()
-       bw.traceIDLen = 0
        bw.traceIDs = bw.traceIDs[:0]
        if bw.tagType == nil {
                bw.tagType = make(tagType)
@@ -249,7 +247,7 @@ func (bw *blockWriter) mustWriteBlock(tid string, b *block) {
        bw.totalCount += bm.count
        bw.totalBlocksCount++
 
-       bw.primaryBlockData = bm.marshal(bw.primaryBlockData, bw.traceIDLen)
+       bw.primaryBlockData = bm.marshal(bw.primaryBlockData)
        releaseBlockMetadata(bm)
        if len(bw.primaryBlockData) > maxUncompressedPrimaryBlockSize {
                bw.mustFlushPrimaryBlock(bw.primaryBlockData)
@@ -259,7 +257,7 @@ func (bw *blockWriter) mustWriteBlock(tid string, b *block) {
 
 func (bw *blockWriter) mustFlushPrimaryBlock(data []byte) {
        if len(data) > 0 {
-               bw.primaryBlockMetadata.mustWriteBlock(data, bw.traceIDLen, 
bw.traceIDs[0], &bw.writers)
+               bw.primaryBlockMetadata.mustWriteBlock(data, bw.traceIDs[0], 
&bw.writers)
                bw.metaData = bw.primaryBlockMetadata.marshal(bw.metaData)
        }
        bw.minTimestamp = 0
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index d96fad6f..a19400b8 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -259,13 +259,6 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}
        br.init(pii)
        bw := generateBlockWriter()
        bw.mustInitForFilePart(fileSystem, dstPath, shouldCache)
-       for _, pw := range parts {
-               for _, pbm := range pw.p.primaryBlockMetadata {
-                       if len(pbm.traceID) > int(bw.traceIDLen) {
-                               bw.traceIDLen = uint32(len(pbm.traceID))
-                       }
-               }
-       }
 
        var minTimestamp, maxTimestamp int64
        for i, pw := range parts {
@@ -284,16 +277,16 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}
        }
 
        pm, tf, tt, err := mergeBlocks(closeCh, bw, br)
-       if err != nil {
-               return nil, err
-       }
-       pm.MinTimestamp = minTimestamp
-       pm.MaxTimestamp = maxTimestamp
        releaseBlockWriter(bw)
        releaseBlockReader(br)
        for i := range pii {
                releasePartMergeIter(pii[i])
        }
+       if err != nil {
+               return nil, err
+       }
+       pm.MinTimestamp = minTimestamp
+       pm.MaxTimestamp = maxTimestamp
        pm.mustWriteMetadata(fileSystem, dstPath)
        tf.mustWriteTraceIDFilter(fileSystem, dstPath)
        tt.mustWriteTagType(fileSystem, dstPath)
diff --git a/banyand/trace/part.go b/banyand/trace/part.go
index b76caab9..213a15ff 100644
--- a/banyand/trace/part.go
+++ b/banyand/trace/part.go
@@ -446,11 +446,6 @@ func (mp *memPart) mustInitFromTraces(ts *traces) {
 
        bsw := generateBlockWriter()
        bsw.MustInitForMemPart(mp)
-       for _, tid := range ts.traceIDs {
-               if len(tid) > int(bsw.traceIDLen) {
-                       bsw.traceIDLen = uint32(len(tid))
-               }
-       }
 
        var tidPrev string
        uncompressedSpansSizeBytes := uint64(0)
diff --git a/banyand/trace/part_iter.go b/banyand/trace/part_iter.go
index f6f069a1..9c584b0a 100644
--- a/banyand/trace/part_iter.go
+++ b/banyand/trace/part_iter.go
@@ -214,7 +214,7 @@ func (pi *partIter) readPrimaryBlock(bms []blockMetadata, mr *primaryBlockMetada
        if err != nil {
                return nil, fmt.Errorf("cannot decompress index block: %w", err)
        }
-       bms, err = unmarshalBlockMetadata(bms, pi.primaryBuf, pi.p.tagType, 
int(mr.traceIDLen))
+       bms, err = unmarshalBlockMetadata(bms, pi.primaryBuf, pi.p.tagType)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal index block: %w", err)
        }
@@ -326,8 +326,7 @@ func (pmi *partMergeIter) loadPrimaryBuf() error {
 func (pmi *partMergeIter) loadBlockMetadata() error {
        pmi.block.reset()
        var err error
-       traceIDLen := 
pmi.primaryBlockMetadata[pmi.primaryMetadataIdx-1].traceIDLen
-       pmi.primaryBuf, err = pmi.block.bm.unmarshal(pmi.primaryBuf, 
pmi.tagType, int(traceIDLen))
+       pmi.primaryBuf, err = pmi.block.bm.unmarshal(pmi.primaryBuf, 
pmi.tagType)
        if err != nil {
                pm := pmi.primaryBlockMetadata[pmi.primaryMetadataIdx-1]
                return fmt.Errorf("can't read block metadata from primary at 
%d: %w", pm.offset, err)
diff --git a/banyand/trace/primary_metadata.go b/banyand/trace/primary_metadata.go
index 5e7b16a5..304ae833 100644
--- a/banyand/trace/primary_metadata.go
+++ b/banyand/trace/primary_metadata.go
@@ -18,12 +18,11 @@
 package trace
 
 import (
-       "bytes"
        "fmt"
        "io"
-       "strings"
 
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -32,19 +31,16 @@ import (
 type primaryBlockMetadata struct {
        traceID string
        dataBlock
-       traceIDLen uint32
 }
 
 // reset resets pbm for subsequent re-use.
 func (pbm *primaryBlockMetadata) reset() {
-       pbm.traceIDLen = 0
        pbm.traceID = ""
        pbm.offset = 0
        pbm.size = 0
 }
 
-func (pbm *primaryBlockMetadata) mustWriteBlock(data []byte, traceIDLen 
uint32, traceID string, sw *writers) {
-       pbm.traceIDLen = traceIDLen
+func (pbm *primaryBlockMetadata) mustWriteBlock(data []byte, traceID string, 
sw *writers) {
        pbm.traceID = traceID
 
        bb := bigValuePool.Generate()
@@ -56,25 +52,21 @@ func (pbm *primaryBlockMetadata) mustWriteBlock(data []byte, traceIDLen uint32,
 }
 
 func (pbm *primaryBlockMetadata) marshal(dst []byte) []byte {
-       dst = encoding.Uint32ToBytes(dst, pbm.traceIDLen)
-       dst = append(dst, pbm.traceID...)
-       paddingLen := pbm.traceIDLen - uint32(len(pbm.traceID))
-       if paddingLen > 0 {
-               dst = append(dst, bytes.Repeat([]byte{0}, int(paddingLen))...)
-       }
+       dst = encoding.EncodeBytes(dst, convert.StringToBytes(pbm.traceID))
        dst = encoding.Uint64ToBytes(dst, pbm.offset)
        dst = encoding.Uint64ToBytes(dst, pbm.size)
        return dst
 }
 
 func (pbm *primaryBlockMetadata) unmarshal(src []byte) ([]byte, error) {
-       pbm.traceIDLen = encoding.BytesToUint32(src)
-       src = src[4:]
-       if len(src) < int(16+pbm.traceIDLen) {
-               return nil, fmt.Errorf("cannot unmarshal primaryBlockMetadata 
from %d bytes; expect at least %d bytes", len(src), 32+pbm.traceIDLen)
+       if len(src) < 4 {
+               return nil, fmt.Errorf("cannot unmarshal primaryBlockMetadata 
from %d bytes; expect at least 4 bytes for traceID length", len(src))
+       }
+       src, traceIDBytes, err := encoding.DecodeBytes(src)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal traceID: %w", err)
        }
-       pbm.traceID = strings.TrimRight(string(src[:pbm.traceIDLen]), "\x00")
-       src = src[pbm.traceIDLen:]
+       pbm.traceID = string(traceIDBytes)
        pbm.offset = encoding.BytesToUint64(src)
        src = src[8:]
        pbm.size = encoding.BytesToUint64(src)
diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go
index a1ef98e0..ba95c470 100644
--- a/banyand/trace/syncer.go
+++ b/banyand/trace/syncer.go
@@ -189,6 +189,20 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
        if err != nil {
                return err
        }
+       if len(partsToSync) == 0 && len(sidxPartsToSync) == 0 {
+               return nil
+       }
+       hasSidxParts := false
+       for _, sidxParts := range sidxPartsToSync {
+               if len(sidxParts) == 0 {
+                       continue
+               }
+               hasSidxParts = true
+               break
+       }
+       if len(partsToSync) == 0 && !hasSidxParts {
+               return nil
+       }
 
        // Validate sync preconditions
        if err := tst.validateSyncPreconditions(partsToSync, sidxPartsToSync); 
err != nil {
diff --git a/test/integration/distributed/query/query_suite_test.go b/test/integration/distributed/query/query_suite_test.go
index 297445ba..1f67007d 100644
--- a/test/integration/distributed/query/query_suite_test.go
+++ b/test/integration/distributed/query/query_suite_test.go
@@ -49,7 +49,6 @@ import (
        casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
        casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
        casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
-       casestrace "github.com/apache/skywalking-banyandb/test/cases/trace"
 )
 
 func TestQuery(t *testing.T) {
@@ -131,10 +130,6 @@ var _ = SynchronizedBeforeSuite(func() []byte {
                Connection: connection,
                BaseTime:   now,
        }
-       casestrace.SharedContext = helpers.SharedContext{
-               Connection: connection,
-               BaseTime:   now,
-       }
        Expect(err).NotTo(HaveOccurred())
 })
 

Reply via email to