Copilot commented on code in PR #767: URL: https://github.com/apache/skywalking-banyandb/pull/767#discussion_r2341718328
########## banyand/trace/write_data.go: ########## @@ -84,17 +112,47 @@ func (s *syncCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (que } tsdb.Tick(ctx.MaxTimestamp) + + if ctx.PartType != PartTypeCore { + s.l.Debug(). + Str("group", ctx.Group). + Uint32("shardID", ctx.ShardID). + Uint64("partID", ctx.ID). + Str("partType", ctx.PartType). + Msg("creating unified part handler for sidx data") + sidxNames := tsTable.getAllSidxNames() + sidxMemParts := sidx.NewSidxMemParts() + for _, sidxName := range sidxNames { + memPart := sidx.GenerateMemPart() + memPart.SetPartMetadata(ctx.CompressedSizeBytes, ctx.UncompressedSizeBytes, ctx.TotalCount, ctx.BlocksCount, ctx.MinKey, ctx.MaxKey, ctx.ID) + writers := sidx.GenerateWriters() + writers.MustInitForMemPart(memPart) + sidxMemParts.Set(sidxName, memPart, writers) + s.l.Debug(). + Str("sidxName", sidxName). + Uint64("partID", ctx.ID). + Msg("pre-created sidx memPart and writers") + } + return &syncPartContext{ + tsTable: tsTable, + l: s.l, + sidxPartContext: sidxMemParts, + }, nil + } Review Comment: [nitpick] This code block for handling non-core part types is complex and could be extracted into a separate method like `createSidxPartHandler` to improve readability and maintainability. 
########## banyand/trace/syncer.go: ########## @@ -159,77 +196,86 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu release() } }() - - for _, node := range nodes { - // Get chunked sync client for this node - chunkedClient, err := tst.option.tire2Client.NewChunkedSyncClient(node, 1024*1024) - if err != nil { - return fmt.Errorf("failed to create chunked sync client for node %s: %w", node, err) - } - defer chunkedClient.Close() - - // Prepare streaming parts data for chunked sync - var streamingParts []queue.StreamingPartData - for _, part := range partsToSync { - // Create streaming reader for the part - files, release := createPartFileReaders(part) - releaseFuncs = append(releaseFuncs, release) - - // Create streaming part sync data - streamingParts = append(streamingParts, queue.StreamingPartData{ - ID: part.partMetadata.ID, - Group: tst.group, - ShardID: uint32(tst.shardID), - Topic: data.TopicStreamPartSync.String(), - Files: files, - CompressedSizeBytes: part.partMetadata.CompressedSizeBytes, - UncompressedSizeBytes: part.partMetadata.UncompressedSpanSizeBytes, - TotalCount: part.partMetadata.TotalCount, - BlocksCount: part.partMetadata.BlocksCount, - MinTimestamp: part.partMetadata.MinTimestamp, - MaxTimestamp: part.partMetadata.MaxTimestamp, - }) - } - - // Sync parts using chunked transfer with streaming - result, err := chunkedClient.SyncStreamingParts(ctx, streamingParts) - if err != nil { - return fmt.Errorf("failed to sync streaming parts to node %s: %w", node, err) - } - - if !result.Success { - return fmt.Errorf("chunked sync partially failed: %v", result.ErrorMessage) - } - tst.l.Info(). - Str("node", node). - Str("session", result.SessionID). - Uint64("bytes", result.TotalBytes). - Int64("duration_ms", result.DurationMs). - Uint32("chunks", result.ChunksCount). - Uint32("parts", result.PartsCount). 
- Msg("chunked sync completed successfully") + // Prepare all streaming parts data + streamingParts := make([]queue.StreamingPartData, 0) + // Create streaming parts from core trace parts + for _, part := range partsToSync { + files, release := createPartFileReaders(part) + releaseFuncs = append(releaseFuncs, release) + streamingParts = append(streamingParts, queue.StreamingPartData{ + ID: part.partMetadata.ID, + Group: tst.group, + ShardID: uint32(tst.shardID), + Topic: data.TopicStreamPartSync.String(), + Files: files, + CompressedSizeBytes: part.partMetadata.CompressedSizeBytes, + UncompressedSizeBytes: part.partMetadata.UncompressedSpanSizeBytes, + TotalCount: part.partMetadata.TotalCount, + BlocksCount: part.partMetadata.BlocksCount, + MinTimestamp: part.partMetadata.MinTimestamp, + MaxTimestamp: part.partMetadata.MaxTimestamp, + PartType: PartTypeCore, + }) + } + // Add sidx streaming parts + for name, sidxParts := range sidxPartsToSync { + sidxStreamingParts, sidxReleaseFuncs := tst.sidxMap[name].StreamingParts(sidxParts, tst.group, uint32(tst.shardID), name) + streamingParts = append(streamingParts, sidxStreamingParts...) + releaseFuncs = append(releaseFuncs, sidxReleaseFuncs...) Review Comment: [nitpick] The variable names `sidxStreamingParts` and `sidxReleaseFuncs` could be more descriptive, such as `streamingPartsForSidx` and `releaseForSidx`, to improve code clarity. ```suggestion streamingPartsForSidx, releaseForSidx := tst.sidxMap[name].StreamingParts(sidxParts, tst.group, uint32(tst.shardID), name) streamingParts = append(streamingParts, streamingPartsForSidx...) releaseFuncs = append(releaseFuncs, releaseForSidx...) ``` ########## banyand/trace/write_liaison.go: ########## @@ -25,11 +25,16 @@ import ( "google.golang.org/protobuf/proto" "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/api/data" Review Comment: [nitpick] The title 'Impelment chunked sync for trace' contains a spelling error.
It should be 'Implement chunked sync for trace'. ########## banyand/internal/sidx/part.go: ########## @@ -512,6 +535,24 @@ type memPart struct { keys bytes.Buffer } +func (mp *memPart) ID() uint64 { + return mp.partMetadata.ID +} + +// SetPartMetadata sets the part metadata for the MemPart. +func (mp *memPart) SetPartMetadata(compressedSizeBytes, uncompressedSizeBytes, totalCount, blocksCount uint64, minKey, maxKey int64, id uint64) { + if mp.partMetadata == nil { + mp.partMetadata = &partMetadata{} + } + mp.partMetadata.CompressedSizeBytes = compressedSizeBytes + mp.partMetadata.UncompressedSizeBytes = uncompressedSizeBytes + mp.partMetadata.TotalCount = totalCount + mp.partMetadata.BlocksCount = blocksCount + mp.partMetadata.MinKey = minKey + mp.partMetadata.MaxKey = maxKey + mp.partMetadata.ID = id Review Comment: [nitpick] The SetPartMetadata method has many parameters (7 parameters). Consider using a struct parameter or builder pattern to improve readability and maintainability. ```suggestion func (mp *memPart) SetPartMetadata(meta partMetadata) { if mp.partMetadata == nil { mp.partMetadata = &partMetadata{} } *mp.partMetadata = meta ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org