This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new e189e0512 Validate ShardingKey against Entity tags (#1069)
e189e0512 is described below
commit e189e0512441ab3b876c00d915f715c807a7689b
Author: Ali Yasir Naç <[email protected]>
AuthorDate: Mon May 11 08:26:05 2026 +0300
Validate ShardingKey against Entity tags (#1069)
* Refactor: Validate ShardingKey against Entity tags
* Update CHANGES.md
* Refactor sharding key definition for measures
* Update validate_test.go
* Update tsdb_test.go
* Fix measure validation to strictly enforce ShardingKey as a sequential
prefix of Entity tags.
* updated validation
* feat: make sharding key prefix validation advisory with warnings instead
of an error
* refactor: replace ShardingKey prefix validation with relative order
subset validation
* feat: log warning when measure sharding key is not a subset of entity
tags during registration and update
---------
Co-authored-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 2 +-
api/validate/validate.go | 32 ++++
api/validate/validate_test.go | 163 +++++++++++++++++++++
banyand/measure/metadata.go | 3 +
banyand/metadata/schema/property/client.go | 6 +
.../measures/service_instance_cpm_minute.json | 11 +-
.../service_instance_cpm_minute_updated.json | 11 +-
.../service_instance_endpoint_cpm_minute.json | 11 +-
8 files changed, 214 insertions(+), 25 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index c7dad3188..c714597ea 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -5,7 +5,7 @@ Release Notes.
## 0.11.0
### Features
-
+- Add an advisory check that logs a warning when a Measure's ShardingKey is not
an order-preserving subset of its Entity tags, to help guarantee entity locality.
- Organize access logs under a dedicated "accesslog" subdirectory to improve
log organization and separation from other application data.
- Collect BanyanDB data on e2e test failure for CI debugging.
- Add log query e2e test.
diff --git a/api/validate/validate.go b/api/validate/validate.go
index 946c99061..4aa4f1342 100644
--- a/api/validate/validate.go
+++ b/api/validate/validate.go
@@ -175,9 +175,41 @@ func Measure(measure *databasev1.Measure) error {
if measure.IndexMode && len(measure.Fields) > 0 {
return errors.New("index mode is enabled, but fields are not
empty")
}
+
return tagFamily(measure.TagFamilies)
}
+// CheckShardingKeySubset checks whether every ShardingKey tag exists in
Entity tags
+// and the shared tags appear in the same relative order.
+func CheckShardingKeySubset(measure *databasev1.Measure) error {
+ if measure == nil || measure.Entity == nil || measure.ShardingKey ==
nil || len(measure.ShardingKey.TagNames) == 0 {
+ return nil
+ }
+ // A single entity tag may represent a composite identifier, e.g. OAP's
entity_id,
+ // which can already encode the sharding key fields such as service_id.
+ // In that case, literal tag-name subset validation would produce false
positives.
+ if len(measure.Entity.TagNames) == 1 {
+ return nil
+ }
+ entityIndex := make(map[string]int, len(measure.Entity.TagNames))
+ for idx, tag := range measure.Entity.TagNames {
+ entityIndex[tag] = idx
+ }
+ prevPos := -1
+ for _, shardTag := range measure.ShardingKey.TagNames {
+ pos, exists := entityIndex[shardTag]
+ if !exists {
+ return fmt.Errorf("ShardingKey tag %q is not present in
Entity tags %v", shardTag, measure.Entity.TagNames)
+ }
+ if pos <= prevPos {
+ return fmt.Errorf("ShardingKey %v is not in the same
relative order as Entity tags %v",
+ measure.ShardingKey.TagNames,
measure.Entity.TagNames)
+ }
+ prevPos = pos
+ }
+ return nil
+}
+
// Trace validates the provided Trace object.
// It checks for nil values, empty strings, and unspecified enum values.
func Trace(trace *databasev1.Trace) error {
diff --git a/api/validate/validate_test.go b/api/validate/validate_test.go
new file mode 100644
index 000000000..7ae465daa
--- /dev/null
+++ b/api/validate/validate_test.go
@@ -0,0 +1,163 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package validate
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+func TestMeasureShardingKeyNil(t *testing.T) {
+ measure := &databasev1.Measure{
+ Metadata: &commonv1.Metadata{
+ Name: "test_measure",
+ Group: "test_group",
+ },
+ Entity: &databasev1.Entity{
+ TagNames: []string{"service_id"},
+ },
+ TagFamilies: []*databasev1.TagFamilySpec{
+ {
+ Name: "default",
+ Tags: []*databasev1.TagSpec{
+ {Name: "service_id", Type:
databasev1.TagType_TAG_TYPE_STRING},
+ },
+ },
+ },
+ }
+ err := Measure(measure)
+ assert.NoError(t, err)
+}
+
+func TestMeasurePassesWithNonPrefixShardingKey(t *testing.T) {
+ measure := &databasev1.Measure{
+ Metadata: &commonv1.Metadata{Name: "endpoint_cpm", Group:
"sw_metric"},
+ Entity: &databasev1.Entity{TagNames: []string{"entity_id"}},
+ TagFamilies: []*databasev1.TagFamilySpec{
+ {
+ Name: "default",
+ Tags: []*databasev1.TagSpec{
+ {Name: "entity_id", Type:
databasev1.TagType_TAG_TYPE_STRING},
+ {Name: "service_id", Type:
databasev1.TagType_TAG_TYPE_STRING},
+ },
+ },
+ },
+ ShardingKey: &databasev1.ShardingKey{TagNames:
[]string{"service_id"}},
+ }
+ err := Measure(measure)
+ assert.NoError(t, err, "Measure() must not reject a non-prefix sharding
key")
+}
+
+func TestCheckShardingKeySubset(t *testing.T) {
+ tests := []struct {
+ name string
+ errContains string
+ entity []string
+ shardingKey []string
+ wantErr bool
+ }{
+ {
+ name: "single entity tag — composite-id pattern,
always skip",
+ entity: []string{"entity_id"},
+ shardingKey: []string{"service_id"},
+ wantErr: false,
+ },
+ {
+ name: "valid subset, same order",
+ entity: []string{"service_id", "instance_id",
"endpoint_id"},
+ shardingKey: []string{"service_id", "endpoint_id"},
+ wantErr: false,
+ },
+ {
+ name: "valid subset, identical to entity",
+ entity: []string{"service_id", "instance_id"},
+ shardingKey: []string{"service_id", "instance_id"},
+ wantErr: false,
+ },
+ {
+ name: "valid subset, single sharding key tag in
multi-entity",
+ entity: []string{"service_id", "instance_id"},
+ shardingKey: []string{"instance_id"},
+ wantErr: false,
+ },
+ {
+ name: "invalid — sharding key tag not in entity",
+ entity: []string{"service_id", "instance_id"},
+ shardingKey: []string{"endpoint_id"},
+ wantErr: true,
+ errContains: "is not present in Entity tags",
+ },
+ {
+ name: "invalid — superset of entity",
+ entity: []string{"service_id"},
+ shardingKey: []string{"service_id", "instance_id"},
+ wantErr: false, // single entity tag — skip
+ },
+ {
+ name: "invalid — superset of multi-entity",
+ entity: []string{"service_id", "instance_id"},
+ shardingKey: []string{"service_id", "instance_id",
"endpoint_id"},
+ wantErr: true,
+ errContains: "is not present in Entity tags",
+ },
+ {
+ name: "invalid — wrong relative order",
+ entity: []string{"service_id", "instance_id"},
+ shardingKey: []string{"instance_id", "service_id"},
+ wantErr: true,
+ errContains: "is not in the same relative order",
+ },
+ {
+ name: "nil sharding key",
+ entity: []string{"service_id"},
+ shardingKey: nil,
+ wantErr: false,
+ },
+ {
+ name: "empty sharding key",
+ entity: []string{"service_id"},
+ shardingKey: []string{},
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ measure := &databasev1.Measure{
+ Metadata: &commonv1.Metadata{Name: "test",
Group: "group"},
+ Entity: &databasev1.Entity{TagNames:
tt.entity},
+ }
+ if tt.shardingKey != nil {
+ measure.ShardingKey =
&databasev1.ShardingKey{TagNames: tt.shardingKey}
+ }
+ checkErr := CheckShardingKeySubset(measure)
+ if tt.wantErr {
+ assert.Error(t, checkErr)
+ if tt.errContains != "" {
+ assert.Contains(t, checkErr.Error(),
tt.errContains)
+ }
+ } else {
+ assert.NoError(t, checkErr)
+ }
+ })
+ }
+}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 91fa33db8..b3dff5559 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -247,6 +247,9 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata
schema.Metadata) {
sr.l.Warn().Err(err).Msg("measure is ignored")
return
}
+ if subsetWarn := validate.CheckShardingKeySubset(m); subsetWarn
!= nil {
+ sr.l.Warn().Err(subsetWarn).Str("measure",
m.GetMetadata().GetName()).Msg("sharding key is not a subset of entity tags")
+ }
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindResource,
diff --git a/banyand/metadata/schema/property/client.go
b/banyand/metadata/schema/property/client.go
index 69c2d8fe1..5984e133f 100644
--- a/banyand/metadata/schema/property/client.go
+++ b/banyand/metadata/schema/property/client.go
@@ -783,6 +783,9 @@ func (r *SchemaRegistry) CreateMeasure(ctx context.Context,
measure *databasev1.
if validateErr := validate.Measure(measure); validateErr != nil {
return 0, validateErr
}
+ if subsetWarn := validate.CheckShardingKeySubset(measure); subsetWarn
!= nil {
+ r.l.Warn().Err(subsetWarn).Str("measure",
measure.GetMetadata().GetName()).Msg("sharding key is not a subset of entity
tags")
+ }
now := time.Now().UnixNano()
measure.Metadata.ModRevision = now
measure.UpdatedAt = timestamppb.Now()
@@ -799,6 +802,9 @@ func (r *SchemaRegistry) UpdateMeasure(ctx context.Context,
measure *databasev1.
if validateErr := validate.Measure(measure); validateErr != nil {
return 0, validateErr
}
+ if subsetWarn := validate.CheckShardingKeySubset(measure); subsetWarn
!= nil {
+ r.l.Warn().Err(subsetWarn).Str("measure",
measure.GetMetadata().GetName()).Msg("sharding key is not a subset of entity
tags")
+ }
now := time.Now().UnixNano()
measure.Metadata.ModRevision = now
measure.UpdatedAt = timestamppb.Now()
diff --git
a/pkg/test/measure/testdata/measures/service_instance_cpm_minute.json
b/pkg/test/measure/testdata/measures/service_instance_cpm_minute.json
index 7c032704d..5396d89f1 100644
--- a/pkg/test/measure/testdata/measures/service_instance_cpm_minute.json
+++ b/pkg/test/measure/testdata/measures/service_instance_cpm_minute.json
@@ -37,16 +37,11 @@
}
],
"entity": {
- "tag_names": [
- "service_id",
- "entity_id"
- ]
+ "tag_names": ["service_id", "entity_id"]
},
"sharding_key": {
- "tag_names": [
- "service_id"
- ]
+ "tag_names": ["service_id", "entity_id"]
},
"interval": "1m",
"updated_at": "2021-04-15T01:30:15.01Z"
-}
\ No newline at end of file
+}
diff --git
a/pkg/test/measure/testdata/measures/service_instance_cpm_minute_updated.json
b/pkg/test/measure/testdata/measures/service_instance_cpm_minute_updated.json
index 1bc84febe..a4b8a54a5 100644
---
a/pkg/test/measure/testdata/measures/service_instance_cpm_minute_updated.json
+++
b/pkg/test/measure/testdata/measures/service_instance_cpm_minute_updated.json
@@ -37,16 +37,11 @@
}
],
"entity": {
- "tag_names": [
- "service_id",
- "entity_id"
- ]
+ "tag_names": ["service_id", "entity_id"]
},
"sharding_key": {
- "tag_names": [
- "service_id"
- ]
+ "tag_names": ["service_id", "entity_id"]
},
"interval": "1m",
"updated_at": "2021-04-15T01:30:15.01Z"
-}
\ No newline at end of file
+}
diff --git
a/pkg/test/measure/testdata/measures/service_instance_endpoint_cpm_minute.json
b/pkg/test/measure/testdata/measures/service_instance_endpoint_cpm_minute.json
index 47eadcee4..7fc8f74a7 100644
---
a/pkg/test/measure/testdata/measures/service_instance_endpoint_cpm_minute.json
+++
b/pkg/test/measure/testdata/measures/service_instance_endpoint_cpm_minute.json
@@ -41,16 +41,11 @@
}
],
"entity": {
- "tag_names": [
- "service_id",
- "entity_id"
- ]
+ "tag_names": ["service_id", "entity_id"]
},
"sharding_key": {
- "tag_names": [
- "service_id"
- ]
+ "tag_names": ["service_id", "entity_id"]
},
"interval": "1m",
"updated_at": "2021-04-15T01:30:15.01Z"
-}
\ No newline at end of file
+}