This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 523f0066 Fix the wrong detention setting of each measure/stream/trace
(#865)
523f0066 is described below
commit 523f006661799b60a8cab5eb62dfe0b228fc6883
Author: mrproliu <[email protected]>
AuthorDate: Wed Nov 26 19:06:49 2025 +0800
Fix the wrong detention setting of each measure/stream/trace (#865)
---
CHANGES.md | 4 +
banyand/measure/disable_retention_test.go | 242 ++++++++++++++++++++++++++++++
banyand/measure/metadata.go | 10 +-
banyand/stream/disable_retention_test.go | 241 +++++++++++++++++++++++++++++
banyand/stream/metadata.go | 10 +-
banyand/trace/disable_retention_test.go | 241 +++++++++++++++++++++++++++++
banyand/trace/metadata.go | 10 +-
7 files changed, 755 insertions(+), 3 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 4877d210..19d75419 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,10 @@ Release Notes.
- Remove Bloom filter for dictionary-encoded tags.
+### Bug Fixes
+
+- Fix the wrong retention setting of each measure/stream/trace.
+
## 0.9.0
### Features
diff --git a/banyand/measure/disable_retention_test.go
b/banyand/measure/disable_retention_test.go
new file mode 100644
index 00000000..8c3db02c
--- /dev/null
+++ b/banyand/measure/disable_retention_test.go
@@ -0,0 +1,242 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "os"
+ "path/filepath"
+ "reflect"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/protector"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func TestDisableRetentionByStage(t *testing.T) {
+ tempDir := t.TempDir()
+ l := logger.GetLogger("test")
+
+ tests := []struct {
+ nodeLabels map[string]string
+ name string
+ description string
+ stages []*commonv1.LifecycleStage
+ wantDisabled bool
+ }{
+ {
+ name: "two stages - node in first stage",
+ stages: []*commonv1.LifecycleStage{
+ {
+ Name: "cold",
+ NodeSelector: "role=cold",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 4,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ },
+ nodeLabels: map[string]string{
+ "role": "hot",
+ },
+ wantDisabled: true,
+ description: "When node is in first stage of two
stages, retention should be disabled",
+ },
+ {
+ name: "two stages - node in last stage",
+ stages: []*commonv1.LifecycleStage{
+ {
+ Name: "cold",
+ NodeSelector: "role=cold",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 4,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ },
+ nodeLabels: map[string]string{
+ "role": "cold",
+ },
+ wantDisabled: false,
+ description: "When node is in last stage of two
stages, retention should be enabled",
+ },
+ {
+ name: "three stages - node in warm stage",
+ stages: []*commonv1.LifecycleStage{
+ {
+ Name: "warm",
+ NodeSelector: "role=warm",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 3,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ {
+ Name: "cold",
+ NodeSelector: "role=cold",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 2,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ },
+ nodeLabels: map[string]string{
+ "role": "warm",
+ },
+ wantDisabled: true,
+ description: "When node is in middle stage of three
stages, retention should be disabled",
+ },
+ {
+ name: "three stages - node in cold stage",
+ stages: []*commonv1.LifecycleStage{
+ {
+ Name: "warm",
+ NodeSelector: "role=warm",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 3,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ {
+ Name: "cold",
+ NodeSelector: "role=cold",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 2,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ },
+ nodeLabels: map[string]string{
+ "role": "cold",
+ },
+ wantDisabled: false,
+ description: "When node is in last stage of three
stages, retention should be enabled",
+ },
+ {
+ name: "single stage - node in only stage",
+ stages: []*commonv1.LifecycleStage{},
+ nodeLabels: map[string]string{
+ "role": "hot",
+ },
+ wantDisabled: false,
+ description: "When node is in the only stage,
retention should be enabled",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Create a unique subdirectory for this test
+ testDir := filepath.Join(tempDir, tt.name)
+ err := os.MkdirAll(testDir, 0o755)
+ require.NoError(t, err)
+
+ // Create supplier with node labels
+ omr := observability.NewBypassRegistry()
+ pm := protector.NewMemory(omr)
+
+ s := &supplier{
+ path: testDir,
+ metadata: nil, // Not needed for this test
+ c: nil, // Not needed for this test
+ option: option{},
+ omr: omr,
+ pm: pm,
+ l: l,
+ schemaRepo: nil,
+ nodeLabels: tt.nodeLabels,
+ }
+
+ // Create group schema with stages
+ groupSchema := &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: "test-group-" + tt.name,
+ },
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 2,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ Stages: tt.stages,
+ },
+ }
+
+ // Call OpenDB
+ db, err := s.OpenDB(groupSchema)
+ require.NoError(t, err, "OpenDB should not return
error")
+ require.NotNil(t, db, "DB should not be nil")
+ defer func() {
+ if db != nil {
+ _ = db.Close()
+ }
+ }()
+
+ // Verify DisableRetention setting using reflection
+ // Since db is a generic type and disableRetention is
private,
+ // we need to use reflection to access the field
+ dbValue := reflect.ValueOf(db)
+ if dbValue.Kind() == reflect.Ptr {
+ dbValue = dbValue.Elem()
+ }
+
+ disableRetentionField :=
dbValue.FieldByName("disableRetention")
+ require.True(t, disableRetentionField.IsValid(),
"disableRetention field should exist")
+
+ actualDisabled := disableRetentionField.Bool()
+ assert.Equal(t, tt.wantDisabled, actualDisabled,
tt.description)
+ })
+ }
+}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 59ec4420..b9da0e7a 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -494,9 +494,11 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group)
(resourceSchema.DB, error
ttl := ro.Ttl
segInterval := ro.SegmentInterval
segmentIdleTimeout := time.Duration(0)
+ disableRetention := false
if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 {
var ttlNum uint32
- for _, st := range ro.Stages {
+ foundMatched := false
+ for i, st := range ro.Stages {
if st.Ttl.Unit != ro.Ttl.Unit {
return nil, fmt.Errorf("ttl unit %s is not
consistent with stage %s", ro.Ttl.Unit, st.Ttl.Unit)
}
@@ -508,14 +510,19 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group)
(resourceSchema.DB, error
if !selector.Matches(s.nodeLabels) {
continue
}
+ foundMatched = true
ttl.Num += ttlNum
shardNum = st.ShardNum
segInterval = st.SegmentInterval
if st.Close {
segmentIdleTimeout = 5 * time.Minute
}
+ disableRetention = i+1 < len(ro.Stages)
break
}
+ if !foundMatched {
+ disableRetention = true
+ }
}
group := groupSchema.Metadata.Name
opts := storage.TSDBOpts[*tsTable, option]{
@@ -530,6 +537,7 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group)
(resourceSchema.DB, error
SeriesIndexCacheMaxBytes:
int(s.option.seriesCacheMaxSize),
StorageMetricsFactory: factory,
SegmentIdleTimeout: segmentIdleTimeout,
+ DisableRetention: disableRetention,
MemoryLimit: s.pm.GetLimit(),
}
return storage.OpenTSDB(
diff --git a/banyand/stream/disable_retention_test.go
b/banyand/stream/disable_retention_test.go
new file mode 100644
index 00000000..17e5b363
--- /dev/null
+++ b/banyand/stream/disable_retention_test.go
@@ -0,0 +1,241 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+ "os"
+ "path/filepath"
+ "reflect"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/protector"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func TestDisableRetentionByStage(t *testing.T) {
+ tempDir := t.TempDir()
+ l := logger.GetLogger("test-stream")
+
+ tests := []struct {
+ nodeLabels map[string]string
+ name string
+ description string
+ stages []*commonv1.LifecycleStage
+ wantDisabled bool
+ }{
+ {
+ name: "two stages - node in first stage",
+ stages: []*commonv1.LifecycleStage{
+ {
+ Name: "cold",
+ NodeSelector: "role=cold",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 4,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ },
+ nodeLabels: map[string]string{
+ "role": "hot",
+ },
+ wantDisabled: true,
+ description: "When node is in first stage of two
stages, retention should be disabled",
+ },
+ {
+ name: "two stages - node in last stage",
+ stages: []*commonv1.LifecycleStage{
+ {
+ Name: "cold",
+ NodeSelector: "role=cold",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 4,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ },
+ nodeLabels: map[string]string{
+ "role": "cold",
+ },
+ wantDisabled: false,
+ description: "When node is in last stage of two
stages, retention should be enabled",
+ },
+ {
+ name: "three stages - node in warm stage",
+ stages: []*commonv1.LifecycleStage{
+ {
+ Name: "warm",
+ NodeSelector: "role=warm",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 3,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ {
+ Name: "cold",
+ NodeSelector: "role=cold",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 2,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ },
+ nodeLabels: map[string]string{
+ "role": "warm",
+ },
+ wantDisabled: true,
+ description: "When node is in middle stage of three
stages, retention should be disabled",
+ },
+ {
+ name: "three stages - node in cold stage",
+ stages: []*commonv1.LifecycleStage{
+ {
+ Name: "warm",
+ NodeSelector: "role=warm",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 3,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ {
+ Name: "cold",
+ NodeSelector: "role=cold",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 2,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ },
+ nodeLabels: map[string]string{
+ "role": "cold",
+ },
+ wantDisabled: false,
+ description: "When node is in last stage of three
stages, retention should be enabled",
+ },
+ {
+ name: "single stage - node in only stage",
+ stages: []*commonv1.LifecycleStage{},
+ nodeLabels: map[string]string{
+ "role": "hot",
+ },
+ wantDisabled: false,
+ description: "When node is in the only stage,
retention should be enabled",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Create a unique subdirectory for this test
+ testDir := filepath.Join(tempDir, tt.name)
+ err := os.MkdirAll(testDir, 0o755)
+ require.NoError(t, err)
+
+ // Create supplier with node labels
+ omr := observability.NewBypassRegistry()
+ pm := protector.NewMemory(omr)
+
+ s := &supplier{
+ path: testDir,
+ metadata: nil, // Not needed for this test
+ option: option{},
+ omr: omr,
+ pm: pm,
+ l: l,
+ schemaRepo: nil,
+ nodeLabels: tt.nodeLabels,
+ }
+
+ // Create group schema with stages
+ groupSchema := &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: "test-group-" + tt.name,
+ },
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 2,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ Stages: tt.stages,
+ },
+ }
+
+ // Call OpenDB
+ db, err := s.OpenDB(groupSchema)
+ require.NoError(t, err, "OpenDB should not return
error")
+ require.NotNil(t, db, "DB should not be nil")
+ defer func() {
+ if db != nil {
+ _ = db.Close()
+ }
+ }()
+
+ // Verify DisableRetention setting using reflection
+ // Since db is a generic type and disableRetention is
private,
+ // we need to use reflection to access the field
+ dbValue := reflect.ValueOf(db)
+ if dbValue.Kind() == reflect.Ptr {
+ dbValue = dbValue.Elem()
+ }
+
+ disableRetentionField :=
dbValue.FieldByName("disableRetention")
+ require.True(t, disableRetentionField.IsValid(),
"disableRetention field should exist")
+
+ actualDisabled := disableRetentionField.Bool()
+ assert.Equal(t, tt.wantDisabled, actualDisabled,
tt.description)
+ })
+ }
+}
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 3d6e529e..849498a9 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -317,9 +317,11 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group)
(resourceSchema.DB, error
ttl := ro.Ttl
segInterval := ro.SegmentInterval
segmentIdleTimeout := time.Duration(0)
+ disableRetention := false
if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 {
var ttlNum uint32
- for _, st := range ro.Stages {
+ foundMatched := false
+ for i, st := range ro.Stages {
if st.Ttl.Unit != ro.Ttl.Unit {
return nil, fmt.Errorf("ttl unit %s is not
consistent with stage %s", ro.Ttl.Unit, st.Ttl.Unit)
}
@@ -331,14 +333,19 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group)
(resourceSchema.DB, error
if !selector.Matches(s.nodeLabels) {
continue
}
+ foundMatched = true
ttl.Num += ttlNum
shardNum = st.ShardNum
segInterval = st.SegmentInterval
if st.Close {
segmentIdleTimeout = 5 * time.Minute
}
+ disableRetention = i+1 < len(ro.Stages)
break
}
+ if !foundMatched {
+ disableRetention = true
+ }
}
group := groupSchema.Metadata.Name
opts := storage.TSDBOpts[*tsTable, option]{
@@ -353,6 +360,7 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group)
(resourceSchema.DB, error
SeriesIndexCacheMaxBytes:
int(s.option.seriesCacheMaxSize),
StorageMetricsFactory:
s.omr.With(storageScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(),
p.DBLabelValues()))),
SegmentIdleTimeout: segmentIdleTimeout,
+ DisableRetention: disableRetention,
MemoryLimit: s.pm.GetLimit(),
}
return storage.OpenTSDB(
diff --git a/banyand/trace/disable_retention_test.go
b/banyand/trace/disable_retention_test.go
new file mode 100644
index 00000000..032e4648
--- /dev/null
+++ b/banyand/trace/disable_retention_test.go
@@ -0,0 +1,241 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+ "os"
+ "path/filepath"
+ "reflect"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/protector"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func TestDisableRetentionByStage(t *testing.T) {
+ tempDir := t.TempDir()
+ l := logger.GetLogger("test-trace")
+
+ tests := []struct {
+ nodeLabels map[string]string
+ name string
+ description string
+ stages []*commonv1.LifecycleStage
+ wantDisabled bool
+ }{
+ {
+ name: "two stages - node in first stage",
+ stages: []*commonv1.LifecycleStage{
+ {
+ Name: "cold",
+ NodeSelector: "role=cold",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 4,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ },
+ nodeLabels: map[string]string{
+ "role": "hot",
+ },
+ wantDisabled: true,
+ description: "When node is in first stage of two
stages, retention should be disabled",
+ },
+ {
+ name: "two stages - node in last stage",
+ stages: []*commonv1.LifecycleStage{
+ {
+ Name: "cold",
+ NodeSelector: "role=cold",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 4,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ },
+ nodeLabels: map[string]string{
+ "role": "cold",
+ },
+ wantDisabled: false,
+ description: "When node is in last stage of two
stages, retention should be enabled",
+ },
+ {
+ name: "three stages - node in warm stage",
+ stages: []*commonv1.LifecycleStage{
+ {
+ Name: "warm",
+ NodeSelector: "role=warm",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 3,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ {
+ Name: "cold",
+ NodeSelector: "role=cold",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 2,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ },
+ nodeLabels: map[string]string{
+ "role": "warm",
+ },
+ wantDisabled: true,
+ description: "When node is in middle stage of three
stages, retention should be disabled",
+ },
+ {
+ name: "three stages - node in cold stage",
+ stages: []*commonv1.LifecycleStage{
+ {
+ Name: "warm",
+ NodeSelector: "role=warm",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 3,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ {
+ Name: "cold",
+ NodeSelector: "role=cold",
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 2,
+ },
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ ShardNum: 2,
+ },
+ },
+ nodeLabels: map[string]string{
+ "role": "cold",
+ },
+ wantDisabled: false,
+ description: "When node is in last stage of three
stages, retention should be enabled",
+ },
+ {
+ name: "single stage - node in only stage",
+ stages: []*commonv1.LifecycleStage{},
+ nodeLabels: map[string]string{
+ "role": "hot",
+ },
+ wantDisabled: false,
+ description: "When node is in the only stage,
retention should be enabled",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Create a unique subdirectory for this test
+ testDir := filepath.Join(tempDir, tt.name)
+ err := os.MkdirAll(testDir, 0o755)
+ require.NoError(t, err)
+
+ // Create supplier with node labels
+ omr := observability.NewBypassRegistry()
+ pm := protector.NewMemory(omr)
+
+ s := &supplier{
+ path: testDir,
+ metadata: nil, // Not needed for this test
+ option: option{},
+ omr: omr,
+ pm: pm,
+ l: l,
+ schemaRepo: nil,
+ nodeLabels: tt.nodeLabels,
+ }
+
+ // Create group schema with stages
+ groupSchema := &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: "test-group-" + tt.name,
+ },
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 2,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ Stages: tt.stages,
+ },
+ }
+
+ // Call OpenDB
+ db, err := s.OpenDB(groupSchema)
+ require.NoError(t, err, "OpenDB should not return
error")
+ require.NotNil(t, db, "DB should not be nil")
+ defer func() {
+ if db != nil {
+ _ = db.Close()
+ }
+ }()
+
+ // Verify DisableRetention setting using reflection
+ // Since db is a generic type and disableRetention is
private,
+ // we need to use reflection to access the field
+ dbValue := reflect.ValueOf(db)
+ if dbValue.Kind() == reflect.Ptr {
+ dbValue = dbValue.Elem()
+ }
+
+ disableRetentionField :=
dbValue.FieldByName("disableRetention")
+ require.True(t, disableRetentionField.IsValid(),
"disableRetention field should exist")
+
+ actualDisabled := disableRetentionField.Bool()
+ assert.Equal(t, tt.wantDisabled, actualDisabled,
tt.description)
+ })
+ }
+}
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index cf435393..82040fb5 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -322,9 +322,11 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group)
(resourceSchema.DB, error
ttl := ro.Ttl
segInterval := ro.SegmentInterval
segmentIdleTimeout := time.Duration(0)
+ disableRetention := false
if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 {
var ttlNum uint32
- for _, st := range ro.Stages {
+ foundMatched := false
+ for i, st := range ro.Stages {
if st.Ttl.Unit != ro.Ttl.Unit {
return nil, fmt.Errorf("ttl unit %s is not
consistent with stage %s", ro.Ttl.Unit, st.Ttl.Unit)
}
@@ -336,14 +338,19 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group)
(resourceSchema.DB, error
if !selector.Matches(s.nodeLabels) {
continue
}
+ foundMatched = true
ttl.Num += ttlNum
shardNum = st.ShardNum
segInterval = st.SegmentInterval
if st.Close {
segmentIdleTimeout = 5 * time.Minute
}
+ disableRetention = i+1 < len(ro.Stages)
break
}
+ if !foundMatched {
+ disableRetention = true
+ }
}
group := groupSchema.Metadata.Name
opts := storage.TSDBOpts[*tsTable, option]{
@@ -358,6 +365,7 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group)
(resourceSchema.DB, error
SeriesIndexCacheMaxBytes:
int(s.option.seriesCacheMaxSize),
StorageMetricsFactory:
s.omr.With(traceScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(),
p.DBLabelValues()))),
SegmentIdleTimeout: segmentIdleTimeout,
+ DisableRetention: disableRetention,
MemoryLimit: s.pm.GetLimit(),
}
return storage.OpenTSDB(