This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 071c0ccd Fix syncSnapshot for trace (#788)
071c0ccd is described below
commit 071c0ccd4a4430b42f55eff4a0c03f66517da25a
Author: Huang Youliang <[email protected]>
AuthorDate: Tue Sep 30 06:53:16 2025 +0800
Fix syncSnapshot for trace (#788)
* Fix syncSnapshot for trace
* Remove the logging-env flag
---
banyand/internal/sidx/sidx.go | 6 ++
banyand/trace/flusher.go | 5 ++
banyand/trace/syncer.go | 70 +++++++++++++++-------
banyand/trace/write_data.go | 2 +
pkg/test/setup/setup.go | 4 ++
.../distributed/query/query_suite_test.go | 5 ++
6 files changed, 70 insertions(+), 22 deletions(-)
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index b2aa29df..5e2efd1f 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -453,6 +453,12 @@ func (s *sidx) Flush() error {
}
partPath := partPath(s.root, pw.ID())
pw.mp.mustFlush(s.fileSystem, partPath)
+ if l := s.l.Debug(); l.Enabled() {
+ s.l.Debug().
+ Uint64("part_id", pw.ID()).
+ Str("part_path", partPath).
+ Msg("flushing sidx part")
+ }
newPW := newPartWrapper(nil, mustOpenPart(partPath, s.fileSystem))
flushIntro.flushed[newPW.ID()] = newPW
}
diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go
index 33eb4d1e..76096b4b 100644
--- a/banyand/trace/flusher.go
+++ b/banyand/trace/flusher.go
@@ -269,6 +269,11 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction)
}
allSidx := tst.getAllSidx()
for name, sidxInstance := range allSidx {
+ if l := tst.l.Debug(); l.Enabled() {
+ tst.l.Debug().
+ Str("sidx_name", name).
+ Msg("flushing sidx")
+ }
if err := sidxInstance.Flush(); err != nil {
tst.l.Warn().Err(err).Str("sidx", name).Msg("sidx flush failed")
return
diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go
index 8e98a3e0..b0155363 100644
--- a/banyand/trace/syncer.go
+++ b/banyand/trace/syncer.go
@@ -189,24 +189,18 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
if err != nil {
return err
}
- if len(partsToSync) == 0 && len(sidxPartsToSync) == 0 {
- return nil
- }
- hasSidxParts := false
- for _, sidxParts := range sidxPartsToSync {
- if len(sidxParts) == 0 {
- continue
+ if l := tst.l.Debug(); l.Enabled() {
+ for name, sidxParts := range sidxPartsToSync {
+ tst.l.Debug().
+ Str("sidx_name", name).
+ Int("sidx_parts_count", len(sidxParts)).
+ Msg("sidxPartsToSync in syncSnapshot")
}
- hasSidxParts = true
- break
- }
- if len(partsToSync) == 0 && !hasSidxParts {
- return nil
}
// Validate sync preconditions
- if err := tst.validateSyncPreconditions(partsToSync, sidxPartsToSync); err != nil {
- return err
+ if !tst.needToSync(partsToSync, sidxPartsToSync) {
+ return nil
}
// Execute sync operation
@@ -234,13 +228,46 @@ func (tst *tsTable) collectPartsToSync(curSnapshot *snapshot) ([]*part, map[stri
return nil, nil, errClosed
}
sidxPartsToSync[name] = sidx.PartsToSync()
+ if l := tst.l.Debug(); l.Enabled() {
+ tst.l.Debug().
+ Str("sidx_name", name).
+ Int("sidx_parts_count", len(sidxPartsToSync[name])).
+ Msg("get sidx parts to sync")
+ }
+ }
+
+ if l := tst.l.Debug(); l.Enabled() {
+ tst.l.Debug().
+ Int("core_parts_count", len(partsToSync)).
+ Uint64("snapshot_epoch", curSnapshot.epoch).
+ Msg("collected core parts for sync")
+ if len(partsToSync) > 0 {
+ var corePartIDs []uint64
+ for _, part := range partsToSync {
+ corePartIDs = append(corePartIDs, part.partMetadata.ID)
+ }
+ tst.l.Debug().
+ Interface("core_part_ids", corePartIDs).
+ Msg("core parts to sync details")
+ }
+ for sidxName, sidxParts := range sidxPartsToSync {
+ var sidxPartIDs []uint64
+ for _, part := range sidxParts {
+ sidxPartIDs = append(sidxPartIDs, part.ID())
+ }
+ tst.l.Debug().
+ Str("sidx_name", sidxName).
+ Int("sidx_parts_count", len(sidxParts)).
+ Interface("sidx_part_ids", sidxPartIDs).
+ Msg("collected sidx parts for sync")
+ }
}
return partsToSync, sidxPartsToSync, nil
}
-// validateSyncPreconditions validates that there are parts to sync and nodes available.
-func (tst *tsTable) validateSyncPreconditions(partsToSync []*part, sidxPartsToSync map[string][]*sidx.Part) error {
+// needToSync validates that there are parts to sync and nodes available.
+func (tst *tsTable) needToSync(partsToSync []*part, sidxPartsToSync map[string][]*sidx.Part) bool {
hasCoreParts := len(partsToSync) > 0
hasSidxParts := false
for _, parts := range sidxPartsToSync {
@@ -250,15 +277,11 @@ func (tst *tsTable) validateSyncPreconditions(partsToSync []*part, sidxPartsToSy
}
}
if !hasCoreParts && !hasSidxParts {
- return nil
+ return false
}
nodes := tst.getNodes()
- if len(nodes) == 0 {
- return fmt.Errorf("no nodes to sync parts")
- }
-
- return nil
+ return len(nodes) > 0
}
// executeSyncOperation performs the actual synchronization of parts to nodes.
@@ -384,6 +407,9 @@ func (tst *tsTable) syncStreamingPartsToNodes(ctx context.Context, nodes []strin
streamingParts := make([]queue.StreamingPartData, 0)
// Add sidx streaming parts
for name, sidxParts := range sidxPartsToSync {
+ if len(sidxParts) == 0 {
+ continue
+ }
sidx, ok := tst.getSidx(name)
if !ok {
return fmt.Errorf("sidx %s not found", name)
diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go
index ed3c74dc..f1d8da52 100644
--- a/banyand/trace/write_data.go
+++ b/banyand/trace/write_data.go
@@ -83,6 +83,8 @@ func (s *syncPartContext) Close() error {
s.sidxPartContext = nil
}
s.tsTable = nil
+ s.traceIDFilterBuffer = nil
+ s.tagTypeBuffer = nil
return nil
}
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 68972a40..804a8cf6 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -256,6 +256,8 @@ func startDataNode(etcdEndpoint, dataDir string, flags ...string) (string, strin
"--etcd-endpoints", etcdEndpoint,
"--node-host-provider", "flag",
"--node-host", nodeHost,
+ "--logging-modules", "trace,sidx",
+ "--logging-levels", "debug,debug",
)
closeFn := CMD(flags...)
@@ -344,6 +346,8 @@ func LiaisonNodeWithHTTP(etcdEndpoint string, flags ...string) (string, string,
"--stream-sync-interval=1s",
"--measure-sync-interval=1s",
"--trace-sync-interval=1s",
+ "--logging-modules", "trace,sidx",
+ "--logging-levels", "debug,debug",
)
closeFn := CMD(flags...)
gomega.Eventually(helpers.HTTPHealthCheck(httpAddr, ""), testflags.EventuallyTimeout).Should(gomega.Succeed())
diff --git a/test/integration/distributed/query/query_suite_test.go b/test/integration/distributed/query/query_suite_test.go
index 1f67007d..297445ba 100644
--- a/test/integration/distributed/query/query_suite_test.go
+++ b/test/integration/distributed/query/query_suite_test.go
@@ -49,6 +49,7 @@ import (
casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+ casestrace "github.com/apache/skywalking-banyandb/test/cases/trace"
)
func TestQuery(t *testing.T) {
@@ -130,6 +131,10 @@ var _ = SynchronizedBeforeSuite(func() []byte {
Connection: connection,
BaseTime: now,
}
+ casestrace.SharedContext = helpers.SharedContext{
+ Connection: connection,
+ BaseTime: now,
+ }
Expect(err).NotTo(HaveOccurred())
})