This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch metadata-cache in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 1d5fc28a30b221f18b4071d48e640133401e91d7 Author: Gao Hongtao <[email protected]> AuthorDate: Thu Jun 5 11:53:00 2025 +0800 Add a wait and retry to write handlers to avoid write failures while the local metadata cache is still loading Signed-off-by: Gao Hongtao <[email protected]> --- CHANGES.md | 1 + banyand/liaison/grpc/measure.go | 33 ++++++++- banyand/liaison/grpc/server.go | 4 + banyand/liaison/grpc/stream.go | 160 +++++++++++++++++++++++++++------------- 4 files changed, 143 insertions(+), 55 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 93b79580..e819a022 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -13,6 +13,7 @@ Release Notes. - Replica: Replace Any with []byte Between Liaison and Data Nodes - Replica: Support configurable replica count on Group. - Replica: Move the TopN pre-calculation flow from the Data Node to the Liaison Node. +- Add a wait and retry to write handlers to avoid write failures while the local metadata cache is still loading. ### Bug Fixes diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index 2691f679..52a7b913 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -49,9 +49,10 @@ type measureService struct { broadcaster queue.Client topNService measure.TopNService *discoveryService - l *logger.Logger - metrics *metrics - writeTimeout time.Duration + l *logger.Logger + metrics *metrics + writeTimeout time.Duration + maxWaitDuration time.Duration } func (ms *measureService) setLogger(log *logger.Logger) { @@ -132,7 +133,31 @@ func (ms *measureService) validateWriteRequest(writeRequest *measurev1.WriteRequ func (ms *measureService) processAndPublishRequest(ctx context.Context, writeRequest *measurev1.WriteRequest, publisher queue.BatchPublisher, succeedSent *[]succeedSentMessage, measure measurev1.MeasureService_WriteServer, ) error { - tagValues, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies()) + // Retry with backoff when encountering errNotExist + var 
tagValues pbv1.EntityValues + var shardID common.ShardID + var err error + + if ms.maxWaitDuration > 0 { + retryInterval := 10 * time.Millisecond + startTime := time.Now() + for { + tagValues, shardID, err = ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies()) + if err == nil || !errors.Is(err, errNotExist) || time.Since(startTime) > ms.maxWaitDuration { + break + } + + // Exponential backoff with jitter + time.Sleep(retryInterval) + retryInterval = time.Duration(float64(retryInterval) * 1.5) + if retryInterval > time.Second { + retryInterval = time.Second + } + } + } else { + tagValues, shardID, err = ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies()) + } + if err != nil { ms.l.Error().Err(err).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to navigate to the write target") ms.sendReply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure) diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index 7d234cfd..00c384c4 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -244,6 +244,10 @@ func (s *server) FlagSet() *run.FlagSet { fs.StringVar(&s.accessLogRootPath, "access-log-root-path", "", "access log root path") fs.DurationVar(&s.streamSVC.writeTimeout, "stream-write-timeout", 15*time.Second, "stream write timeout") fs.DurationVar(&s.measureSVC.writeTimeout, "measure-write-timeout", 15*time.Second, "measure write timeout") + fs.DurationVar(&s.measureSVC.maxWaitDuration, "measure-metadata-cache-wait-duration", 0, + "the maximum duration to wait for metadata cache to load (for testing purposes)") + fs.DurationVar(&s.streamSVC.maxWaitDuration, "stream-metadata-cache-wait-duration", 0, + "the maximum duration to wait for metadata cache to load (for testing purposes)") return fs } diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index 1a4f0125..8dc9ff5f 100644 
--- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -36,6 +36,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -46,9 +47,10 @@ type streamService struct { pipeline queue.Client broadcaster queue.Client *discoveryService - l *logger.Logger - metrics *metrics - writeTimeout time.Duration + l *logger.Logger + metrics *metrics + writeTimeout time.Duration + maxWaitDuration time.Duration } func (s *streamService) setLogger(log *logger.Logger) { @@ -63,6 +65,80 @@ func (s *streamService) activeIngestionAccessLog(root string) (err error) { return nil } +func (s *streamService) validateTimestamp(writeEntity *streamv1.WriteRequest) error { + if err := timestamp.CheckPb(writeEntity.GetElement().Timestamp); err != nil { + s.l.Error().Stringer("written", writeEntity).Err(err).Msg("the element time is invalid") + return err + } + return nil +} + +func (s *streamService) validateMetadata(writeEntity *streamv1.WriteRequest) error { + if writeEntity.Metadata.ModRevision > 0 { + streamCache, existed := s.entityRepo.getLocator(getID(writeEntity.GetMetadata())) + if !existed { + return errors.New("stream schema not found") + } + if writeEntity.Metadata.ModRevision != streamCache.ModRevision { + return errors.New("expired stream schema") + } + } + return nil +} + +func (s *streamService) navigateWithRetry(writeEntity *streamv1.WriteRequest) (tagValues pbv1.EntityValues, shardID common.ShardID, err error) { + if s.maxWaitDuration > 0 { + retryInterval := 10 * time.Millisecond + startTime := time.Now() + for { + tagValues, shardID, err = s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies()) + if err == nil || !errors.Is(err, errNotExist) || 
time.Since(startTime) > s.maxWaitDuration { + return + } + time.Sleep(retryInterval) + retryInterval = time.Duration(float64(retryInterval) * 1.5) + if retryInterval > time.Second { + retryInterval = time.Second + } + } + } + return s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies()) +} + +func (s *streamService) publishMessages( + ctx context.Context, + publisher queue.BatchPublisher, + writeEntity *streamv1.WriteRequest, + shardID common.ShardID, + tagValues pbv1.EntityValues, +) ([]string, error) { + iwr := &streamv1.InternalWriteRequest{ + Request: writeEntity, + ShardId: uint32(shardID), + EntityValues: tagValues[1:].Encode(), + } + + copies, ok := s.groupRepo.copies(writeEntity.Metadata.GetGroup()) + if !ok { + return nil, errors.New("failed to get group copies") + } + + nodes := make([]string, 0, copies) + for i := range copies { + nodeID, err := s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(), writeEntity.GetMetadata().GetName(), uint32(shardID), i) + if err != nil { + return nil, err + } + + message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr) + if _, err := publisher.Publish(ctx, data.TopicStreamWrite, message); err != nil { + return nil, err + } + nodes = append(nodes, nodeID) + } + return nodes, nil +} + func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { reply := func(metadata *commonv1.Metadata, status modelv1.Status, messageId uint64, stream streamv1.StreamService_WriteServer, logger *logger.Logger) { if status != modelv1.Status_STATUS_SUCCEED { @@ -76,6 +152,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, "stream", "write") } } + s.metrics.totalStreamStarted.Inc(1, "stream", "write") publisher := s.pipeline.NewBatchPublisher(s.writeTimeout) start := time.Now() @@ -104,6 +181,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) 
error { s.metrics.totalStreamFinished.Inc(1, "stream", "write") s.metrics.totalStreamLatency.Inc(time.Since(start).Seconds(), "stream", "write") }() + ctx := stream.Context() for { select { @@ -111,6 +189,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { return ctx.Err() default: } + writeEntity, err := stream.Recv() if errors.Is(err, io.EOF) { return nil @@ -121,70 +200,46 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { } return err } + requestCount++ s.metrics.totalStreamMsgReceived.Inc(1, writeEntity.Metadata.Group, "stream", "write") - if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp); errTime != nil { - s.l.Error().Stringer("written", writeEntity).Err(errTime).Msg("the element time is invalid") + + if err = s.validateTimestamp(writeEntity); err != nil { reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetMessageId(), stream, s.l) continue } - if writeEntity.Metadata.ModRevision > 0 { - streamCache, existed := s.entityRepo.getLocator(getID(writeEntity.GetMetadata())) - if !existed { - s.l.Error().Err(err).Stringer("written", writeEntity).Msg("failed to stream schema not found") - reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeEntity.GetMessageId(), stream, s.l) - continue - } - if writeEntity.Metadata.ModRevision != streamCache.ModRevision { - s.l.Error().Stringer("written", writeEntity).Msg("the stream schema is expired") - reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream, s.l) - continue + + if err = s.validateMetadata(writeEntity); err != nil { + status := modelv1.Status_STATUS_INTERNAL_ERROR + if errors.Is(err, errors.New("stream schema not found")) { + status = modelv1.Status_STATUS_NOT_FOUND + } else if errors.Is(err, errors.New("expired stream schema")) { + status = modelv1.Status_STATUS_EXPIRED_SCHEMA } + s.l.Error().Err(err).Stringer("written", 
writeEntity).Msg("metadata validation failed") + reply(writeEntity.GetMetadata(), status, writeEntity.GetMessageId(), stream, s.l) + continue } - tagValues, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies()) + + tagValues, shardID, err := s.navigateWithRetry(writeEntity) if err != nil { - s.l.Error().Err(err).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to navigate to the write target") + s.l.Error().Err(err).RawJSON("written", logger.Proto(writeEntity)).Msg("navigation failed") reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l) continue } + if s.ingestionAccessLog != nil { - if errAccessLog := s.ingestionAccessLog.Write(writeEntity); errAccessLog != nil { - s.l.Error().Err(errAccessLog).Msg("failed to write ingestion access log") + if errAL := s.ingestionAccessLog.Write(writeEntity); errAL != nil { + s.l.Error().Err(errAL).Msg("failed to write ingestion access log") } } - iwr := &streamv1.InternalWriteRequest{ - Request: writeEntity, - ShardId: uint32(shardID), - EntityValues: tagValues[1:].Encode(), - } - copies, ok := s.groupRepo.copies(writeEntity.Metadata.GetGroup()) - if !ok { - s.l.Error().RawJSON("written", logger.Proto(writeEntity)).Msg("failed to get the group copies") + + nodes, err := s.publishMessages(ctx, publisher, writeEntity, shardID, tagValues) + if err != nil { + s.l.Error().Err(err).RawJSON("written", logger.Proto(writeEntity)).Msg("publishing failed") reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l) continue } - nodes := make([]string, 0, copies) - for i := range copies { - nodeID, errPickNode := s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(), writeEntity.GetMetadata().GetName(), uint32(shardID), i) - if errPickNode != nil { - s.l.Error().Err(errPickNode).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to pick an available node") - 
reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l) - continue - } - message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr) - _, errWritePub := publisher.Publish(ctx, data.TopicStreamWrite, message) - if errWritePub != nil { - s.l.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Str("nodeID", nodeID).Msg("failed to send a message") - var ce *common.Error - if errors.As(errWritePub, &ce) { - reply(writeEntity.GetMetadata(), ce.Status(), writeEntity.GetMessageId(), stream, s.l) - continue - } - reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l) - continue - } - nodes = append(nodes, nodeID) - } succeedSent = append(succeedSent, succeedSentMessage{ metadata: writeEntity.GetMetadata(), @@ -255,5 +310,8 @@ func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest) ( } func (s *streamService) Close() error { - return s.ingestionAccessLog.Close() + if s.ingestionAccessLog != nil { + return s.ingestionAccessLog.Close() + } + return nil }
