This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new dccf207d Add a wait and retry to write handlers to avoid the local 
metadata cache being loaded (#678)
dccf207d is described below

commit dccf207dc9a74225fd4e5ba7c2c3ad6f36e6694a
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Jun 5 12:10:58 2025 +0800

    Add a wait and retry to write handlers to avoid the local metadata cache 
being loaded (#678)
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                      |   1 +
 banyand/liaison/grpc/measure.go |  33 ++++++++-
 banyand/liaison/grpc/server.go  |   4 +
 banyand/liaison/grpc/stream.go  | 160 +++++++++++++++++++++++++++-------------
 4 files changed, 143 insertions(+), 55 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 93b79580..e819a022 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -13,6 +13,7 @@ Release Notes.
 - Replica: Replace Any with []byte Between Liaison and Data Nodes
 - Replica: Support configurable replica count on Group.
 - Replica: Move the TopN pre-calculation flow from the Data Node to the 
Liaison Node.
+- Add a wait and retry to write handlers to tolerate the local metadata cache 
not being loaded yet.
 
 ### Bug Fixes
 
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 2691f679..52a7b913 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -49,9 +49,10 @@ type measureService struct {
        broadcaster        queue.Client
        topNService        measure.TopNService
        *discoveryService
-       l            *logger.Logger
-       metrics      *metrics
-       writeTimeout time.Duration
+       l               *logger.Logger
+       metrics         *metrics
+       writeTimeout    time.Duration
+       maxWaitDuration time.Duration
 }
 
 func (ms *measureService) setLogger(log *logger.Logger) {
@@ -132,7 +133,31 @@ func (ms *measureService) 
validateWriteRequest(writeRequest *measurev1.WriteRequ
 func (ms *measureService) processAndPublishRequest(ctx context.Context, 
writeRequest *measurev1.WriteRequest,
        publisher queue.BatchPublisher, succeedSent *[]succeedSentMessage, 
measure measurev1.MeasureService_WriteServer,
 ) error {
-       tagValues, shardID, err := ms.navigate(writeRequest.GetMetadata(), 
writeRequest.GetDataPoint().GetTagFamilies())
+       // Retry with backoff when encountering errNotExist
+       var tagValues pbv1.EntityValues
+       var shardID common.ShardID
+       var err error
+
+       if ms.maxWaitDuration > 0 {
+               retryInterval := 10 * time.Millisecond
+               startTime := time.Now()
+               for {
+                       tagValues, shardID, err = 
ms.navigate(writeRequest.GetMetadata(), 
writeRequest.GetDataPoint().GetTagFamilies())
+                       if err == nil || !errors.Is(err, errNotExist) || 
time.Since(startTime) > ms.maxWaitDuration {
+                               break
+                       }
+
+                       // Exponential backoff: grow the interval by 1.5x per attempt, capped at one second (no jitter is applied)
+                       time.Sleep(retryInterval)
+                       retryInterval = time.Duration(float64(retryInterval) * 
1.5)
+                       if retryInterval > time.Second {
+                               retryInterval = time.Second
+                       }
+               }
+       } else {
+               tagValues, shardID, err = 
ms.navigate(writeRequest.GetMetadata(), 
writeRequest.GetDataPoint().GetTagFamilies())
+       }
+
        if err != nil {
                ms.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeRequest)).Msg("failed to navigate to the write target")
                ms.sendReply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure)
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 7d234cfd..00c384c4 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -244,6 +244,10 @@ func (s *server) FlagSet() *run.FlagSet {
        fs.StringVar(&s.accessLogRootPath, "access-log-root-path", "", "access 
log root path")
        fs.DurationVar(&s.streamSVC.writeTimeout, "stream-write-timeout", 
15*time.Second, "stream write timeout")
        fs.DurationVar(&s.measureSVC.writeTimeout, "measure-write-timeout", 
15*time.Second, "measure write timeout")
+       fs.DurationVar(&s.measureSVC.maxWaitDuration, 
"measure-metadata-cache-wait-duration", 0,
+               "the maximum duration to wait for metadata cache to load (for 
testing purposes)")
+       fs.DurationVar(&s.streamSVC.maxWaitDuration, 
"stream-metadata-cache-wait-duration", 0,
+               "the maximum duration to wait for metadata cache to load (for 
testing purposes)")
        return fs
 }
 
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 1a4f0125..8dc9ff5f 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -36,6 +36,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -46,9 +47,10 @@ type streamService struct {
        pipeline           queue.Client
        broadcaster        queue.Client
        *discoveryService
-       l            *logger.Logger
-       metrics      *metrics
-       writeTimeout time.Duration
+       l               *logger.Logger
+       metrics         *metrics
+       writeTimeout    time.Duration
+       maxWaitDuration time.Duration
 }
 
 func (s *streamService) setLogger(log *logger.Logger) {
@@ -63,6 +65,80 @@ func (s *streamService) activeIngestionAccessLog(root 
string) (err error) {
        return nil
 }
 
+func (s *streamService) validateTimestamp(writeEntity *streamv1.WriteRequest) 
error {
+       if err := timestamp.CheckPb(writeEntity.GetElement().Timestamp); err != 
nil {
+               s.l.Error().Stringer("written", writeEntity).Err(err).Msg("the 
element time is invalid")
+               return err
+       }
+       return nil
+}
+
+func (s *streamService) validateMetadata(writeEntity *streamv1.WriteRequest) 
error {
+       if writeEntity.Metadata.ModRevision > 0 {
+               streamCache, existed := 
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+               if !existed {
+                       return errors.New("stream schema not found")
+               }
+               if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
+                       return errors.New("expired stream schema")
+               }
+       }
+       return nil
+}
+
+func (s *streamService) navigateWithRetry(writeEntity *streamv1.WriteRequest) 
(tagValues pbv1.EntityValues, shardID common.ShardID, err error) {
+       if s.maxWaitDuration > 0 {
+               retryInterval := 10 * time.Millisecond
+               startTime := time.Now()
+               for {
+                       tagValues, shardID, err = 
s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
+                       if err == nil || !errors.Is(err, errNotExist) || 
time.Since(startTime) > s.maxWaitDuration {
+                               return
+                       }
+                       time.Sleep(retryInterval)
+                       retryInterval = time.Duration(float64(retryInterval) * 
1.5)
+                       if retryInterval > time.Second {
+                               retryInterval = time.Second
+                       }
+               }
+       }
+       return s.navigate(writeEntity.GetMetadata(), 
writeEntity.GetElement().GetTagFamilies())
+}
+
+func (s *streamService) publishMessages(
+       ctx context.Context,
+       publisher queue.BatchPublisher,
+       writeEntity *streamv1.WriteRequest,
+       shardID common.ShardID,
+       tagValues pbv1.EntityValues,
+) ([]string, error) {
+       iwr := &streamv1.InternalWriteRequest{
+               Request:      writeEntity,
+               ShardId:      uint32(shardID),
+               EntityValues: tagValues[1:].Encode(),
+       }
+
+       copies, ok := s.groupRepo.copies(writeEntity.Metadata.GetGroup())
+       if !ok {
+               return nil, errors.New("failed to get group copies")
+       }
+
+       nodes := make([]string, 0, copies)
+       for i := range copies {
+               nodeID, err := 
s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(), 
writeEntity.GetMetadata().GetName(), uint32(shardID), i)
+               if err != nil {
+                       return nil, err
+               }
+
+               message := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
+               if _, err := publisher.Publish(ctx, data.TopicStreamWrite, 
message); err != nil {
+                       return nil, err
+               }
+               nodes = append(nodes, nodeID)
+       }
+       return nodes, nil
+}
+
 func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error 
{
        reply := func(metadata *commonv1.Metadata, status modelv1.Status, 
messageId uint64, stream streamv1.StreamService_WriteServer, logger 
*logger.Logger) {
                if status != modelv1.Status_STATUS_SUCCEED {
@@ -76,6 +152,7 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                        s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, 
"stream", "write")
                }
        }
+
        s.metrics.totalStreamStarted.Inc(1, "stream", "write")
        publisher := s.pipeline.NewBatchPublisher(s.writeTimeout)
        start := time.Now()
@@ -104,6 +181,7 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                s.metrics.totalStreamFinished.Inc(1, "stream", "write")
                s.metrics.totalStreamLatency.Inc(time.Since(start).Seconds(), 
"stream", "write")
        }()
+
        ctx := stream.Context()
        for {
                select {
@@ -111,6 +189,7 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                        return ctx.Err()
                default:
                }
+
                writeEntity, err := stream.Recv()
                if errors.Is(err, io.EOF) {
                        return nil
@@ -121,70 +200,46 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                        }
                        return err
                }
+
                requestCount++
                s.metrics.totalStreamMsgReceived.Inc(1, 
writeEntity.Metadata.Group, "stream", "write")
-               if errTime := 
timestamp.CheckPb(writeEntity.GetElement().Timestamp); errTime != nil {
-                       s.l.Error().Stringer("written", 
writeEntity).Err(errTime).Msg("the element time is invalid")
+
+               if err = s.validateTimestamp(writeEntity); err != nil {
                        reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetMessageId(), stream, 
s.l)
                        continue
                }
-               if writeEntity.Metadata.ModRevision > 0 {
-                       streamCache, existed := 
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
-                       if !existed {
-                               s.l.Error().Err(err).Stringer("written", 
writeEntity).Msg("failed to stream schema not found")
-                               reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_NOT_FOUND, writeEntity.GetMessageId(), stream, s.l)
-                               continue
-                       }
-                       if writeEntity.Metadata.ModRevision != 
streamCache.ModRevision {
-                               s.l.Error().Stringer("written", 
writeEntity).Msg("the stream schema is expired")
-                               reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream, s.l)
-                               continue
+
+               if err = s.validateMetadata(writeEntity); err != nil {
+                       status := modelv1.Status_STATUS_INTERNAL_ERROR
+                       if errors.Is(err, errors.New("stream schema not 
found")) {
+                               status = modelv1.Status_STATUS_NOT_FOUND
+                       } else if errors.Is(err, errors.New("expired stream 
schema")) {
+                               status = modelv1.Status_STATUS_EXPIRED_SCHEMA
                        }
+                       s.l.Error().Err(err).Stringer("written", 
writeEntity).Msg("metadata validation failed")
+                       reply(writeEntity.GetMetadata(), status, 
writeEntity.GetMessageId(), stream, s.l)
+                       continue
                }
-               tagValues, shardID, err := 
s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
+
+               tagValues, shardID, err := s.navigateWithRetry(writeEntity)
                if err != nil {
-                       s.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("failed to navigate to the write target")
+                       s.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("navigation failed")
                        reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l)
                        continue
                }
+
                if s.ingestionAccessLog != nil {
-                       if errAccessLog := 
s.ingestionAccessLog.Write(writeEntity); errAccessLog != nil {
-                               s.l.Error().Err(errAccessLog).Msg("failed to 
write ingestion access log")
+                       if errAL := s.ingestionAccessLog.Write(writeEntity); 
errAL != nil {
+                               s.l.Error().Err(errAL).Msg("failed to write 
ingestion access log")
                        }
                }
-               iwr := &streamv1.InternalWriteRequest{
-                       Request:      writeEntity,
-                       ShardId:      uint32(shardID),
-                       EntityValues: tagValues[1:].Encode(),
-               }
-               copies, ok := 
s.groupRepo.copies(writeEntity.Metadata.GetGroup())
-               if !ok {
-                       s.l.Error().RawJSON("written", 
logger.Proto(writeEntity)).Msg("failed to get the group copies")
+
+               nodes, err := s.publishMessages(ctx, publisher, writeEntity, 
shardID, tagValues)
+               if err != nil {
+                       s.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("publishing failed")
                        reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l)
                        continue
                }
-               nodes := make([]string, 0, copies)
-               for i := range copies {
-                       nodeID, errPickNode := 
s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(), 
writeEntity.GetMetadata().GetName(), uint32(shardID), i)
-                       if errPickNode != nil {
-                               s.l.Error().Err(errPickNode).RawJSON("written", 
logger.Proto(writeEntity)).Msg("failed to pick an available node")
-                               reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l)
-                               continue
-                       }
-                       message := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
-                       _, errWritePub := publisher.Publish(ctx, 
data.TopicStreamWrite, message)
-                       if errWritePub != nil {
-                               s.l.Error().Err(errWritePub).RawJSON("written", 
logger.Proto(writeEntity)).Str("nodeID", nodeID).Msg("failed to send a message")
-                               var ce *common.Error
-                               if errors.As(errWritePub, &ce) {
-                                       reply(writeEntity.GetMetadata(), 
ce.Status(), writeEntity.GetMessageId(), stream, s.l)
-                                       continue
-                               }
-                               reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l)
-                               continue
-                       }
-                       nodes = append(nodes, nodeID)
-               }
 
                succeedSent = append(succeedSent, succeedSentMessage{
                        metadata:  writeEntity.GetMetadata(),
@@ -255,5 +310,8 @@ func (s *streamService) Query(ctx context.Context, req 
*streamv1.QueryRequest) (
 }
 
 func (s *streamService) Close() error {
-       return s.ingestionAccessLog.Close()
+       if s.ingestionAccessLog != nil {
+               return s.ingestionAccessLog.Close()
+       }
+       return nil
 }

Reply via email to