This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new dccf207d Add a wait and retry to write handlers to avoid the local
metadata cache being loaded (#678)
dccf207d is described below
commit dccf207dc9a74225fd4e5ba7c2c3ad6f36e6694a
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Jun 5 12:10:58 2025 +0800
Add a wait and retry to write handlers to avoid the local metadata cache
being loaded (#678)
Signed-off-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 1 +
banyand/liaison/grpc/measure.go | 33 ++++++++-
banyand/liaison/grpc/server.go | 4 +
banyand/liaison/grpc/stream.go | 160 +++++++++++++++++++++++++++-------------
4 files changed, 143 insertions(+), 55 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 93b79580..e819a022 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -13,6 +13,7 @@ Release Notes.
- Replica: Replace Any with []byte Between Liaison and Data Nodes
- Replica: Support configurable replica count on Group.
- Replica: Move the TopN pre-calculation flow from the Data Node to the
Liaison Node.
+- Add a wait and retry to write handlers to tolerate the local metadata cache
not being loaded yet.
### Bug Fixes
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 2691f679..52a7b913 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -49,9 +49,10 @@ type measureService struct {
broadcaster queue.Client
topNService measure.TopNService
*discoveryService
- l *logger.Logger
- metrics *metrics
- writeTimeout time.Duration
+ l *logger.Logger
+ metrics *metrics
+ writeTimeout time.Duration
+ maxWaitDuration time.Duration
}
func (ms *measureService) setLogger(log *logger.Logger) {
@@ -132,7 +133,31 @@ func (ms *measureService)
validateWriteRequest(writeRequest *measurev1.WriteRequ
func (ms *measureService) processAndPublishRequest(ctx context.Context,
writeRequest *measurev1.WriteRequest,
publisher queue.BatchPublisher, succeedSent *[]succeedSentMessage,
measure measurev1.MeasureService_WriteServer,
) error {
- tagValues, shardID, err := ms.navigate(writeRequest.GetMetadata(),
writeRequest.GetDataPoint().GetTagFamilies())
+ // Retry with backoff when encountering errNotExist
+ var tagValues pbv1.EntityValues
+ var shardID common.ShardID
+ var err error
+
+ if ms.maxWaitDuration > 0 {
+ retryInterval := 10 * time.Millisecond
+ startTime := time.Now()
+ for {
+ tagValues, shardID, err =
ms.navigate(writeRequest.GetMetadata(),
writeRequest.GetDataPoint().GetTagFamilies())
+ if err == nil || !errors.Is(err, errNotExist) ||
time.Since(startTime) > ms.maxWaitDuration {
+ break
+ }
+
+ // Exponential backoff: grow the interval by 1.5x per attempt, capped at one second (no jitter is applied)
+ time.Sleep(retryInterval)
+ retryInterval = time.Duration(float64(retryInterval) *
1.5)
+ if retryInterval > time.Second {
+ retryInterval = time.Second
+ }
+ }
+ } else {
+ tagValues, shardID, err =
ms.navigate(writeRequest.GetMetadata(),
writeRequest.GetDataPoint().GetTagFamilies())
+ }
+
if err != nil {
ms.l.Error().Err(err).RawJSON("written",
logger.Proto(writeRequest)).Msg("failed to navigate to the write target")
ms.sendReply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure)
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 7d234cfd..00c384c4 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -244,6 +244,10 @@ func (s *server) FlagSet() *run.FlagSet {
fs.StringVar(&s.accessLogRootPath, "access-log-root-path", "", "access
log root path")
fs.DurationVar(&s.streamSVC.writeTimeout, "stream-write-timeout",
15*time.Second, "stream write timeout")
fs.DurationVar(&s.measureSVC.writeTimeout, "measure-write-timeout",
15*time.Second, "measure write timeout")
+ fs.DurationVar(&s.measureSVC.maxWaitDuration,
"measure-metadata-cache-wait-duration", 0,
+ "the maximum duration to wait for metadata cache to load (for
testing purposes)")
+ fs.DurationVar(&s.streamSVC.maxWaitDuration,
"stream-metadata-cache-wait-duration", 0,
+ "the maximum duration to wait for metadata cache to load (for
testing purposes)")
return fs
}
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 1a4f0125..8dc9ff5f 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -36,6 +36,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -46,9 +47,10 @@ type streamService struct {
pipeline queue.Client
broadcaster queue.Client
*discoveryService
- l *logger.Logger
- metrics *metrics
- writeTimeout time.Duration
+ l *logger.Logger
+ metrics *metrics
+ writeTimeout time.Duration
+ maxWaitDuration time.Duration
}
func (s *streamService) setLogger(log *logger.Logger) {
@@ -63,6 +65,80 @@ func (s *streamService) activeIngestionAccessLog(root
string) (err error) {
return nil
}
+func (s *streamService) validateTimestamp(writeEntity *streamv1.WriteRequest)
error {
+ if err := timestamp.CheckPb(writeEntity.GetElement().Timestamp); err !=
nil {
+ s.l.Error().Stringer("written", writeEntity).Err(err).Msg("the
element time is invalid")
+ return err
+ }
+ return nil
+}
+
+func (s *streamService) validateMetadata(writeEntity *streamv1.WriteRequest)
error {
+ if writeEntity.Metadata.ModRevision > 0 {
+ streamCache, existed :=
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+ if !existed {
+ return errors.New("stream schema not found")
+ }
+ if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
+ return errors.New("expired stream schema")
+ }
+ }
+ return nil
+}
+
+func (s *streamService) navigateWithRetry(writeEntity *streamv1.WriteRequest)
(tagValues pbv1.EntityValues, shardID common.ShardID, err error) {
+ if s.maxWaitDuration > 0 {
+ retryInterval := 10 * time.Millisecond
+ startTime := time.Now()
+ for {
+ tagValues, shardID, err =
s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
+ if err == nil || !errors.Is(err, errNotExist) ||
time.Since(startTime) > s.maxWaitDuration {
+ return
+ }
+ time.Sleep(retryInterval)
+ retryInterval = time.Duration(float64(retryInterval) *
1.5)
+ if retryInterval > time.Second {
+ retryInterval = time.Second
+ }
+ }
+ }
+ return s.navigate(writeEntity.GetMetadata(),
writeEntity.GetElement().GetTagFamilies())
+}
+
+func (s *streamService) publishMessages(
+ ctx context.Context,
+ publisher queue.BatchPublisher,
+ writeEntity *streamv1.WriteRequest,
+ shardID common.ShardID,
+ tagValues pbv1.EntityValues,
+) ([]string, error) {
+ iwr := &streamv1.InternalWriteRequest{
+ Request: writeEntity,
+ ShardId: uint32(shardID),
+ EntityValues: tagValues[1:].Encode(),
+ }
+
+ copies, ok := s.groupRepo.copies(writeEntity.Metadata.GetGroup())
+ if !ok {
+ return nil, errors.New("failed to get group copies")
+ }
+
+ nodes := make([]string, 0, copies)
+ for i := range copies {
+ nodeID, err :=
s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(),
writeEntity.GetMetadata().GetName(), uint32(shardID), i)
+ if err != nil {
+ return nil, err
+ }
+
+ message :=
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
+ if _, err := publisher.Publish(ctx, data.TopicStreamWrite,
message); err != nil {
+ return nil, err
+ }
+ nodes = append(nodes, nodeID)
+ }
+ return nodes, nil
+}
+
func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error
{
reply := func(metadata *commonv1.Metadata, status modelv1.Status,
messageId uint64, stream streamv1.StreamService_WriteServer, logger
*logger.Logger) {
if status != modelv1.Status_STATUS_SUCCEED {
@@ -76,6 +152,7 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group,
"stream", "write")
}
}
+
s.metrics.totalStreamStarted.Inc(1, "stream", "write")
publisher := s.pipeline.NewBatchPublisher(s.writeTimeout)
start := time.Now()
@@ -104,6 +181,7 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
s.metrics.totalStreamFinished.Inc(1, "stream", "write")
s.metrics.totalStreamLatency.Inc(time.Since(start).Seconds(),
"stream", "write")
}()
+
ctx := stream.Context()
for {
select {
@@ -111,6 +189,7 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
return ctx.Err()
default:
}
+
writeEntity, err := stream.Recv()
if errors.Is(err, io.EOF) {
return nil
@@ -121,70 +200,46 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
}
return err
}
+
requestCount++
s.metrics.totalStreamMsgReceived.Inc(1,
writeEntity.Metadata.Group, "stream", "write")
- if errTime :=
timestamp.CheckPb(writeEntity.GetElement().Timestamp); errTime != nil {
- s.l.Error().Stringer("written",
writeEntity).Err(errTime).Msg("the element time is invalid")
+
+ if err = s.validateTimestamp(writeEntity); err != nil {
reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetMessageId(), stream,
s.l)
continue
}
- if writeEntity.Metadata.ModRevision > 0 {
- streamCache, existed :=
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
- if !existed {
- s.l.Error().Err(err).Stringer("written",
writeEntity).Msg("failed to stream schema not found")
- reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_NOT_FOUND, writeEntity.GetMessageId(), stream, s.l)
- continue
- }
- if writeEntity.Metadata.ModRevision !=
streamCache.ModRevision {
- s.l.Error().Stringer("written",
writeEntity).Msg("the stream schema is expired")
- reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream, s.l)
- continue
+
+ if err = s.validateMetadata(writeEntity); err != nil {
+ status := modelv1.Status_STATUS_INTERNAL_ERROR
+ if errors.Is(err, errors.New("stream schema not
found")) {
+ status = modelv1.Status_STATUS_NOT_FOUND
+ } else if errors.Is(err, errors.New("expired stream
schema")) {
+ status = modelv1.Status_STATUS_EXPIRED_SCHEMA
}
+ s.l.Error().Err(err).Stringer("written",
writeEntity).Msg("metadata validation failed")
+ reply(writeEntity.GetMetadata(), status,
writeEntity.GetMessageId(), stream, s.l)
+ continue
}
- tagValues, shardID, err :=
s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
+
+ tagValues, shardID, err := s.navigateWithRetry(writeEntity)
if err != nil {
- s.l.Error().Err(err).RawJSON("written",
logger.Proto(writeEntity)).Msg("failed to navigate to the write target")
+ s.l.Error().Err(err).RawJSON("written",
logger.Proto(writeEntity)).Msg("navigation failed")
reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l)
continue
}
+
if s.ingestionAccessLog != nil {
- if errAccessLog :=
s.ingestionAccessLog.Write(writeEntity); errAccessLog != nil {
- s.l.Error().Err(errAccessLog).Msg("failed to
write ingestion access log")
+ if errAL := s.ingestionAccessLog.Write(writeEntity);
errAL != nil {
+ s.l.Error().Err(errAL).Msg("failed to write
ingestion access log")
}
}
- iwr := &streamv1.InternalWriteRequest{
- Request: writeEntity,
- ShardId: uint32(shardID),
- EntityValues: tagValues[1:].Encode(),
- }
- copies, ok :=
s.groupRepo.copies(writeEntity.Metadata.GetGroup())
- if !ok {
- s.l.Error().RawJSON("written",
logger.Proto(writeEntity)).Msg("failed to get the group copies")
+
+ nodes, err := s.publishMessages(ctx, publisher, writeEntity,
shardID, tagValues)
+ if err != nil {
+ s.l.Error().Err(err).RawJSON("written",
logger.Proto(writeEntity)).Msg("publishing failed")
reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l)
continue
}
- nodes := make([]string, 0, copies)
- for i := range copies {
- nodeID, errPickNode :=
s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(),
writeEntity.GetMetadata().GetName(), uint32(shardID), i)
- if errPickNode != nil {
- s.l.Error().Err(errPickNode).RawJSON("written",
logger.Proto(writeEntity)).Msg("failed to pick an available node")
- reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l)
- continue
- }
- message :=
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
- _, errWritePub := publisher.Publish(ctx,
data.TopicStreamWrite, message)
- if errWritePub != nil {
- s.l.Error().Err(errWritePub).RawJSON("written",
logger.Proto(writeEntity)).Str("nodeID", nodeID).Msg("failed to send a message")
- var ce *common.Error
- if errors.As(errWritePub, &ce) {
- reply(writeEntity.GetMetadata(),
ce.Status(), writeEntity.GetMessageId(), stream, s.l)
- continue
- }
- reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l)
- continue
- }
- nodes = append(nodes, nodeID)
- }
succeedSent = append(succeedSent, succeedSentMessage{
metadata: writeEntity.GetMetadata(),
@@ -255,5 +310,8 @@ func (s *streamService) Query(ctx context.Context, req
*streamv1.QueryRequest) (
}
func (s *streamService) Close() error {
- return s.ingestionAccessLog.Close()
+ if s.ingestionAccessLog != nil {
+ return s.ingestionAccessLog.Close()
+ }
+ return nil
}