This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new bb93d4813 sub pr2: queue_sub chunked sync observability (#1107)
bb93d4813 is described below

commit bb93d48136195533cb28b92987e74f7dc179737a
Author: OmCheeLin <[email protected]>
AuthorDate: Sun May 3 18:46:42 2026 +0800

    sub pr2: queue_sub chunked sync observability (#1107)
---
 banyand/queue/sub/chunked_sync.go        | 169 ++++++++++++++++-------
 banyand/queue/sub/chunked_sync_test.go   |  13 +-
 banyand/queue/sub/server.go              |  20 +++
 banyand/queue/sub/server_metrics_test.go | 221 +++++++++++++++++++++++++++++++
 4 files changed, 368 insertions(+), 55 deletions(-)

diff --git a/banyand/queue/sub/chunked_sync.go b/banyand/queue/sub/chunked_sync.go
index 7651a52e6..4deb88e59 100644
--- a/banyand/queue/sub/chunked_sync.go
+++ b/banyand/queue/sub/chunked_sync.go
@@ -103,16 +103,17 @@ type chunkBuffer struct {
 }
 
 type syncSession struct {
-       startTime      time.Time
-       metadata       *clusterv1.SyncMetadata
-       partsProgress  map[int]*partProgress
-       partCtx        *queue.ChunkedSyncPartContext
-       chunkBuffer    *chunkBuffer
-       sessionID      string
-       errorMsg       string
-       totalReceived  uint64
-       chunksReceived uint32
-       completed      bool
+       startTime       time.Time
+       metadata        *clusterv1.SyncMetadata
+       partsProgress   map[int]*partProgress
+       partCtx         *queue.ChunkedSyncPartContext
+       chunkBuffer     *chunkBuffer
+       sessionID       string
+       errorMsg        string
+       totalReceived   uint64
+       chunksReceived  uint32
+       abortedRecorded bool
+       completed       bool
 }
 
 type partProgress struct {
@@ -121,17 +122,57 @@ type partProgress struct {
        completed     bool
 }
 
+const (
+       abortedReasonSwitch      = "switch"
+       abortedReasonStreamError = "stream_error"
+       abortedReasonCtxDone     = "ctx_done"
+       abortedReasonEOF         = "eof"
+)
+
+// releaseMetrics releases the gauges held by this session.
+// Idempotent: safe to call from both handleCompletion and the SyncPart defer.
+func (s *syncSession) releaseMetrics(m *metrics) {
+       if m == nil || s.completed {
+               return
+       }
+       m.activeSyncSessions.Add(-1, s.metadata.Topic)
+       if s.chunkBuffer != nil {
+               if n := len(s.chunkBuffer.chunks); n > 0 {
+                       m.reorderBuffered.Add(-float64(n), s.metadata.Topic)
+               }
+       }
+       s.completed = true
+}
+
+func (s *syncSession) recordAborted(m *metrics, reason string) {
+       if m == nil || s.abortedRecorded || s.completed {
+               return
+       }
+       if reason == "" {
+               return
+       }
+       // chunkedSyncAbortedTotal is always initialized in production; tests that wire partial metrics must set it if they trigger abort paths.
+       m.chunkedSyncAbortedTotal.Inc(1, s.metadata.Topic, reason)
+       s.abortedRecorded = true
+}
+
 // SyncPart implements clusterv1.ChunkedSyncServiceServer.
 func (s *server) SyncPart(stream clusterv1.ChunkedSyncService_SyncPartServer) 
error {
        ctx := stream.Context()
        var currentSession *syncSession
        var sessionID string
+       var abortReason string
        defer func() {
-               if currentSession != nil {
-                       if currentSession.partCtx != nil {
-                               if closeErr := currentSession.partCtx.Close(); 
closeErr != nil {
-                                       
s.log.Error().Err(closeErr).Str("session_id", 
currentSession.sessionID).Msg("failed to close session partCtx")
-                               }
+               if currentSession == nil {
+                       return
+               }
+               if !currentSession.completed {
+                       currentSession.recordAborted(s.metrics, abortReason)
+               }
+               currentSession.releaseMetrics(s.metrics)
+               if currentSession.partCtx != nil {
+                       if closeErr := currentSession.partCtx.Close(); closeErr 
!= nil {
+                               s.log.Error().Err(closeErr).Str("session_id", 
currentSession.sessionID).Msg("failed to close session partCtx")
                        }
                }
        }()
@@ -139,15 +180,18 @@ func (s *server) SyncPart(stream clusterv1.ChunkedSyncService_SyncPartServer) er
        for {
                select {
                case <-ctx.Done():
+                       abortReason = abortedReasonCtxDone
                        return ctx.Err()
                default:
                }
 
                req, err := stream.Recv()
                if errors.Is(err, io.EOF) {
+                       abortReason = abortedReasonEOF
                        break
                }
                if err != nil {
+                       abortReason = abortedReasonStreamError
                        s.log.Error().Err(err).Msg("failed to receive chunk")
                        return err
                }
@@ -155,34 +199,7 @@ func (s *server) SyncPart(stream clusterv1.ChunkedSyncService_SyncPartServer) er
                sessionID = req.SessionId
 
                if req.GetMetadata() != nil {
-                       if currentSession != nil {
-                               if currentSession.partCtx != nil {
-                                       if currentSession.partCtx.Handler != 
nil {
-                                               if finishErr := 
currentSession.partCtx.Handler.FinishSync(); finishErr != nil {
-                                                       
s.updateChunkOrderMetrics("finish_sync_err", currentSession.metadata.Topic)
-                                                       
s.log.Error().Err(finishErr).Str("session_id", 
currentSession.sessionID).Msg("failed to finish sync for previous session")
-                                               }
-                                               if closeErr := 
currentSession.partCtx.Close(); closeErr != nil {
-                                                       
s.log.Error().Err(closeErr).Str("session_id", 
currentSession.sessionID).Msg("failed to close previous session partCtx")
-                                               }
-                                       } else if closeErr := 
currentSession.partCtx.Close(); closeErr != nil {
-                                               
s.log.Error().Err(closeErr).Str("session_id", 
currentSession.sessionID).Msg("failed to close previous session partCtx")
-                                       }
-                               }
-                       }
-                       currentSession = &syncSession{
-                               sessionID:      sessionID,
-                               metadata:       req.GetMetadata(),
-                               startTime:      time.Now(),
-                               chunksReceived: 0,
-                               partsProgress:  make(map[int]*partProgress),
-                       }
-                       if dl := s.log.Debug(); dl.Enabled() {
-                               dl.Str("session_id", sessionID).
-                                       Str("topic", req.GetMetadata().Topic).
-                                       Uint32("total_parts", 
req.GetMetadata().TotalParts).
-                                       Msg("started chunked sync session")
-                       }
+                       currentSession = s.startOrSwitchSession(sessionID, req, 
currentSession)
                }
 
                if currentSession == nil {
@@ -204,6 +221,50 @@ func (s *server) SyncPart(stream clusterv1.ChunkedSyncService_SyncPartServer) er
        return nil
 }
 
+func (s *server) startOrSwitchSession(sessionID string, req 
*clusterv1.SyncPartRequest, previousSession *syncSession) *syncSession {
+       if previousSession != nil {
+               s.cleanupPreviousSession(previousSession)
+       }
+
+       newSession := &syncSession{
+               sessionID:      sessionID,
+               metadata:       req.GetMetadata(),
+               startTime:      time.Now(),
+               chunksReceived: 0,
+               partsProgress:  make(map[int]*partProgress),
+       }
+       if s.metrics != nil {
+               s.metrics.activeSyncSessions.Add(1, newSession.metadata.Topic)
+       }
+       if dl := s.log.Debug(); dl.Enabled() {
+               dl.Str("session_id", sessionID).
+                       Str("topic", req.GetMetadata().Topic).
+                       Uint32("total_parts", req.GetMetadata().TotalParts).
+                       Msg("started chunked sync session")
+       }
+       return newSession
+}
+
+func (s *server) cleanupPreviousSession(previousSession *syncSession) {
+       if !previousSession.completed {
+               previousSession.recordAborted(s.metrics, abortedReasonSwitch)
+       }
+       previousSession.releaseMetrics(s.metrics)
+
+       if previousSession.partCtx == nil {
+               return
+       }
+       if previousSession.partCtx.Handler != nil {
+               if finishErr := previousSession.partCtx.Handler.FinishSync(); 
finishErr != nil {
+                       s.updateChunkOrderMetrics("finish_sync_err", 
previousSession.metadata.Topic)
+                       s.log.Error().Err(finishErr).Str("session_id", 
previousSession.sessionID).Msg("failed to finish sync for previous session")
+               }
+       }
+       if closeErr := previousSession.partCtx.Close(); closeErr != nil {
+               s.log.Error().Err(closeErr).Str("session_id", 
previousSession.sessionID).Msg("failed to close previous session partCtx")
+       }
+}
+
 func (s *server) processChunk(stream 
clusterv1.ChunkedSyncService_SyncPartServer, session *syncSession, req 
*clusterv1.SyncPartRequest) error {
        // Check version compatibility on every chunk
        if req.VersionInfo != nil {
@@ -306,7 +367,9 @@ func (s *server) processChunkWithReordering(stream clusterv1.ChunkedSyncService_
                                Msg("buffered out-of-order chunk")
                }
                s.updateChunkOrderMetrics("chunk_buffered", 
session.metadata.Topic)
-
+               if s.metrics != nil {
+                       s.metrics.reorderBuffered.Add(1, session.metadata.Topic)
+               }
                return s.sendResponse(stream, req, 
clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED,
                        fmt.Sprintf("chunk %d buffered (waiting for %d)", 
req.ChunkIndex, buffer.expectedIndex), nil)
        }
@@ -414,6 +477,9 @@ func (s *server) processBufferedChunks(stream clusterv1.ChunkedSyncService_SyncP
        for {
                if chunk, exists := buffer.chunks[buffer.expectedIndex]; exists 
{
                        delete(buffer.chunks, buffer.expectedIndex)
+                       if s.metrics != nil {
+                               s.metrics.reorderBuffered.Add(-1, 
session.metadata.Topic)
+                       }
 
                        if dl := s.log.Debug(); dl.Enabled() {
                                dl.Str("session_id", session.sessionID).
@@ -522,8 +588,6 @@ func (s *server) handleCompletion(stream clusterv1.ChunkedSyncService_SyncPartSe
                session.partCtx.Handler = nil
        }
 
-       session.completed = true
-
        partsResults := make([]*clusterv1.PartResult, 0, 
len(session.partsProgress))
        allPartsSuccessful := true
 
@@ -549,6 +613,19 @@ func (s *server) handleCompletion(stream clusterv1.ChunkedSyncService_SyncPartSe
                PartsResults:       partsResults,
        }
 
+       session.releaseMetrics(s.metrics)
+       if s.metrics != nil {
+               topic := session.metadata.Topic
+               
s.metrics.chunkedSyncTotalBytes.Inc(float64(syncResult.TotalBytesReceived), 
topic)
+               
s.metrics.chunkedSyncDurationSecs.Observe(float64(syncResult.DurationMs)/1000.0,
 topic)
+
+               for _, pr := range partsResults {
+                       if !pr.Success {
+                               s.metrics.chunkedSyncFailedParts.Inc(1, topic)
+                       }
+               }
+       }
+
        if dl := s.log.Debug(); dl.Enabled() {
                dl.Str("session_id", session.sessionID).
                        Str("topic", session.metadata.Topic).
diff --git a/banyand/queue/sub/chunked_sync_test.go b/banyand/queue/sub/chunked_sync_test.go
index 8a7430f31..2ba9889ff 100644
--- a/banyand/queue/sub/chunked_sync_test.go
+++ b/banyand/queue/sub/chunked_sync_test.go
@@ -238,17 +238,12 @@ func TestChunkOrderingMetricsAreLabeledByTopic_NotSessionID(t *testing.T) {
        mockHandler := &MockChunkedSyncHandler{}
        s.chunkedSyncHandlers[data.TopicStreamPartSync] = mockHandler
 
-       // metrics: this test will trigger at least two events:
-       // - out_of_order_received
-       // - chunk_buffered
-       // so must put both counters, otherwise nil.Inc will panic.
        outOfOrder := &capturingCounter{}
        buffered := &capturingCounter{}
-       s.metrics = &metrics{
-               outOfOrderChunksReceived: outOfOrder,
-               chunksBuffered:           buffered,
-               // other counters will not be triggered in this test, leave 
them nil
-       }
+       testMetrics := newTestMetrics()
+       testMetrics.outOfOrderChunksReceived = outOfOrder
+       testMetrics.chunksBuffered = buffered
+       s.metrics = testMetrics
 
        topic := data.TopicStreamPartSync.String()
 
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 9d673d97a..454e5e508 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -440,6 +440,16 @@ type metrics struct {
        largeGapsRejected        meter.Counter
        bufferCapacityExceeded   meter.Counter
        finishSyncErr            meter.Counter
+
+       // Chunked sync saturation metrics
+       activeSyncSessions meter.Gauge
+       reorderBuffered    meter.Gauge
+
+       // Chunked sync outcome metrics
+       chunkedSyncAbortedTotal meter.Counter
+       chunkedSyncFailedParts  meter.Counter
+       chunkedSyncTotalBytes   meter.Counter
+       chunkedSyncDurationSecs meter.Histogram
 }
 
 func newMetrics(factory observability.Factory) *metrics {
@@ -460,6 +470,16 @@ func newMetrics(factory observability.Factory) *metrics {
                largeGapsRejected:        
factory.NewCounter("large_gaps_rejected", "topic"),
                bufferCapacityExceeded:   
factory.NewCounter("buffer_capacity_exceeded", "topic"),
                finishSyncErr:            factory.NewCounter("finish_sync_err", 
"topic"),
+
+               // Chunked sync saturation metrics
+               activeSyncSessions: 
factory.NewGauge("chunked_sync_active_sessions", "topic"),
+               reorderBuffered:    
factory.NewGauge("chunk_reorder_buffered_chunks", "topic"),
+
+               // Chunked sync outcome metrics
+               chunkedSyncAbortedTotal: 
factory.NewCounter("chunked_sync_aborted_total", "topic", "reason"),
+               chunkedSyncFailedParts:  
factory.NewCounter("chunked_sync_failed_parts_total", "topic"),
+               chunkedSyncTotalBytes:   
factory.NewCounter("chunked_sync_total_bytes_received", "topic"),
+               chunkedSyncDurationSecs: 
factory.NewHistogram("chunked_sync_duration_seconds", meter.DefBuckets, 
"topic"),
        }
 }
 
diff --git a/banyand/queue/sub/server_metrics_test.go b/banyand/queue/sub/server_metrics_test.go
new file mode 100644
index 000000000..121d2f5be
--- /dev/null
+++ b/banyand/queue/sub/server_metrics_test.go
@@ -0,0 +1,221 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sub
+
+import (
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+
+       clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+type noopCounter struct{}
+
+func (*noopCounter) Inc(_ float64, _ ...string) {}
+func (*noopCounter) Delete(_ ...string) bool    { return true }
+
+type noopGauge struct{}
+
+func (*noopGauge) Set(_ float64, _ ...string) {}
+func (*noopGauge) Add(_ float64, _ ...string) {}
+func (*noopGauge) Delete(_ ...string) bool    { return true }
+
+type noopHistogram struct{}
+
+func (*noopHistogram) Observe(_ float64, _ ...string) {}
+func (*noopHistogram) Delete(_ ...string) bool        { return true }
+
+func newTestMetrics() *metrics {
+       c := &noopCounter{}
+       g := &noopGauge{}
+       h := &noopHistogram{}
+       return &metrics{
+               totalStarted:  c,
+               totalFinished: c,
+               totalErr:      c,
+               totalLatency:  c,
+
+               totalMsgReceived:    c,
+               totalMsgReceivedErr: c,
+               totalMsgSent:        c,
+               totalMsgSentErr:     c,
+
+               outOfOrderChunksReceived: c,
+               chunksBuffered:           c,
+               bufferTimeouts:           c,
+               largeGapsRejected:        c,
+               bufferCapacityExceeded:   c,
+               finishSyncErr:            c,
+
+               activeSyncSessions: g,
+               reorderBuffered:    g,
+
+               chunkedSyncAbortedTotal: c,
+               chunkedSyncFailedParts:  c,
+               chunkedSyncTotalBytes:   c,
+               chunkedSyncDurationSecs: h,
+       }
+}
+
+type fakeCounter struct {
+       total float64
+}
+
+func (f *fakeCounter) Inc(delta float64, _ ...string) { f.total += delta }
+func (*fakeCounter) Delete(_ ...string) bool          { return true }
+
+type fakeGauge struct {
+       value float64
+}
+
+func (g *fakeGauge) Set(v float64, _ ...string)     { g.value = v }
+func (g *fakeGauge) Add(delta float64, _ ...string) { g.value += delta }
+func (*fakeGauge) Delete(_ ...string) bool          { return true }
+func newFakeGauge() meter.Gauge                     { return &fakeGauge{} }
+func getGaugeValue(g meter.Gauge) float64           { return 
g.(*fakeGauge).value }
+
+type fakeHistogram struct {
+       count int
+}
+
+func (h *fakeHistogram) Observe(_ float64, _ ...string) { h.count++ }
+func (*fakeHistogram) Delete(_ ...string) bool          { return true }
+
+func TestReleaseMetricsReleasesGauges(t *testing.T) {
+       m := &metrics{
+               activeSyncSessions: newFakeGauge(),
+               reorderBuffered:    newFakeGauge(),
+       }
+
+       sess := &syncSession{
+               sessionID: "s1",
+               startTime: time.Now(),
+               metadata:  &clusterv1.SyncMetadata{Topic: 
"v1:stream-part-sync"},
+               chunkBuffer: &chunkBuffer{
+                       chunks: map[uint32]*clusterv1.SyncPartRequest{
+                               2: {ChunkIndex: 2},
+                               3: {ChunkIndex: 3},
+                       },
+               },
+       }
+
+       // simulate start increments
+       m.activeSyncSessions.Add(1, sess.metadata.Topic)
+       m.reorderBuffered.Add(2, sess.metadata.Topic)
+
+       sess.releaseMetrics(m)
+
+       require.Equal(t, float64(0), getGaugeValue(m.activeSyncSessions))
+       require.Equal(t, float64(0), getGaugeValue(m.reorderBuffered))
+       // idempotent
+       sess.releaseMetrics(m)
+       require.Equal(t, float64(0), getGaugeValue(m.activeSyncSessions))
+       require.Equal(t, float64(0), getGaugeValue(m.reorderBuffered))
+}
+
+func TestCompletionOutcomeMetricsRecorded(t *testing.T) {
+       logInitErr := logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: "info",
+       })
+       require.NoError(t, logInitErr)
+
+       topic := "v1:stream-part-sync"
+       totalBytes := float64(10)
+
+       s := &server{
+               log: logger.GetLogger("test-server-completion-metrics"),
+               metrics: &metrics{
+                       activeSyncSessions:      newFakeGauge(),
+                       reorderBuffered:         newFakeGauge(),
+                       chunkedSyncAbortedTotal: &fakeCounter{},
+                       chunkedSyncFailedParts:  &fakeCounter{},
+                       chunkedSyncTotalBytes:   &fakeCounter{},
+                       chunkedSyncDurationSecs: &fakeHistogram{},
+               },
+       }
+
+       session := &syncSession{
+               sessionID: "s1",
+               startTime: time.Now().Add(-2 * time.Second),
+               metadata:  &clusterv1.SyncMetadata{Topic: topic},
+               partCtx:   &queue.ChunkedSyncPartContext{},
+               partsProgress: map[int]*partProgress{
+                       0: {totalBytes: 10, receivedBytes: 10, completed: true},
+                       1: {totalBytes: 10, receivedBytes: 0, completed: false},
+               },
+               totalReceived:  10,
+               chunksReceived: 2,
+       }
+
+       // Simulate start increments that would have happened on session 
creation.
+       s.metrics.activeSyncSessions.Add(1, topic)
+
+       mockStream := &MockSyncPartStream{}
+       req := &clusterv1.SyncPartRequest{
+               SessionId:  session.sessionID,
+               ChunkIndex: 0,
+               Content: &clusterv1.SyncPartRequest_Completion{
+                       Completion: &clusterv1.SyncCompletion{},
+               },
+       }
+       handleErr := s.handleCompletion(mockStream, session, req)
+       require.NoError(t, handleErr)
+
+       require.Equal(t, totalBytes, 
s.metrics.chunkedSyncTotalBytes.(*fakeCounter).total)
+       require.Equal(t, 1, 
s.metrics.chunkedSyncDurationSecs.(*fakeHistogram).count)
+       require.Equal(t, float64(1), 
s.metrics.chunkedSyncFailedParts.(*fakeCounter).total)
+       require.Equal(t, float64(0), 
getGaugeValue(s.metrics.activeSyncSessions))
+       require.True(t, session.completed)
+}
+
+func TestCleanupPreviousSessionRecordsAborted(t *testing.T) {
+       logInitErr := logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: "info",
+       })
+       require.NoError(t, logInitErr)
+
+       topic := "v1:stream-part-sync"
+       s := &server{
+               log: logger.GetLogger("test-server-switch-abort"),
+               metrics: &metrics{
+                       activeSyncSessions:      newFakeGauge(),
+                       reorderBuffered:         newFakeGauge(),
+                       chunkedSyncAbortedTotal: &fakeCounter{},
+               },
+       }
+
+       prev := &syncSession{
+               sessionID: "s1",
+               startTime: time.Now().Add(-time.Second),
+               metadata:  &clusterv1.SyncMetadata{Topic: topic},
+       }
+       s.metrics.activeSyncSessions.Add(1, topic)
+
+       s.cleanupPreviousSession(prev)
+
+       require.True(t, prev.abortedRecorded)
+       require.Equal(t, float64(1), 
s.metrics.chunkedSyncAbortedTotal.(*fakeCounter).total)
+       require.Equal(t, float64(0), 
getGaugeValue(s.metrics.activeSyncSessions))
+}

Reply via email to