This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug/trace-merge
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 2dde7da2c04608fcd3fa005e9c11ad6799086c61
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Mon Sep 8 04:51:56 2025 +0000

    refactor: simplify SIDX interface and improve part handling
    
    - Updated SIDX and Merger interfaces to remove unnecessary parameters, enhancing clarity and usability.
    - Refactored related implementations and tests to align with the new interface signatures.
    - Introduced partWrapper for better part management in the flusherIntroduction structure.
    - Improved validation logic in WriteRequest (empty tag values are now accepted) to streamline error handling.
---
 banyand/internal/sidx/interfaces.go            |  7 ++-----
 banyand/internal/sidx/introducer.go            |  4 ++--
 banyand/internal/sidx/introducer_test.go       | 12 ++++++------
 banyand/internal/sidx/merge.go                 | 11 +++++++----
 banyand/internal/sidx/multi_sidx_query_test.go |  4 ++--
 banyand/internal/sidx/part_wrapper.go          |  2 +-
 banyand/internal/sidx/sidx.go                  | 11 +++++++++--
 banyand/internal/sidx/sidx_test.go             | 26 +++++++++++++-------------
 banyand/internal/sidx/snapshot.go              | 25 +++++++++++--------------
 banyand/internal/sidx/snapshot_test.go         | 10 +++++-----
 banyand/trace/block_metadata.go                |  3 +++
 banyand/trace/block_reader.go                  |  2 +-
 banyand/trace/merger.go                        |  6 +-----
 banyand/trace/svc_standalone.go                |  2 ++
 banyand/trace/write_standalone.go              |  2 +-
 15 files changed, 66 insertions(+), 61 deletions(-)

diff --git a/banyand/internal/sidx/interfaces.go 
b/banyand/internal/sidx/interfaces.go
index d02708a2..c130e04a 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -37,7 +37,7 @@ import (
 type SIDX interface {
        // Write performs batch write operations. All writes must be submitted 
as batches.
        // Elements within each batch should be pre-sorted by the caller for 
optimal performance.
-       Write(ctx context.Context, reqs []WriteRequest, partID uint64) error
+       Write(ctx context.Context, reqs []WriteRequest) error
 
        // Query executes a query with key range and tag filtering.
        // Returns a QueryResponse directly with all results loaded.
@@ -76,7 +76,7 @@ type Merger interface {
        // and coordinate with the introducer loop for snapshot updates.
        // This operation is user-controlled and synchronous.
        // Returns error if merge operation fails.
-       Merge(partIDs []uint64, newPartID uint64, closeCh <-chan struct{}) error
+       Merge(closeCh <-chan struct{}) error
 }
 
 // Writer handles write path operations for batch processing.
@@ -452,9 +452,6 @@ func (wr WriteRequest) Validate() error {
                if tag.Name == "" {
                        return fmt.Errorf("tag[%d] name cannot be empty", i)
                }
-               if len(tag.Value) == 0 {
-                       return fmt.Errorf("tag[%d] value cannot be empty", i)
-               }
        }
        return nil
 }
diff --git a/banyand/internal/sidx/introducer.go 
b/banyand/internal/sidx/introducer.go
index c75700ae..b3801a65 100644
--- a/banyand/internal/sidx/introducer.go
+++ b/banyand/internal/sidx/introducer.go
@@ -49,7 +49,7 @@ func releaseIntroduction(i *introduction) {
 }
 
 type flusherIntroduction struct {
-       flushed map[uint64]*part
+       flushed map[uint64]*partWrapper
        applied chan struct{}
 }
 
@@ -66,7 +66,7 @@ func generateFlusherIntroduction() *flusherIntroduction {
        v := flusherIntroductionPool.Get()
        if v == nil {
                return &flusherIntroduction{
-                       flushed: make(map[uint64]*part),
+                       flushed: make(map[uint64]*partWrapper),
                }
        }
        fi := v
diff --git a/banyand/internal/sidx/introducer_test.go 
b/banyand/internal/sidx/introducer_test.go
index 14b92d12..f601e995 100644
--- a/banyand/internal/sidx/introducer_test.go
+++ b/banyand/internal/sidx/introducer_test.go
@@ -64,7 +64,7 @@ func TestIntroductionPooling(t *testing.T) {
                var intros []*flusherIntroduction
                for i := 0; i < 10; i++ {
                        intro := generateFlusherIntroduction()
-                       intro.flushed[uint64(i)] = &part{}
+                       intro.flushed[uint64(i)] = &partWrapper{}
                        intro.applied = make(chan struct{})
                        intros = append(intros, intro)
                }
@@ -121,8 +121,8 @@ func TestIntroductionReset(t *testing.T) {
                intro := generateFlusherIntroduction()
 
                // Set up flusher introduction with data
-               intro.flushed[1] = &part{}
-               intro.flushed[2] = &part{}
+               intro.flushed[1] = &partWrapper{}
+               intro.flushed[2] = &partWrapper{}
                intro.applied = make(chan struct{})
 
                // Reset the flusher introduction
@@ -308,7 +308,7 @@ func TestConcurrentPoolAccess(t *testing.T) {
                                defer wg.Done()
                                for j := 0; j < operationsPerGoroutine; j++ {
                                        intro := generateFlusherIntroduction()
-                                       intro.flushed[uint64(j)] = &part{}
+                                       intro.flushed[uint64(j)] = 
&partWrapper{}
                                        intro.applied = make(chan struct{})
                                        releaseFlusherIntroduction(intro)
                                }
@@ -347,8 +347,8 @@ func TestIntroductionMapOperations(t *testing.T) {
                intro := generateFlusherIntroduction()
 
                // Add parts to flushed map
-               part1 := &part{}
-               part2 := &part{}
+               part1 := &partWrapper{}
+               part2 := &partWrapper{}
                intro.flushed[1] = part1
                intro.flushed[2] = part2
 
diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go
index d1d3c608..a8d2733a 100644
--- a/banyand/internal/sidx/merge.go
+++ b/banyand/internal/sidx/merge.go
@@ -19,7 +19,7 @@ package sidx
 
 import (
        "fmt"
-       "slices"
+       "sync/atomic"
 
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
@@ -31,7 +31,7 @@ var (
 )
 
 // Merge implements Merger interface.
-func (s *sidx) Merge(partIDs []uint64, newPartID uint64, closeCh <-chan 
struct{}) error {
+func (s *sidx) Merge(closeCh <-chan struct{}) error {
        // Get current snapshot
        snap := s.currentSnapshot()
        if snap == nil {
@@ -44,10 +44,10 @@ func (s *sidx) Merge(partIDs []uint64, newPartID uint64, 
closeCh <-chan struct{}
        defer releaseMergerIntroduction(mergeIntro)
        mergeIntro.applied = make(chan struct{})
 
-       // Select parts to merge
+       // Select parts to merge (all active non-memory parts)
        var partsToMerge []*partWrapper
        for _, pw := range snap.parts {
-               if pw.isActive() && !pw.isMemPart() && slices.Contains(partIDs, 
pw.ID()) {
+               if pw.isActive() && !pw.isMemPart() {
                        partsToMerge = append(partsToMerge, pw)
                }
        }
@@ -61,6 +61,9 @@ func (s *sidx) Merge(partIDs []uint64, newPartID uint64, 
closeCh <-chan struct{}
                mergeIntro.merged[pw.ID()] = struct{}{}
        }
 
+       // Generate new part ID using atomic increment
+       newPartID := atomic.AddUint64(&s.curPartID, 1)
+
        // Create new merged part
        newPart, err := s.mergeParts(s.fileSystem, closeCh, partsToMerge, 
newPartID, s.root)
        if err != nil {
diff --git a/banyand/internal/sidx/multi_sidx_query_test.go 
b/banyand/internal/sidx/multi_sidx_query_test.go
index 2ff19b9c..e33ad7af 100644
--- a/banyand/internal/sidx/multi_sidx_query_test.go
+++ b/banyand/internal/sidx/multi_sidx_query_test.go
@@ -37,7 +37,7 @@ type mockSIDX struct {
        delay    bool
 }
 
-func (m *mockSIDX) Write(_ context.Context, _ []WriteRequest, _ uint64) error {
+func (m *mockSIDX) Write(_ context.Context, _ []WriteRequest) error {
        return nil // Not implemented for tests
 }
 
@@ -60,7 +60,7 @@ func (m *mockSIDX) Flush() error {
        return nil
 }
 
-func (m *mockSIDX) Merge(_ []uint64, _ uint64, _ <-chan struct{}) error {
+func (m *mockSIDX) Merge(_ <-chan struct{}) error {
        return nil
 }
 
diff --git a/banyand/internal/sidx/part_wrapper.go 
b/banyand/internal/sidx/part_wrapper.go
index 7a134a6d..75c2f503 100644
--- a/banyand/internal/sidx/part_wrapper.go
+++ b/banyand/internal/sidx/part_wrapper.go
@@ -217,7 +217,7 @@ func (pw *partWrapper) isRemoved() bool {
 // isMemPart returns true if this wrapper contains a memory part.
 func (pw *partWrapper) isMemPart() bool {
        // A memory part typically has no file system path or is stored in 
memory
-       return pw.mp != nil || (pw.p != nil && pw.p.path == "")
+       return pw.mp != nil
 }
 
 // String returns a string representation of the partWrapper.
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index 7507e531..4515d983 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -60,6 +60,7 @@ type sidx struct {
        totalIntroduceLoopFinished atomic.Int64
        totalQueries               atomic.Int64
        totalWrites                atomic.Int64
+       curPartID                  uint64
        mu                         sync.RWMutex
 }
 
@@ -98,7 +99,7 @@ func NewSIDX(fileSystem fs.FileSystem, opts *Options) (SIDX, 
error) {
 }
 
 // Write implements SIDX interface.
-func (s *sidx) Write(ctx context.Context, reqs []WriteRequest, partID uint64) 
error {
+func (s *sidx) Write(ctx context.Context, reqs []WriteRequest) error {
        // Validate requests
        for _, req := range reqs {
                if err := req.Validate(); err != nil {
@@ -121,6 +122,9 @@ func (s *sidx) Write(ctx context.Context, reqs 
[]WriteRequest, partID uint64) er
        mp := generateMemPart()
        mp.mustInitFromElements(es)
 
+       // Generate part ID using atomic increment
+       partID := atomic.AddUint64(&s.curPartID, 1)
+
        // Create introduction
        intro := generateIntroduction()
        intro.memPart = mp
@@ -418,7 +422,7 @@ func (s *sidx) Flush() error {
                partPath := partPath(s.root, pw.ID())
                pw.mp.mustFlush(s.fileSystem, partPath)
                p := mustOpenPart(partPath, s.fileSystem)
-               flushIntro.flushed[p.partMetadata.ID] = p
+               flushIntro.flushed[p.partMetadata.ID] = pw
        }
 
        if len(flushIntro.flushed) == 0 {
@@ -826,6 +830,9 @@ func (s *sidx) loadSnapshot(epoch uint64, loadedParts 
[]uint64) {
                part.partMetadata.ID = id
                pw := newPartWrapper(nil, part)
                snp.addPart(pw)
+               if s.curPartID < id {
+                       s.curPartID = id
+               }
        }
        s.gc.registerSnapshot(snp)
        s.gc.clean()
diff --git a/banyand/internal/sidx/sidx_test.go 
b/banyand/internal/sidx/sidx_test.go
index 3dfd1f00..47d82f6e 100644
--- a/banyand/internal/sidx/sidx_test.go
+++ b/banyand/internal/sidx/sidx_test.go
@@ -100,7 +100,7 @@ func TestSIDX_Write_SingleRequest(t *testing.T) {
                createTestWriteRequest(1, 100, "data1", createTestTag("tag1", 
"value1")),
        }
 
-       err := sidx.Write(ctx, reqs, partIDForTesting)
+       err := sidx.Write(ctx, reqs)
        assert.NoError(t, err)
 
        // Verify stats
@@ -124,7 +124,7 @@ func TestSIDX_Write_BatchRequest(t *testing.T) {
                createTestWriteRequest(2, 200, "data3", createTestTag("tag2", 
"value3")),
        }
 
-       err := sidx.Write(ctx, reqs, partIDForTesting)
+       err := sidx.Write(ctx, reqs)
        assert.NoError(t, err)
 
        // Verify stats
@@ -170,7 +170,7 @@ func TestSIDX_Write_Validation(t *testing.T) {
 
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       err := sidx.Write(ctx, []WriteRequest{tt.req}, 
partIDForTesting)
+                       err := sidx.Write(ctx, []WriteRequest{tt.req})
                        if tt.expectErr {
                                assert.Error(t, err)
                        } else {
@@ -199,7 +199,7 @@ func TestSIDX_Write_WithTags(t *testing.T) {
                createTestWriteRequest(1, 100, "trace-data", tags...),
        }
 
-       err := sidx.Write(ctx, reqs, partIDForTesting)
+       err := sidx.Write(ctx, reqs)
        assert.NoError(t, err)
 }
 
@@ -218,7 +218,7 @@ func TestSIDX_Query_BasicQuery(t *testing.T) {
                createTestWriteRequest(1, 100, "data1"),
                createTestWriteRequest(1, 101, "data2"),
        }
-       err := sidx.Write(ctx, reqs, partIDForTesting)
+       err := sidx.Write(ctx, reqs)
        require.NoError(t, err)
 
        // Wait for introducer loop to process
@@ -268,7 +268,7 @@ func TestSIDX_Query_KeyRangeFilter(t *testing.T) {
                createTestWriteRequest(1, 150, "data150"),
                createTestWriteRequest(1, 200, "data200"),
        }
-       err := sidx.Write(ctx, reqs, partIDForTesting)
+       err := sidx.Write(ctx, reqs)
        require.NoError(t, err)
 
        // Wait for introducer loop to process
@@ -322,7 +322,7 @@ func TestSIDX_Query_Ordering(t *testing.T) {
                createTestWriteRequest(3, 75, "series3-data75"),
                createTestWriteRequest(3, 175, "series3-data175"),
        }
-       err := sidx.Write(ctx, reqs, partIDForTesting)
+       err := sidx.Write(ctx, reqs)
        require.NoError(t, err)
 
        // Wait for introducer loop to process
@@ -479,7 +479,7 @@ func TestSIDX_WriteQueryIntegration(t *testing.T) {
                createTestWriteRequest(2, 180, "series2-data2", 
createTestTag("env", "dev")),
        }
 
-       err := sidx.Write(ctx, reqs, partIDForTesting)
+       err := sidx.Write(ctx, reqs)
        require.NoError(t, err)
 
        // Test 1: Query single series
@@ -540,7 +540,7 @@ func TestSIDX_DataConsistency(t *testing.T) {
                reqs = append(reqs, createTestWriteRequest(1, key, data))
        }
 
-       err := sidx.Write(ctx, reqs, partIDForTesting)
+       err := sidx.Write(ctx, reqs)
        require.NoError(t, err)
 
        // Query back and verify data integrity
@@ -587,7 +587,7 @@ func TestSIDX_LargeDataset(t *testing.T) {
                ))
        }
 
-       err := sidx.Write(ctx, reqs, partIDForTesting)
+       err := sidx.Write(ctx, reqs)
        require.NoError(t, err)
 
        // Query back and verify we can handle large result sets
@@ -634,7 +634,7 @@ func TestSIDX_ConcurrentWrites(t *testing.T) {
                                reqs = append(reqs, 
createTestWriteRequest(seriesID, key, data))
                        }
 
-                       if err := sidx.Write(ctx, reqs, partIDForTesting); err 
!= nil {
+                       if err := sidx.Write(ctx, reqs); err != nil {
                                errors <- err
                        }
                }(g)
@@ -666,7 +666,7 @@ func TestSIDX_ConcurrentReadsWrites(t *testing.T) {
        initialReqs := []WriteRequest{
                createTestWriteRequest(1, 100, "initial-data"),
        }
-       err := sidx.Write(ctx, initialReqs, partIDForTesting)
+       err := sidx.Write(ctx, initialReqs)
        require.NoError(t, err)
 
        var wg sync.WaitGroup
@@ -708,7 +708,7 @@ func TestSIDX_ConcurrentReadsWrites(t *testing.T) {
                                        int64(writeCount),
                                        fmt.Sprintf("writer-%d-data-%d", 
writerID, writeCount),
                                )
-                               sidx.Write(ctx, []WriteRequest{req}, 
partIDForTesting) // Ignore errors during concurrent stress
+                               sidx.Write(ctx, []WriteRequest{req}) // Ignore 
errors during concurrent stress
                                writeCount++
                        }
                }(i)
diff --git a/banyand/internal/sidx/snapshot.go 
b/banyand/internal/sidx/snapshot.go
index 2edfd3b9..e06fcc41 100644
--- a/banyand/internal/sidx/snapshot.go
+++ b/banyand/internal/sidx/snapshot.go
@@ -370,22 +370,19 @@ func (s *snapshot) copyAllTo(epoch uint64) *snapshot {
 }
 
 // merge creates a new snapshot by merging flushed parts into the current 
snapshot.
-func (s *snapshot) merge(epoch uint64, flushed map[uint64]*part) *snapshot {
-       result := s.copyAllTo(epoch)
-
-       // Add flushed parts to the snapshot
-       for partID, part := range flushed {
-               // Set the part ID from the map key
-               if part != nil && part.partMetadata != nil {
-                       part.partMetadata.ID = partID
+func (s *snapshot) merge(nextEpoch uint64, nextParts map[uint64]*partWrapper) 
*snapshot {
+       var result snapshot
+       result.epoch = nextEpoch
+       result.ref = 1
+       for i := 0; i < len(s.parts); i++ {
+               if n, ok := nextParts[s.parts[i].ID()]; ok {
+                       result.parts = append(result.parts, n)
+                       continue
                }
-               // Create part wrapper for the flushed part
-               pw := newPartWrapper(nil, part)
-               result.parts = append(result.parts, pw)
+               s.parts[i].acquire()
+               result.parts = append(result.parts, s.parts[i])
        }
-
-       result.sortPartsByEpoch()
-       return result
+       return &result
 }
 
 // remove creates a new snapshot by removing specified parts.
diff --git a/banyand/internal/sidx/snapshot_test.go 
b/banyand/internal/sidx/snapshot_test.go
index dc94d291..9029edaf 100644
--- a/banyand/internal/sidx/snapshot_test.go
+++ b/banyand/internal/sidx/snapshot_test.go
@@ -430,7 +430,7 @@ func TestSnapshotReplacement_Basic(t *testing.T) {
                        Tags:     []Tag{{Name: "test", Value: 
[]byte("snapshot-replacement")}},
                }
 
-               if err := sidx.Write(ctx, []WriteRequest{req}, 
partIDForTesting); err != nil {
+               if err := sidx.Write(ctx, []WriteRequest{req}); err != nil {
                        t.Errorf("write %d failed: %v", i, err)
                }
 
@@ -521,7 +521,7 @@ func 
TestSnapshotReplacement_ConcurrentReadsConsistentData(t *testing.T) {
                        },
                }
 
-               if err := sidx.Write(ctx, reqs, partIDForTesting); err != nil {
+               if err := sidx.Write(ctx, reqs); err != nil {
                        t.Errorf("write %d failed: %v", i, err)
                }
 
@@ -610,7 +610,7 @@ func TestSnapshotReplacement_NoDataRacesDuringReplacement(t 
*testing.T) {
                                                        },
                                                },
                                        }
-                                       sidx.Write(ctx, reqs, partIDForTesting)
+                                       sidx.Write(ctx, reqs)
                                case 1:
                                        // Stats operation - accesses current 
snapshot
                                        sidx.Stats(ctx)
@@ -670,7 +670,7 @@ func TestSnapshotReplacement_MemoryLeaksPrevention(t 
*testing.T) {
                                },
                        }
 
-                       if writeErr := sidx.Write(ctx, reqs, partIDForTesting); 
writeErr != nil {
+                       if writeErr := sidx.Write(ctx, reqs); writeErr != nil {
                                t.Errorf("batch %d write %d failed: %v", i, j, 
writeErr)
                        }
                }
@@ -731,7 +731,7 @@ func TestSnapshotReplacement_MemoryLeaksPrevention(t 
*testing.T) {
                                        },
                                }
 
-                               if writeErr := sidx.Write(ctx, reqs, 
partIDForTesting); writeErr != nil {
+                               if writeErr := sidx.Write(ctx, reqs); writeErr 
!= nil {
                                        t.Errorf("concurrent writer %d write %d 
failed: %v", writerID, j, writeErr)
                                }
 
diff --git a/banyand/trace/block_metadata.go b/banyand/trace/block_metadata.go
index 3fbd0f22..eb0275b1 100644
--- a/banyand/trace/block_metadata.go
+++ b/banyand/trace/block_metadata.go
@@ -164,6 +164,9 @@ func (bm *blockMetadata) unmarshal(src []byte, tagType 
map[string]pbv1.ValueType
                return nil, fmt.Errorf("cannot unmarshal blockMetadata from 
less than %d bytes", traceIDLen)
        }
        bm.traceID = strings.TrimRight(string(src[:traceIDLen]), "\x00")
+       if len(tagType) == 0 {
+               fmt.Println("tagType is empty")
+       }
        bm.tagType = tagType
        src = src[traceIDLen:]
        src, n := encoding.BytesToVarUint64(src)
diff --git a/banyand/trace/block_reader.go b/banyand/trace/block_reader.go
index def4fe09..ad1ef4c1 100644
--- a/banyand/trace/block_reader.go
+++ b/banyand/trace/block_reader.go
@@ -188,9 +188,9 @@ func (br *blockReader) nextMetadata() error {
        for key, tv := range br.block.bm.tagType {
                tagType[key] = tv
        }
+       head.tagType = tagType
        if head.nextBlockMetadata() {
                heap.Fix(&br.pih, 0)
-               br.pih[0].block.bm.tagType = tagType
                br.block = &br.pih[0].block
                return nil
        }
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 095268cd..d6ba9411 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -112,12 +112,8 @@ func (tst *tsTable) mergePartsThenSendIntroduction(creator 
snapshotCreator, part
        if err != nil {
                return nil, err
        }
-       partIDs := make([]uint64, 0, len(parts))
-       for _, pw := range parts {
-               partIDs = append(partIDs, pw.ID())
-       }
        for sidxName, sidxInstance := range tst.sidxMap {
-               if err := sidxInstance.Merge(partIDs, newPartID, closeCh); err 
!= nil {
+               if err := sidxInstance.Merge(closeCh); err != nil {
                        tst.l.Warn().Err(err).Str("sidx", sidxName).Msg("sidx 
merge failed")
                        return nil, err
                }
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index 05e4e858..950c3b3d 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -73,6 +73,8 @@ func (s *standalone) FlagSet() *run.FlagSet {
        fs.DurationVar(&s.option.flushTimeout, "trace-flush-timeout", 
defaultFlushTimeout, "the timeout for trace data flush")
        fs.IntVar(&s.maxDiskUsagePercent, "trace-max-disk-usage-percent", 95, 
"the maximum disk usage percentage")
        fs.IntVar(&s.maxFileSnapshotNum, "trace-max-file-snapshot-num", 2, "the 
maximum number of file snapshots")
+       s.option.mergePolicy = newDefaultMergePolicy()
+       fs.VarP(&s.option.mergePolicy.maxFanOutSize, "trace-max-fan-out-size", 
"", "the upper bound of a single file size after merge of trace")
        // Additional flags can be added here
        return fs
 }
diff --git a/banyand/trace/write_standalone.go 
b/banyand/trace/write_standalone.go
index b2abd73b..c381eb72 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -362,7 +362,7 @@ func (w *writeCallback) Rev(ctx context.Context, message 
bus.Message) (resp bus.
                                                
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot get or create sidx 
instance")
                                                continue
                                        }
-                                       if err := sidxInstance.Write(ctx, 
sidxReqs, es.tsTable.curPartID); err != nil {
+                                       if err := sidxInstance.Write(ctx, 
sidxReqs); err != nil {
                                                
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot write to secondary 
index")
                                        }
                                }

Reply via email to