This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new bb93d4813 sub pr2: queue_sub chunked sync observability (#1107)
bb93d4813 is described below
commit bb93d48136195533cb28b92987e74f7dc179737a
Author: OmCheeLin <[email protected]>
AuthorDate: Sun May 3 18:46:42 2026 +0800
sub pr2: queue_sub chunked sync observability (#1107)
---
banyand/queue/sub/chunked_sync.go | 169 ++++++++++++++++-------
banyand/queue/sub/chunked_sync_test.go | 13 +-
banyand/queue/sub/server.go | 20 +++
banyand/queue/sub/server_metrics_test.go | 221 +++++++++++++++++++++++++++++++
4 files changed, 368 insertions(+), 55 deletions(-)
diff --git a/banyand/queue/sub/chunked_sync.go b/banyand/queue/sub/chunked_sync.go
index 7651a52e6..4deb88e59 100644
--- a/banyand/queue/sub/chunked_sync.go
+++ b/banyand/queue/sub/chunked_sync.go
@@ -103,16 +103,17 @@ type chunkBuffer struct {
}
type syncSession struct {
- startTime time.Time
- metadata *clusterv1.SyncMetadata
- partsProgress map[int]*partProgress
- partCtx *queue.ChunkedSyncPartContext
- chunkBuffer *chunkBuffer
- sessionID string
- errorMsg string
- totalReceived uint64
- chunksReceived uint32
- completed bool
+ startTime time.Time
+ metadata *clusterv1.SyncMetadata
+ partsProgress map[int]*partProgress
+ partCtx *queue.ChunkedSyncPartContext
+ chunkBuffer *chunkBuffer
+ sessionID string
+ errorMsg string
+ totalReceived uint64
+ chunksReceived uint32
+ abortedRecorded bool
+ completed bool
}
type partProgress struct {
@@ -121,17 +122,57 @@ type partProgress struct {
completed bool
}
+const (
+ abortedReasonSwitch = "switch"
+ abortedReasonStreamError = "stream_error"
+ abortedReasonCtxDone = "ctx_done"
+ abortedReasonEOF = "eof"
+)
+
+// releaseMetrics releases the gauges held by this session.
+// Idempotent: safe to call from both handleCompletion and the SyncPart defer.
+func (s *syncSession) releaseMetrics(m *metrics) {
+ if m == nil || s.completed {
+ return
+ }
+ m.activeSyncSessions.Add(-1, s.metadata.Topic)
+ if s.chunkBuffer != nil {
+ if n := len(s.chunkBuffer.chunks); n > 0 {
+ m.reorderBuffered.Add(-float64(n), s.metadata.Topic)
+ }
+ }
+ s.completed = true
+}
+
+func (s *syncSession) recordAborted(m *metrics, reason string) {
+ if m == nil || s.abortedRecorded || s.completed {
+ return
+ }
+ if reason == "" {
+ return
+ }
+ // chunkedSyncAbortedTotal is always initialized in production; tests that wire partial metrics must set it if they trigger abort paths.
+ m.chunkedSyncAbortedTotal.Inc(1, s.metadata.Topic, reason)
+ s.abortedRecorded = true
+}
+
// SyncPart implements clusterv1.ChunkedSyncServiceServer.
func (s *server) SyncPart(stream clusterv1.ChunkedSyncService_SyncPartServer) error {
ctx := stream.Context()
var currentSession *syncSession
var sessionID string
+ var abortReason string
defer func() {
- if currentSession != nil {
- if currentSession.partCtx != nil {
- if closeErr := currentSession.partCtx.Close(); closeErr != nil {
- s.log.Error().Err(closeErr).Str("session_id", currentSession.sessionID).Msg("failed to close session partCtx")
- }
+ if currentSession == nil {
+ return
+ }
+ if !currentSession.completed {
+ currentSession.recordAborted(s.metrics, abortReason)
+ }
+ currentSession.releaseMetrics(s.metrics)
+ if currentSession.partCtx != nil {
+ if closeErr := currentSession.partCtx.Close(); closeErr != nil {
+ s.log.Error().Err(closeErr).Str("session_id", currentSession.sessionID).Msg("failed to close session partCtx")
}
}
}()
@@ -139,15 +180,18 @@ func (s *server) SyncPart(stream clusterv1.ChunkedSyncService_SyncPartServer) er
for {
select {
case <-ctx.Done():
+ abortReason = abortedReasonCtxDone
return ctx.Err()
default:
}
req, err := stream.Recv()
if errors.Is(err, io.EOF) {
+ abortReason = abortedReasonEOF
break
}
if err != nil {
+ abortReason = abortedReasonStreamError
s.log.Error().Err(err).Msg("failed to receive chunk")
return err
}
@@ -155,34 +199,7 @@ func (s *server) SyncPart(stream clusterv1.ChunkedSyncService_SyncPartServer) er
sessionID = req.SessionId
if req.GetMetadata() != nil {
- if currentSession != nil {
- if currentSession.partCtx != nil {
- if currentSession.partCtx.Handler != nil {
- if finishErr := currentSession.partCtx.Handler.FinishSync(); finishErr != nil {
- s.updateChunkOrderMetrics("finish_sync_err", currentSession.metadata.Topic)
- s.log.Error().Err(finishErr).Str("session_id", currentSession.sessionID).Msg("failed to finish sync for previous session")
- }
- if closeErr := currentSession.partCtx.Close(); closeErr != nil {
- s.log.Error().Err(closeErr).Str("session_id", currentSession.sessionID).Msg("failed to close previous session partCtx")
- }
- } else if closeErr := currentSession.partCtx.Close(); closeErr != nil {
- s.log.Error().Err(closeErr).Str("session_id", currentSession.sessionID).Msg("failed to close previous session partCtx")
- }
- }
- }
- currentSession = &syncSession{
- sessionID: sessionID,
- metadata: req.GetMetadata(),
- startTime: time.Now(),
- chunksReceived: 0,
- partsProgress: make(map[int]*partProgress),
- }
- if dl := s.log.Debug(); dl.Enabled() {
- dl.Str("session_id", sessionID).
- Str("topic", req.GetMetadata().Topic).
- Uint32("total_parts", req.GetMetadata().TotalParts).
- Msg("started chunked sync session")
- }
+ currentSession = s.startOrSwitchSession(sessionID, req, currentSession)
}
if currentSession == nil {
@@ -204,6 +221,50 @@ func (s *server) SyncPart(stream clusterv1.ChunkedSyncService_SyncPartServer) er
return nil
}
+func (s *server) startOrSwitchSession(sessionID string, req *clusterv1.SyncPartRequest, previousSession *syncSession) *syncSession {
+ if previousSession != nil {
+ s.cleanupPreviousSession(previousSession)
+ }
+
+ newSession := &syncSession{
+ sessionID: sessionID,
+ metadata: req.GetMetadata(),
+ startTime: time.Now(),
+ chunksReceived: 0,
+ partsProgress: make(map[int]*partProgress),
+ }
+ if s.metrics != nil {
+ s.metrics.activeSyncSessions.Add(1, newSession.metadata.Topic)
+ }
+ if dl := s.log.Debug(); dl.Enabled() {
+ dl.Str("session_id", sessionID).
+ Str("topic", req.GetMetadata().Topic).
+ Uint32("total_parts", req.GetMetadata().TotalParts).
+ Msg("started chunked sync session")
+ }
+ return newSession
+}
+
+func (s *server) cleanupPreviousSession(previousSession *syncSession) {
+ if !previousSession.completed {
+ previousSession.recordAborted(s.metrics, abortedReasonSwitch)
+ }
+ previousSession.releaseMetrics(s.metrics)
+
+ if previousSession.partCtx == nil {
+ return
+ }
+ if previousSession.partCtx.Handler != nil {
+ if finishErr := previousSession.partCtx.Handler.FinishSync(); finishErr != nil {
+ s.updateChunkOrderMetrics("finish_sync_err", previousSession.metadata.Topic)
+ s.log.Error().Err(finishErr).Str("session_id", previousSession.sessionID).Msg("failed to finish sync for previous session")
+ }
+ }
+ if closeErr := previousSession.partCtx.Close(); closeErr != nil {
+ s.log.Error().Err(closeErr).Str("session_id", previousSession.sessionID).Msg("failed to close previous session partCtx")
+ }
+}
+
func (s *server) processChunk(stream clusterv1.ChunkedSyncService_SyncPartServer, session *syncSession, req *clusterv1.SyncPartRequest) error {
// Check version compatibility on every chunk
if req.VersionInfo != nil {
@@ -306,7 +367,9 @@ func (s *server) processChunkWithReordering(stream clusterv1.ChunkedSyncService_
Msg("buffered out-of-order chunk")
}
s.updateChunkOrderMetrics("chunk_buffered", session.metadata.Topic)
-
+ if s.metrics != nil {
+ s.metrics.reorderBuffered.Add(1, session.metadata.Topic)
+ }
return s.sendResponse(stream, req,
clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED,
fmt.Sprintf("chunk %d buffered (waiting for %d)",
req.ChunkIndex, buffer.expectedIndex), nil)
}
@@ -414,6 +477,9 @@ func (s *server) processBufferedChunks(stream clusterv1.ChunkedSyncService_SyncP
for {
if chunk, exists := buffer.chunks[buffer.expectedIndex]; exists {
delete(buffer.chunks, buffer.expectedIndex)
+ if s.metrics != nil {
+ s.metrics.reorderBuffered.Add(-1, session.metadata.Topic)
+ }
if dl := s.log.Debug(); dl.Enabled() {
dl.Str("session_id", session.sessionID).
@@ -522,8 +588,6 @@ func (s *server) handleCompletion(stream clusterv1.ChunkedSyncService_SyncPartSe
session.partCtx.Handler = nil
}
- session.completed = true
-
partsResults := make([]*clusterv1.PartResult, 0, len(session.partsProgress))
allPartsSuccessful := true
@@ -549,6 +613,19 @@ func (s *server) handleCompletion(stream clusterv1.ChunkedSyncService_SyncPartSe
PartsResults: partsResults,
}
+ session.releaseMetrics(s.metrics)
+ if s.metrics != nil {
+ topic := session.metadata.Topic
+ s.metrics.chunkedSyncTotalBytes.Inc(float64(syncResult.TotalBytesReceived), topic)
+ s.metrics.chunkedSyncDurationSecs.Observe(float64(syncResult.DurationMs)/1000.0, topic)
+
+ for _, pr := range partsResults {
+ if !pr.Success {
+ s.metrics.chunkedSyncFailedParts.Inc(1, topic)
+ }
+ }
+ }
+
if dl := s.log.Debug(); dl.Enabled() {
dl.Str("session_id", session.sessionID).
Str("topic", session.metadata.Topic).
diff --git a/banyand/queue/sub/chunked_sync_test.go b/banyand/queue/sub/chunked_sync_test.go
index 8a7430f31..2ba9889ff 100644
--- a/banyand/queue/sub/chunked_sync_test.go
+++ b/banyand/queue/sub/chunked_sync_test.go
@@ -238,17 +238,12 @@ func TestChunkOrderingMetricsAreLabeledByTopic_NotSessionID(t *testing.T) {
mockHandler := &MockChunkedSyncHandler{}
s.chunkedSyncHandlers[data.TopicStreamPartSync] = mockHandler
- // metrics: this test will trigger at least two events:
- // - out_of_order_received
- // - chunk_buffered
- // so must put both counters, otherwise nil.Inc will panic.
outOfOrder := &capturingCounter{}
buffered := &capturingCounter{}
- s.metrics = &metrics{
- outOfOrderChunksReceived: outOfOrder,
- chunksBuffered: buffered,
- // other counters will not be triggered in this test, leave them nil
- }
+ testMetrics := newTestMetrics()
+ testMetrics.outOfOrderChunksReceived = outOfOrder
+ testMetrics.chunksBuffered = buffered
+ s.metrics = testMetrics
topic := data.TopicStreamPartSync.String()
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 9d673d97a..454e5e508 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -440,6 +440,16 @@ type metrics struct {
largeGapsRejected meter.Counter
bufferCapacityExceeded meter.Counter
finishSyncErr meter.Counter
+
+ // Chunked sync saturation metrics
+ activeSyncSessions meter.Gauge
+ reorderBuffered meter.Gauge
+
+ // Chunked sync outcome metrics
+ chunkedSyncAbortedTotal meter.Counter
+ chunkedSyncFailedParts meter.Counter
+ chunkedSyncTotalBytes meter.Counter
+ chunkedSyncDurationSecs meter.Histogram
}
func newMetrics(factory observability.Factory) *metrics {
@@ -460,6 +470,16 @@ func newMetrics(factory observability.Factory) *metrics {
largeGapsRejected: factory.NewCounter("large_gaps_rejected", "topic"),
bufferCapacityExceeded: factory.NewCounter("buffer_capacity_exceeded", "topic"),
finishSyncErr: factory.NewCounter("finish_sync_err", "topic"),
+
+ // Chunked sync saturation metrics
+ activeSyncSessions: factory.NewGauge("chunked_sync_active_sessions", "topic"),
+ reorderBuffered: factory.NewGauge("chunk_reorder_buffered_chunks", "topic"),
+
+ // Chunked sync outcome metrics
+ chunkedSyncAbortedTotal: factory.NewCounter("chunked_sync_aborted_total", "topic", "reason"),
+ chunkedSyncFailedParts: factory.NewCounter("chunked_sync_failed_parts_total", "topic"),
+ chunkedSyncTotalBytes: factory.NewCounter("chunked_sync_total_bytes_received", "topic"),
+ chunkedSyncDurationSecs: factory.NewHistogram("chunked_sync_duration_seconds", meter.DefBuckets, "topic"),
}
}
diff --git a/banyand/queue/sub/server_metrics_test.go b/banyand/queue/sub/server_metrics_test.go
new file mode 100644
index 000000000..121d2f5be
--- /dev/null
+++ b/banyand/queue/sub/server_metrics_test.go
@@ -0,0 +1,221 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sub
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+type noopCounter struct{}
+
+func (*noopCounter) Inc(_ float64, _ ...string) {}
+func (*noopCounter) Delete(_ ...string) bool { return true }
+
+type noopGauge struct{}
+
+func (*noopGauge) Set(_ float64, _ ...string) {}
+func (*noopGauge) Add(_ float64, _ ...string) {}
+func (*noopGauge) Delete(_ ...string) bool { return true }
+
+type noopHistogram struct{}
+
+func (*noopHistogram) Observe(_ float64, _ ...string) {}
+func (*noopHistogram) Delete(_ ...string) bool { return true }
+
+func newTestMetrics() *metrics {
+ c := &noopCounter{}
+ g := &noopGauge{}
+ h := &noopHistogram{}
+ return &metrics{
+ totalStarted: c,
+ totalFinished: c,
+ totalErr: c,
+ totalLatency: c,
+
+ totalMsgReceived: c,
+ totalMsgReceivedErr: c,
+ totalMsgSent: c,
+ totalMsgSentErr: c,
+
+ outOfOrderChunksReceived: c,
+ chunksBuffered: c,
+ bufferTimeouts: c,
+ largeGapsRejected: c,
+ bufferCapacityExceeded: c,
+ finishSyncErr: c,
+
+ activeSyncSessions: g,
+ reorderBuffered: g,
+
+ chunkedSyncAbortedTotal: c,
+ chunkedSyncFailedParts: c,
+ chunkedSyncTotalBytes: c,
+ chunkedSyncDurationSecs: h,
+ }
+}
+
+type fakeCounter struct {
+ total float64
+}
+
+func (f *fakeCounter) Inc(delta float64, _ ...string) { f.total += delta }
+func (*fakeCounter) Delete(_ ...string) bool { return true }
+
+type fakeGauge struct {
+ value float64
+}
+
+func (g *fakeGauge) Set(v float64, _ ...string) { g.value = v }
+func (g *fakeGauge) Add(delta float64, _ ...string) { g.value += delta }
+func (*fakeGauge) Delete(_ ...string) bool { return true }
+func newFakeGauge() meter.Gauge { return &fakeGauge{} }
+func getGaugeValue(g meter.Gauge) float64 { return g.(*fakeGauge).value }
+
+type fakeHistogram struct {
+ count int
+}
+
+func (h *fakeHistogram) Observe(_ float64, _ ...string) { h.count++ }
+func (*fakeHistogram) Delete(_ ...string) bool { return true }
+
+func TestReleaseMetricsReleasesGauges(t *testing.T) {
+ m := &metrics{
+ activeSyncSessions: newFakeGauge(),
+ reorderBuffered: newFakeGauge(),
+ }
+
+ sess := &syncSession{
+ sessionID: "s1",
+ startTime: time.Now(),
+ metadata: &clusterv1.SyncMetadata{Topic: "v1:stream-part-sync"},
+ chunkBuffer: &chunkBuffer{
+ chunks: map[uint32]*clusterv1.SyncPartRequest{
+ 2: {ChunkIndex: 2},
+ 3: {ChunkIndex: 3},
+ },
+ },
+ }
+
+ // simulate start increments
+ m.activeSyncSessions.Add(1, sess.metadata.Topic)
+ m.reorderBuffered.Add(2, sess.metadata.Topic)
+
+ sess.releaseMetrics(m)
+
+ require.Equal(t, float64(0), getGaugeValue(m.activeSyncSessions))
+ require.Equal(t, float64(0), getGaugeValue(m.reorderBuffered))
+ // idempotent
+ sess.releaseMetrics(m)
+ require.Equal(t, float64(0), getGaugeValue(m.activeSyncSessions))
+ require.Equal(t, float64(0), getGaugeValue(m.reorderBuffered))
+}
+
+func TestCompletionOutcomeMetricsRecorded(t *testing.T) {
+ logInitErr := logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "info",
+ })
+ require.NoError(t, logInitErr)
+
+ topic := "v1:stream-part-sync"
+ totalBytes := float64(10)
+
+ s := &server{
+ log: logger.GetLogger("test-server-completion-metrics"),
+ metrics: &metrics{
+ activeSyncSessions: newFakeGauge(),
+ reorderBuffered: newFakeGauge(),
+ chunkedSyncAbortedTotal: &fakeCounter{},
+ chunkedSyncFailedParts: &fakeCounter{},
+ chunkedSyncTotalBytes: &fakeCounter{},
+ chunkedSyncDurationSecs: &fakeHistogram{},
+ },
+ }
+
+ session := &syncSession{
+ sessionID: "s1",
+ startTime: time.Now().Add(-2 * time.Second),
+ metadata: &clusterv1.SyncMetadata{Topic: topic},
+ partCtx: &queue.ChunkedSyncPartContext{},
+ partsProgress: map[int]*partProgress{
+ 0: {totalBytes: 10, receivedBytes: 10, completed: true},
+ 1: {totalBytes: 10, receivedBytes: 0, completed: false},
+ },
+ totalReceived: 10,
+ chunksReceived: 2,
+ }
+
+ // Simulate start increments that would have happened on session creation.
+ s.metrics.activeSyncSessions.Add(1, topic)
+
+ mockStream := &MockSyncPartStream{}
+ req := &clusterv1.SyncPartRequest{
+ SessionId: session.sessionID,
+ ChunkIndex: 0,
+ Content: &clusterv1.SyncPartRequest_Completion{
+ Completion: &clusterv1.SyncCompletion{},
+ },
+ }
+ handleErr := s.handleCompletion(mockStream, session, req)
+ require.NoError(t, handleErr)
+
+ require.Equal(t, totalBytes, s.metrics.chunkedSyncTotalBytes.(*fakeCounter).total)
+ require.Equal(t, 1, s.metrics.chunkedSyncDurationSecs.(*fakeHistogram).count)
+ require.Equal(t, float64(1), s.metrics.chunkedSyncFailedParts.(*fakeCounter).total)
+ require.Equal(t, float64(0), getGaugeValue(s.metrics.activeSyncSessions))
+ require.True(t, session.completed)
+}
+
+func TestCleanupPreviousSessionRecordsAborted(t *testing.T) {
+ logInitErr := logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "info",
+ })
+ require.NoError(t, logInitErr)
+
+ topic := "v1:stream-part-sync"
+ s := &server{
+ log: logger.GetLogger("test-server-switch-abort"),
+ metrics: &metrics{
+ activeSyncSessions: newFakeGauge(),
+ reorderBuffered: newFakeGauge(),
+ chunkedSyncAbortedTotal: &fakeCounter{},
+ },
+ }
+
+ prev := &syncSession{
+ sessionID: "s1",
+ startTime: time.Now().Add(-time.Second),
+ metadata: &clusterv1.SyncMetadata{Topic: topic},
+ }
+ s.metrics.activeSyncSessions.Add(1, topic)
+
+ s.cleanupPreviousSession(prev)
+
+ require.True(t, prev.abortedRecorded)
+ require.Equal(t, float64(1), s.metrics.chunkedSyncAbortedTotal.(*fakeCounter).total)
+ require.Equal(t, float64(0), getGaugeValue(s.metrics.activeSyncSessions))
+}