Copilot commented on code in PR #801:
URL: 
https://github.com/apache/skywalking-banyandb/pull/801#discussion_r2416500250


##########
banyand/trace/tstable.go:
##########
@@ -341,6 +361,7 @@ func (tst *tsTable) loadSidxMap() {
                        tst.l.Error().Err(err).Str("name", 
sidxName).Msg("failed to create sidx options, skipping")
                        continue
                }
+               sidxOpts.AvaiablePartIDs = avaiablePartIDs

Review Comment:
   Corrected spelling of 'AvaiablePartIDs' to 'AvailablePartIDs'.
   ```suggestion
                sidxOpts.AvailablePartIDs = avaiablePartIDs
   ```



##########
banyand/trace/syncer.go:
##########
@@ -299,129 +236,34 @@ func (tst *tsTable) executeSyncOperation(partsToSync 
[]*part, sidxPartsToSync ma
        }()
 
        nodes := tst.getNodes()
-       return tst.syncStreamingPartsToNodes(ctx, nodes, partsToSync, 
sidxPartsToSync, &releaseFuncs)
-}
-
-// handleSyncIntroductions creates and processes sync introductions for both 
core and sidx parts.
-func (tst *tsTable) handleSyncIntroductions(partsToSync []*part, 
sidxPartsToSync map[string][]*sidx.Part, syncCh chan *syncIntroduction) error {
-       // Create core sync introduction
-       si := generateSyncIntroduction()
-       defer releaseSyncIntroduction(si)
-       si.applied = make(chan struct{})
-       for _, part := range partsToSync {
-               ph := partHandle{
-                       partID:   part.partMetadata.ID,
-                       partType: PartTypeCore,
-               }
-               si.synced[ph] = struct{}{}
-       }
-
-       // Create sidx sync introductions
-       sidxSyncIntroductions := 
tst.createSidxSyncIntroductions(sidxPartsToSync)
-       defer tst.releaseSidxSyncIntroductions(sidxSyncIntroductions)
-
-       // Send sync introductions
-       if err := tst.sendSyncIntroductions(si, sidxSyncIntroductions, syncCh); 
err != nil {
-               return err
-       }
-
-       // Wait for sync introductions to be applied
-       return tst.waitForSyncIntroductions(si, sidxSyncIntroductions)
-}
-
-// createSidxSyncIntroductions creates sync introductions for sidx parts.
-func (tst *tsTable) createSidxSyncIntroductions(sidxPartsToSync 
map[string][]*sidx.Part) map[string]*sidx.SyncIntroduction {
-       sidxSyncIntroductions := make(map[string]*sidx.SyncIntroduction)
-       for name, sidxParts := range sidxPartsToSync {
-               if len(sidxParts) > 0 {
-                       ssi := sidx.GenerateSyncIntroduction()
-                       ssi.Applied = make(chan struct{})
-                       for _, part := range sidxParts {
-                               ssi.Synced[part.ID()] = struct{}{}
-                       }
-                       sidxSyncIntroductions[name] = ssi
-               }
-       }
-       return sidxSyncIntroductions
-}
-
-// releaseSidxSyncIntroductions releases sidx sync introductions.
-func (tst *tsTable) releaseSidxSyncIntroductions(sidxSyncIntroductions 
map[string]*sidx.SyncIntroduction) {
-       for _, ssi := range sidxSyncIntroductions {
-               sidx.ReleaseSyncIntroduction(ssi)
-       }
-}
-
-// sendSyncIntroductions sends sync introductions to their respective channels.
-func (tst *tsTable) sendSyncIntroductions(si *syncIntroduction, 
sidxSyncIntroductions map[string]*sidx.SyncIntroduction, syncCh chan 
*syncIntroduction) error {
-       select {
-       case syncCh <- si:
-       case <-tst.loopCloser.CloseNotify():
-               return errClosed
-       }
-
-       for name, ssi := range sidxSyncIntroductions {
-               sidx, ok := tst.getSidx(name)
-               if !ok {
-                       return fmt.Errorf("sidx %s not found", name)
-               }
-               select {
-               case sidx.SyncCh() <- ssi:
-               case <-tst.loopCloser.CloseNotify():
-                       return errClosed
-               }
-       }
-       return nil
-}
-
-// waitForSyncIntroductions waits for all sync introductions to be applied.
-func (tst *tsTable) waitForSyncIntroductions(si *syncIntroduction, 
sidxSyncIntroductions map[string]*sidx.SyncIntroduction) error {
-       select {
-       case <-si.applied:
-       case <-tst.loopCloser.CloseNotify():
-               return errClosed
-       }
-
-       for _, ssi := range sidxSyncIntroductions {
-               select {
-               case <-ssi.Applied:
-               case <-tst.loopCloser.CloseNotify():
-                       return errClosed
-               }
-       }
-       return nil
-}
-
-// syncStreamingPartsToNodes synchronizes streamingparts to multiple nodes.
-func (tst *tsTable) syncStreamingPartsToNodes(ctx context.Context, nodes 
[]string,
-       partsToSync []*part, sidxPartsToSync map[string][]*sidx.Part, 
releaseFuncs *[]func(),
-) error {
        if tst.loopCloser != nil && tst.loopCloser.Closed() {
                return errClosed
        }
+       sidxMap := tst.getAllSidx()
        for _, node := range nodes {
                if tst.loopCloser != nil && tst.loopCloser.Closed() {
                        return errClosed
                }
                // Prepare all streaming parts data
                streamingParts := make([]queue.StreamingPartData, 0)
                // Add sidx streaming parts
-               for name, sidxParts := range sidxPartsToSync {
-                       if len(sidxParts) == 0 {
-                               continue
+               for name, sidx := range sidxMap {
+                       sidxStreamingParts, sidxReleaseFuncs := 
sidx.StreamingParts(partIDsToSync, tst.group, uint32(tst.shardID), name)
+                       if len(sidxStreamingParts) != len(partIDsToSync) {
+                               logger.Panicf("sidx streaming parts count 
mismatch: %d != %d", len(sidxStreamingParts), len(partIDsToSync))
+                               return nil
                        }
-                       sidx, ok := tst.getSidx(name)
-                       if !ok {
-                               return fmt.Errorf("sidx %s not found", name)
-                       }
-                       sidxStreamingParts, sidxReleaseFuncs := 
sidx.StreamingParts(sidxParts, tst.group, uint32(tst.shardID), name)
                        streamingParts = append(streamingParts, 
sidxStreamingParts...)
-                       *releaseFuncs = append(*releaseFuncs, 
sidxReleaseFuncs...)
+                       releaseFuncs = append(releaseFuncs, sidxReleaseFuncs...)
+               }
+               if len(streamingParts) != len(partIDsToSync)*len(sidxMap) {
+                       logger.Panicf("streaming parts count mismatch: %d != 
%d", len(streamingParts), len(partIDsToSync)*len(sidxMap))
+                       return nil

Review Comment:
   The panic followed by return statement is unreachable code. Remove the 
return statement since panic will terminate execution.



##########
banyand/trace/syncer.go:
##########
@@ -299,129 +236,34 @@ func (tst *tsTable) executeSyncOperation(partsToSync 
[]*part, sidxPartsToSync ma
        }()
 
        nodes := tst.getNodes()
-       return tst.syncStreamingPartsToNodes(ctx, nodes, partsToSync, 
sidxPartsToSync, &releaseFuncs)
-}
-
-// handleSyncIntroductions creates and processes sync introductions for both 
core and sidx parts.
-func (tst *tsTable) handleSyncIntroductions(partsToSync []*part, 
sidxPartsToSync map[string][]*sidx.Part, syncCh chan *syncIntroduction) error {
-       // Create core sync introduction
-       si := generateSyncIntroduction()
-       defer releaseSyncIntroduction(si)
-       si.applied = make(chan struct{})
-       for _, part := range partsToSync {
-               ph := partHandle{
-                       partID:   part.partMetadata.ID,
-                       partType: PartTypeCore,
-               }
-               si.synced[ph] = struct{}{}
-       }
-
-       // Create sidx sync introductions
-       sidxSyncIntroductions := 
tst.createSidxSyncIntroductions(sidxPartsToSync)
-       defer tst.releaseSidxSyncIntroductions(sidxSyncIntroductions)
-
-       // Send sync introductions
-       if err := tst.sendSyncIntroductions(si, sidxSyncIntroductions, syncCh); 
err != nil {
-               return err
-       }
-
-       // Wait for sync introductions to be applied
-       return tst.waitForSyncIntroductions(si, sidxSyncIntroductions)
-}
-
-// createSidxSyncIntroductions creates sync introductions for sidx parts.
-func (tst *tsTable) createSidxSyncIntroductions(sidxPartsToSync 
map[string][]*sidx.Part) map[string]*sidx.SyncIntroduction {
-       sidxSyncIntroductions := make(map[string]*sidx.SyncIntroduction)
-       for name, sidxParts := range sidxPartsToSync {
-               if len(sidxParts) > 0 {
-                       ssi := sidx.GenerateSyncIntroduction()
-                       ssi.Applied = make(chan struct{})
-                       for _, part := range sidxParts {
-                               ssi.Synced[part.ID()] = struct{}{}
-                       }
-                       sidxSyncIntroductions[name] = ssi
-               }
-       }
-       return sidxSyncIntroductions
-}
-
-// releaseSidxSyncIntroductions releases sidx sync introductions.
-func (tst *tsTable) releaseSidxSyncIntroductions(sidxSyncIntroductions 
map[string]*sidx.SyncIntroduction) {
-       for _, ssi := range sidxSyncIntroductions {
-               sidx.ReleaseSyncIntroduction(ssi)
-       }
-}
-
-// sendSyncIntroductions sends sync introductions to their respective channels.
-func (tst *tsTable) sendSyncIntroductions(si *syncIntroduction, 
sidxSyncIntroductions map[string]*sidx.SyncIntroduction, syncCh chan 
*syncIntroduction) error {
-       select {
-       case syncCh <- si:
-       case <-tst.loopCloser.CloseNotify():
-               return errClosed
-       }
-
-       for name, ssi := range sidxSyncIntroductions {
-               sidx, ok := tst.getSidx(name)
-               if !ok {
-                       return fmt.Errorf("sidx %s not found", name)
-               }
-               select {
-               case sidx.SyncCh() <- ssi:
-               case <-tst.loopCloser.CloseNotify():
-                       return errClosed
-               }
-       }
-       return nil
-}
-
-// waitForSyncIntroductions waits for all sync introductions to be applied.
-func (tst *tsTable) waitForSyncIntroductions(si *syncIntroduction, 
sidxSyncIntroductions map[string]*sidx.SyncIntroduction) error {
-       select {
-       case <-si.applied:
-       case <-tst.loopCloser.CloseNotify():
-               return errClosed
-       }
-
-       for _, ssi := range sidxSyncIntroductions {
-               select {
-               case <-ssi.Applied:
-               case <-tst.loopCloser.CloseNotify():
-                       return errClosed
-               }
-       }
-       return nil
-}
-
-// syncStreamingPartsToNodes synchronizes streamingparts to multiple nodes.
-func (tst *tsTable) syncStreamingPartsToNodes(ctx context.Context, nodes 
[]string,
-       partsToSync []*part, sidxPartsToSync map[string][]*sidx.Part, 
releaseFuncs *[]func(),
-) error {
        if tst.loopCloser != nil && tst.loopCloser.Closed() {
                return errClosed
        }
+       sidxMap := tst.getAllSidx()
        for _, node := range nodes {
                if tst.loopCloser != nil && tst.loopCloser.Closed() {
                        return errClosed
                }
                // Prepare all streaming parts data
                streamingParts := make([]queue.StreamingPartData, 0)
                // Add sidx streaming parts
-               for name, sidxParts := range sidxPartsToSync {
-                       if len(sidxParts) == 0 {
-                               continue
+               for name, sidx := range sidxMap {
+                       sidxStreamingParts, sidxReleaseFuncs := 
sidx.StreamingParts(partIDsToSync, tst.group, uint32(tst.shardID), name)
+                       if len(sidxStreamingParts) != len(partIDsToSync) {
+                               logger.Panicf("sidx streaming parts count 
mismatch: %d != %d", len(sidxStreamingParts), len(partIDsToSync))
+                               return nil
                        }
-                       sidx, ok := tst.getSidx(name)
-                       if !ok {
-                               return fmt.Errorf("sidx %s not found", name)
-                       }
-                       sidxStreamingParts, sidxReleaseFuncs := 
sidx.StreamingParts(sidxParts, tst.group, uint32(tst.shardID), name)
                        streamingParts = append(streamingParts, 
sidxStreamingParts...)
-                       *releaseFuncs = append(*releaseFuncs, 
sidxReleaseFuncs...)
+                       releaseFuncs = append(releaseFuncs, sidxReleaseFuncs...)
+               }
+               if len(streamingParts) != len(partIDsToSync)*len(sidxMap) {
+                       logger.Panicf("streaming parts count mismatch: %d != 
%d", len(streamingParts), len(partIDsToSync)*len(sidxMap))
+                       return nil

Review Comment:
   The panic followed by return statement is unreachable code. Remove the 
return statement since panic will terminate execution.



##########
banyand/internal/sidx/options.go:
##########
@@ -37,6 +37,11 @@ type Options struct {
        // This includes part directories, metadata files, and temporary files.
        // MANDATORY: Must be provided, non-empty, and an absolute path.
        Path string
+
+       // AvaiablePartIDs is the list of available part IDs.
+       // It is used to load the snapshot and validate the part IDs.
+       // MANDATORY: Must be provided and cannot be nil.
+       AvaiablePartIDs []uint64

Review Comment:
   Corrected spelling of 'AvaiablePartIDs' to 'AvailablePartIDs'.
   ```suggestion
        // AvailablePartIDs is the list of available part IDs.
        // It is used to load the snapshot and validate the part IDs.
        // MANDATORY: Must be provided and cannot be nil.
        AvailablePartIDs []uint64
   ```



##########
banyand/trace/write_data.go:
##########
@@ -35,11 +34,41 @@ type syncPartContext struct {
        l                   *logger.Logger
        writers             *writers
        memPart             *memPart
-       sidxPartContext     *sidx.SyncPartContext
+       sidxPartContexts    map[string]*sidx.SyncPartContext
        traceIDFilterBuffer []byte
        tagTypeBuffer       []byte
 }
 
+func (s *syncPartContext) NewPartType(ctx *queue.ChunkedSyncPartContext) error 
{
+       if ctx.PartType != PartTypeCore {

Review Comment:
   Magic string comparison `PartTypeCore` should be defined as a constant for 
better maintainability and to avoid typos.



##########
banyand/internal/sidx/sidx.go:
##########
@@ -463,26 +388,15 @@ func (s *sidx) Flush() error {
                flushIntro.flushed[newPW.ID()] = newPW
        }
 
-       if len(flushIntro.flushed) == 0 {
-               return nil
+       if len(flushIntro.flushed) != len(partIDsToFlush) {
+               logger.Panicf("expected %d parts to flush, but got %d", 
len(partIDsToFlush), len(flushIntro.flushed))
+               return nil, nil

Review Comment:
   The panic followed by return statement is unreachable code. Remove the 
return statement since panic will terminate execution.
   ```suggestion
   
   ```



##########
banyand/internal/sidx/sync.go:
##########
@@ -62,28 +57,44 @@ func (s *sidx) PartsToSync() []*part {
 }
 
 // StreamingParts returns the streaming parts.
-func (s *sidx) StreamingParts(partsToSync []*part, group string, shardID 
uint32, name string) ([]queue.StreamingPartData, []func()) {
+func (s *sidx) StreamingParts(partIDsToSync map[uint64]struct{}, group string, 
shardID uint32, name string) ([]queue.StreamingPartData, []func()) {
+       snapshot := s.currentSnapshot()
+       if snapshot == nil {
+               return nil, nil
+       }
+       defer snapshot.decRef()
        var streamingParts []queue.StreamingPartData
        var releaseFuncs []func()
-       for _, part := range partsToSync {
-               // Create streaming reader for the part
-               files, release := createPartFileReaders(part)
-               releaseFuncs = append(releaseFuncs, release)
-               // Create streaming part sync data
-               streamingParts = append(streamingParts, queue.StreamingPartData{
-                       ID:                    part.partMetadata.ID,
-                       Group:                 group,
-                       ShardID:               shardID,
-                       Topic:                 data.TopicTracePartSync.String(),
-                       Files:                 files,
-                       CompressedSizeBytes:   
part.partMetadata.CompressedSizeBytes,
-                       UncompressedSizeBytes: 
part.partMetadata.UncompressedSizeBytes,
-                       TotalCount:            part.partMetadata.TotalCount,
-                       BlocksCount:           part.partMetadata.BlocksCount,
-                       MinTimestamp:          part.partMetadata.SegmentID,
-                       MinKey:                part.partMetadata.MinKey,
-                       MaxKey:                part.partMetadata.MaxKey,
-                       PartType:              name,
+       for _, pw := range snapshot.parts {
+               if _, ok := partIDsToSync[pw.p.partMetadata.ID]; ok {
+                       if pw.mp != nil {
+                               logger.Panicf("sidx streaming parts: %s, part 
%d should not a mem part", name, pw.p.partMetadata.ID)
+                               return nil, nil

Review Comment:
   The panic followed by return statement is unreachable code. Remove the 
return statement since panic will terminate execution.
   ```suggestion
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to