This is an automated email from the ASF dual-hosted git repository.
butterbright pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new fab20d0a Fix etcd prefix matching any key that starts with this prefix
(#776)
fab20d0a is described below
commit fab20d0a3ce2605973333584affe63acbf585d0f
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Sep 20 18:48:09 2025 +0800
Fix etcd prefix matching any key that starts with this prefix (#776)
Signed-off-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 1 +
banyand/metadata/schema/etcd.go | 2 +-
banyand/metadata/schema/etcd_test.go | 275 +++++++++++++++++++++++++++++++++
banyand/metadata/schema/prefix_test.go | 105 +++++++++++++
4 files changed, 382 insertions(+), 1 deletion(-)
diff --git a/CHANGES.md b/CHANGES.md
index d788fc11..79a44f03 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -57,6 +57,7 @@ Release Notes.
- Fix topN parsing panic when the criteria is set.
- Remove the indexed_only field in TagSpec.
- Fix returning empty result when using IN operatior on the array type tags.
+- Fix etcd prefix matching any key that starts with this prefix.
### Document
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index a6372560..a9247c6a 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -599,7 +599,7 @@ func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, revision int64,
}
func listPrefixesForEntity(group, entityPrefix string) string {
- return path.Join(entityPrefix, group)
+ return path.Join(entityPrefix, group) + "/"
}
func formatKey(entityPrefix string, metadata *commonv1.Metadata) string {
diff --git a/banyand/metadata/schema/etcd_test.go b/banyand/metadata/schema/etcd_test.go
index 41b66715..bb6cdbf8 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -421,3 +421,278 @@ func Test_Etcd_Entity_Update(t *testing.T) {
})
}
}
+
+func Test_Etcd_Stream_GroupPrefixMatching(t *testing.T) {
+ tester := assert.New(t)
+ registry, closer := initServerAndRegister(t)
+ defer closer()
+
+ // Create groups with similar names to test prefix matching
+ groups := []string{"records", "recordsTrace", "recordsLog"}
+ for _, groupName := range groups {
+ group := &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: groupName,
+ },
+ Catalog: commonv1.Catalog_CATALOG_STREAM,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 2,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit: commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit: commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ },
+ }
+ err := registry.CreateGroup(context.TODO(), group)
+ tester.NoError(err)
+ }
+
+ // Create streams in each group
+ streams := []struct {
+ name string
+ group string
+ }{
+ {"alarm_record", "records"},
+ {"top_n_cache_read_command", "records"},
+ {"log", "recordsLog"},
+ {"sw_span_attached_event_record", "recordsTrace"},
+ }
+
+ for _, streamData := range streams {
+ stream := &databasev1.Stream{
+ Metadata: &commonv1.Metadata{
+ Name: streamData.name,
+ Group: streamData.group,
+ },
+ TagFamilies: []*databasev1.TagFamilySpec{
+ {
+ Name: "searchable",
+ Tags: []*databasev1.TagSpec{
+ {
+ Name: "timestamp",
+ Type: databasev1.TagType_TAG_TYPE_INT,
+ },
+ },
+ },
+ },
+ Entity: &databasev1.Entity{
+ TagNames: []string{"timestamp"},
+ },
+ }
+ _, err := registry.CreateStream(context.Background(), stream)
+ tester.NoError(err)
+ }
+
+ // Test that listing streams for "records" group only returns streams from "records" group
+ recordsStreams, err := registry.ListStream(context.TODO(), schema.ListOpt{Group: "records"})
+ tester.NoError(err)
+ tester.Equal(2, len(recordsStreams), "Should only return streams from 'records' group")
+
+ // Verify the returned streams are from the correct group
+ for _, stream := range recordsStreams {
+ tester.Equal("records", stream.Metadata.Group, "All returned streams should be from 'records' group")
+ }
+
+ // Test that listing streams for "recordsTrace" group only returns streams from "recordsTrace" group
+ recordsTraceStreams, err := registry.ListStream(context.TODO(), schema.ListOpt{Group: "recordsTrace"})
+ tester.NoError(err)
+ tester.Equal(1, len(recordsTraceStreams), "Should only return streams from 'recordsTrace' group")
+ tester.Equal("recordsTrace", recordsTraceStreams[0].Metadata.Group)
+ tester.Equal("sw_span_attached_event_record", recordsTraceStreams[0].Metadata.Name)
+
+ // Test that listing streams for "recordsLog" group only returns streams from "recordsLog" group
+ recordsLogStreams, err := registry.ListStream(context.TODO(), schema.ListOpt{Group: "recordsLog"})
+ tester.NoError(err)
+ tester.Equal(1, len(recordsLogStreams), "Should only return streams from 'recordsLog' group")
+ tester.Equal("recordsLog", recordsLogStreams[0].Metadata.Group)
+ tester.Equal("log", recordsLogStreams[0].Metadata.Name)
+}
+
+func Test_Etcd_Measure_GroupPrefixMatching(t *testing.T) {
+ tester := assert.New(t)
+ registry, closer := initServerAndRegister(t)
+ defer closer()
+
+ // Create groups with similar names to test prefix matching
+ groups := []string{"metrics", "metricsTrace", "metricsLog"}
+ for _, groupName := range groups {
+ group := &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: groupName,
+ },
+ Catalog: commonv1.Catalog_CATALOG_MEASURE,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 2,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit: commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit: commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ },
+ }
+ err := registry.CreateGroup(context.TODO(), group)
+ tester.NoError(err)
+ }
+
+ // Create measures in each group
+ measures := []struct {
+ name string
+ group string
+ }{
+ {"service_cpm", "metrics"},
+ {"service_resp_time", "metrics"},
+ {"trace_metrics", "metricsTrace"},
+ {"log_metrics", "metricsLog"},
+ }
+
+ for _, measureData := range measures {
+ measure := &databasev1.Measure{
+ Metadata: &commonv1.Metadata{
+ Name: measureData.name,
+ Group: measureData.group,
+ },
+ TagFamilies: []*databasev1.TagFamilySpec{
+ {
+ Name: "default",
+ Tags: []*databasev1.TagSpec{
+ {
+ Name: "service_id",
+ Type: databasev1.TagType_TAG_TYPE_STRING,
+ },
+ },
+ },
+ },
+ Fields: []*databasev1.FieldSpec{
+ {
+ Name: "value",
+ FieldType: databasev1.FieldType_FIELD_TYPE_INT,
+ EncodingMethod: databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+ CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+ },
+ },
+ Entity: &databasev1.Entity{
+ TagNames: []string{"service_id"},
+ },
+ Interval: "1m",
+ }
+ _, err := registry.CreateMeasure(context.Background(), measure)
+ tester.NoError(err)
+ }
+
+ // Test that listing measures for "metrics" group only returns measures from "metrics" group
+ metricsResults, err := registry.ListMeasure(context.TODO(), schema.ListOpt{Group: "metrics"})
+ tester.NoError(err)
+ tester.Equal(2, len(metricsResults), "Should only return measures from 'metrics' group")
+
+ // Verify the returned measures are from the correct group
+ for _, measure := range metricsResults {
+ tester.Equal("metrics", measure.Metadata.Group, "All returned measures should be from 'metrics' group")
+ }
+
+ // Test that listing measures for "metricsTrace" group only returns measures from "metricsTrace" group
+ metricsTraceResults, err := registry.ListMeasure(context.TODO(), schema.ListOpt{Group: "metricsTrace"})
+ tester.NoError(err)
+ tester.Equal(1, len(metricsTraceResults), "Should only return measures from 'metricsTrace' group")
+ tester.Equal("metricsTrace", metricsTraceResults[0].Metadata.Group)
+ tester.Equal("trace_metrics", metricsTraceResults[0].Metadata.Name)
+
+ // Test that listing measures for "metricsLog" group only returns measures from "metricsLog" group
+ metricsLogResults, err := registry.ListMeasure(context.TODO(), schema.ListOpt{Group: "metricsLog"})
+ tester.NoError(err)
+ tester.Equal(1, len(metricsLogResults), "Should only return measures from 'metricsLog' group")
+ tester.Equal("metricsLog", metricsLogResults[0].Metadata.Group)
+ tester.Equal("log_metrics", metricsLogResults[0].Metadata.Name)
+}
+
+func Test_Etcd_GroupPrefixMatching_EdgeCases(t *testing.T) {
+ tester := assert.New(t)
+ registry, closer := initServerAndRegister(t)
+ defer closer()
+
+ // Test edge cases with various group name patterns
+ groups := []string{
+ "a",
+ "ab",
+ "abc",
+ "a_",
+ "a_b",
+ "test",
+ "testGroup",
+ "testGroupSuffix",
+ }
+
+ for _, groupName := range groups {
+ group := &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: groupName,
+ },
+ Catalog: commonv1.Catalog_CATALOG_STREAM,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 2,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit: commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit: commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ },
+ }
+ err := registry.CreateGroup(context.TODO(), group)
+ tester.NoError(err)
+
+ // Create one stream per group
+ stream := &databasev1.Stream{
+ Metadata: &commonv1.Metadata{
+ Name: "stream_" + groupName,
+ Group: groupName,
+ },
+ TagFamilies: []*databasev1.TagFamilySpec{
+ {
+ Name: "default",
+ Tags: []*databasev1.TagSpec{
+ {
+ Name: "id",
+ Type: databasev1.TagType_TAG_TYPE_STRING,
+ },
+ },
+ },
+ },
+ Entity: &databasev1.Entity{
+ TagNames: []string{"id"},
+ },
+ }
+ _, err = registry.CreateStream(context.Background(), stream)
+ tester.NoError(err)
+ }
+
+ // Test each group individually to ensure exact matching
+ for _, groupName := range groups {
+ streams, err := registry.ListStream(context.TODO(), schema.ListOpt{Group: groupName})
+ tester.NoError(err)
+ tester.Equal(1, len(streams), "Group '%s' should have exactly 1 stream", groupName)
+ tester.Equal(groupName, streams[0].Metadata.Group, "Stream should belong to group '%s'", groupName)
+ tester.Equal("stream_"+groupName, streams[0].Metadata.Name, "Stream name should match expected pattern")
+ }
+
+ // Specifically test the problematic case from the GitHub issue
+ // Ensure "a" group doesn't return streams from "ab" or "abc" groups
+ aStreams, err := registry.ListStream(context.TODO(), schema.ListOpt{Group: "a"})
+ tester.NoError(err)
+ tester.Equal(1, len(aStreams), "Group 'a' should have exactly 1 stream")
+ tester.Equal("a", aStreams[0].Metadata.Group)
+
+ // Ensure "test" group doesn't return streams from "testGroup" or "testGroupSuffix" groups
+ testStreams, err := registry.ListStream(context.TODO(), schema.ListOpt{Group: "test"})
+ tester.NoError(err)
+ tester.Equal(1, len(testStreams), "Group 'test' should have exactly 1 stream")
+ tester.Equal("test", testStreams[0].Metadata.Group)
+}
diff --git a/banyand/metadata/schema/prefix_test.go b/banyand/metadata/schema/prefix_test.go
new file mode 100644
index 00000000..f2c5cd73
--- /dev/null
+++ b/banyand/metadata/schema/prefix_test.go
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_listPrefixesForEntity(t *testing.T) {
+ tests := []struct {
+ name string
+ group string
+ entityPrefix string
+ expected string
+ }{
+ {
+ name: "records group with streams prefix",
+ group: "records",
+ entityPrefix: "/streams/",
+ expected: "/streams/records/",
+ },
+ {
+ name: "recordsTrace group with streams prefix",
+ group: "recordsTrace",
+ entityPrefix: "/streams/",
+ expected: "/streams/recordsTrace/",
+ },
+ {
+ name: "recordsLog group with streams prefix",
+ group: "recordsLog",
+ entityPrefix: "/streams/",
+ expected: "/streams/recordsLog/",
+ },
+ {
+ name: "metrics group with measures prefix",
+ group: "metrics",
+ entityPrefix: "/measures/",
+ expected: "/measures/metrics/",
+ },
+ {
+ name: "single character group",
+ group: "a",
+ entityPrefix: "/streams/",
+ expected: "/streams/a/",
+ },
+ {
+ name: "empty group",
+ group: "",
+ entityPrefix: "/streams/",
+ expected: "/streams/",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := listPrefixesForEntity(tt.group, tt.entityPrefix)
+ assert.Equal(t, tt.expected, result, "listPrefixesForEntity(%q, %q) should return %q", tt.group, tt.entityPrefix, tt.expected)
+ })
+ }
+}
+
+func Test_listPrefixesForEntity_FixValidation(t *testing.T) {
+ // This test validates that the fix prevents the original GitHub issue
+ // where group "records" would match groups "recordsTrace" and "recordsLog"
+
+ recordsPrefix := listPrefixesForEntity("records", "/streams/")
+ recordsTracePrefix := listPrefixesForEntity("recordsTrace", "/streams/")
+ recordsLogPrefix := listPrefixesForEntity("recordsLog", "/streams/")
+
+ // After the fix, each prefix should be unique and not be a prefix of another
+ assert.Equal(t, "/streams/records/", recordsPrefix)
+ assert.Equal(t, "/streams/recordsTrace/", recordsTracePrefix)
+ assert.Equal(t, "/streams/recordsLog/", recordsLogPrefix)
+
+ // Verify that "records" prefix doesn't match the other two
+ // (This would fail before the fix)
+ assert.False(t, hasPrefix(recordsTracePrefix, recordsPrefix), "recordsTrace prefix should not start with records prefix")
+ assert.False(t, hasPrefix(recordsLogPrefix, recordsPrefix), "recordsLog prefix should not start with records prefix")
+
+ // Verify that they are all distinct
+ assert.NotEqual(t, recordsPrefix, recordsTracePrefix)
+ assert.NotEqual(t, recordsPrefix, recordsLogPrefix)
+ assert.NotEqual(t, recordsTracePrefix, recordsLogPrefix)
+}
+
+func hasPrefix(s, prefix string) bool {
+ return len(s) >= len(prefix) && s[:len(prefix)] == prefix
+}