This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new b5307c6e Update handoff controller to use maxTotalSizeBytes (#853)
b5307c6e is described below
commit b5307c6edeb54a131a716c1e8c1eedc1c3a10369
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Nov 20 13:25:57 2025 +0800
Update handoff controller to use maxTotalSizeBytes (#853)
* Update handoff controller to use maxTotalSizeBytes and improve error
handling for handoff size calculation
* Enhance handoff controller logic to ensure handoffMaxSizePercent is
greater than zero before calculating max handoff size
---
banyand/trace/handoff_controller.go | 4 ++--
banyand/trace/svc_liaison.go | 22 ++++++++++++++++------
2 files changed, 18 insertions(+), 8 deletions(-)
diff --git a/banyand/trace/handoff_controller.go
b/banyand/trace/handoff_controller.go
index 2b4d5706..c3d1cfea 100644
--- a/banyand/trace/handoff_controller.go
+++ b/banyand/trace/handoff_controller.go
@@ -78,7 +78,7 @@ type queueClient interface {
// newHandoffController creates a new handoff controller.
func newHandoffController(fileSystem fs.FileSystem, root string, tire2Client
queueClient,
- dataNodeList []string, maxSize int, l *logger.Logger,
+ dataNodeList []string, maxTotalSizeBytes uint64, l *logger.Logger,
resolveShardAssignments func(group string, shardID uint32) ([]string,
error),
) (*handoffController, error) {
if fileSystem == nil {
@@ -109,7 +109,7 @@ func newHandoffController(fileSystem fs.FileSystem, root
string, tire2Client que
inFlightSends: make(map[string]map[uint64]struct{}),
replayBatchSize: 10,
replayPollInterval: 1 * time.Second,
- maxTotalSizeBytes: uint64(maxSize),
+ maxTotalSizeBytes: maxTotalSizeBytes,
currentTotalSize: 0,
resolveShardAssignments: resolveShardAssignments,
}
diff --git a/banyand/trace/svc_liaison.go b/banyand/trace/svc_liaison.go
index bfb1204e..2d8714bd 100644
--- a/banyand/trace/svc_liaison.go
+++ b/banyand/trace/svc_liaison.go
@@ -26,6 +26,8 @@ import (
"sort"
"time"
+ "github.com/dustin/go-humanize"
+
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -171,17 +173,25 @@ func (l *liaison) PreRun(ctx context.Context) error {
// Initialize handoff controller if data nodes are configured
l.l.Info().Strs("dataNodeList", l.dataNodeList).Int("maxSizePercent",
l.handoffMaxSizePercent).
Msg("handoff configuration")
- if len(l.dataNodeList) > 0 && l.option.tire2Client != nil {
+ if len(l.dataNodeList) > 0 && l.option.tire2Client != nil &&
l.handoffMaxSizePercent > 0 {
// Calculate max handoff size based on percentage of disk space
// Formula: totalDisk * maxDiskUsagePercent *
handoffMaxSizePercent / 10000
// Example: 100GB disk, 95% max usage, 10% handoff = 100 * 95 *
10 / 10000 = 9.5GB
- maxSize := 0
+ var maxSizeBytes uint64
if l.handoffMaxSizePercent > 0 {
totalSpace := l.lfs.MustGetTotalSpace(l.dataPath)
// Divide after each multiplication to avoid overflow
with large disk capacities
- maxSizeBytes := totalSpace *
uint64(l.maxDiskUsagePercent) / 100 * uint64(l.handoffMaxSizePercent) / 100
- maxSize = int(maxSizeBytes / 1024 / 1024)
+ maxSizeBytes = totalSpace *
uint64(l.maxDiskUsagePercent) / 100 * uint64(l.handoffMaxSizePercent) / 100
+ }
+ if maxSizeBytes == 0 {
+ return fmt.Errorf("handoff max size is 0 because
handoff-max-size-percent is 0 or not set. " +
+ "Set BYDB_HANDOFF_MAX_SIZE_PERCENT environment
variable or --handoff-max-size-percent flag to enable handoff storage limit")
}
+ l.l.Info().
+ Str("maxSizeBytes", humanize.Bytes(maxSizeBytes)).
+ Int("maxSizePercent", l.handoffMaxSizePercent).
+ Int("diskUsagePercent", l.maxDiskUsagePercent).
+ Msg("handoff max size")
// nolint:contextcheck
resolveAssignments := func(group string, shardID uint32)
([]string, error) {
@@ -218,13 +228,13 @@ func (l *liaison) PreRun(ctx context.Context) error {
var err error
// nolint:contextcheck
- l.handoffCtrl, err = newHandoffController(l.lfs, l.dataPath,
l.option.tire2Client, l.dataNodeList, maxSize, l.l, resolveAssignments)
+ l.handoffCtrl, err = newHandoffController(l.lfs, l.dataPath,
l.option.tire2Client, l.dataNodeList, maxSizeBytes, l.l, resolveAssignments)
if err != nil {
return err
}
l.l.Info().
Int("dataNodes", len(l.dataNodeList)).
- Int("maxSize", maxSize).
+ Uint64("maxSize", maxSizeBytes).
Int("maxSizePercent", l.handoffMaxSizePercent).
Int("diskUsagePercent", l.maxDiskUsagePercent).
Msg("handoff controller initialized")