This is an automated email from the ASF dual-hosted git repository.

butterbright pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new fab20d0a Fix etcd prefix matching any key that starts with this prefix (#776)
fab20d0a is described below

commit fab20d0a3ce2605973333584affe63acbf585d0f
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Sep 20 18:48:09 2025 +0800

    Fix etcd prefix matching any key that starts with this prefix (#776)
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                             |   1 +
 banyand/metadata/schema/etcd.go        |   2 +-
 banyand/metadata/schema/etcd_test.go   | 275 +++++++++++++++++++++++++++++++++
 banyand/metadata/schema/prefix_test.go | 105 +++++++++++++
 4 files changed, 382 insertions(+), 1 deletion(-)

diff --git a/CHANGES.md b/CHANGES.md
index d788fc11..79a44f03 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -57,6 +57,7 @@ Release Notes.
 - Fix topN parsing panic when the criteria is set.
 - Remove the indexed_only field in TagSpec.
 - Fix returning empty result when using IN operatior on the array type tags.
+- Fix etcd prefix matching any key that starts with this prefix.
 
 ### Document
 
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index a6372560..a9247c6a 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -599,7 +599,7 @@ func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, revision int64,
 }
 
 func listPrefixesForEntity(group, entityPrefix string) string {
-       return path.Join(entityPrefix, group)
+       return path.Join(entityPrefix, group) + "/"
 }
 
 func formatKey(entityPrefix string, metadata *commonv1.Metadata) string {
diff --git a/banyand/metadata/schema/etcd_test.go b/banyand/metadata/schema/etcd_test.go
index 41b66715..bb6cdbf8 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -421,3 +421,278 @@ func Test_Etcd_Entity_Update(t *testing.T) {
                })
        }
 }
+
+func Test_Etcd_Stream_GroupPrefixMatching(t *testing.T) {
+       tester := assert.New(t)
+       registry, closer := initServerAndRegister(t)
+       defer closer()
+
+       // Create groups with similar names to test prefix matching
+       groups := []string{"records", "recordsTrace", "recordsLog"}
+       for _, groupName := range groups {
+               group := &commonv1.Group{
+                       Metadata: &commonv1.Metadata{
+                               Name: groupName,
+                       },
+                       Catalog: commonv1.Catalog_CATALOG_STREAM,
+                       ResourceOpts: &commonv1.ResourceOpts{
+                               ShardNum: 2,
+                               SegmentInterval: &commonv1.IntervalRule{
+                                       Unit: commonv1.IntervalRule_UNIT_DAY,
+                                       Num:  1,
+                               },
+                               Ttl: &commonv1.IntervalRule{
+                                       Unit: commonv1.IntervalRule_UNIT_DAY,
+                                       Num:  7,
+                               },
+                       },
+               }
+               err := registry.CreateGroup(context.TODO(), group)
+               tester.NoError(err)
+       }
+
+       // Create streams in each group
+       streams := []struct {
+               name  string
+               group string
+       }{
+               {"alarm_record", "records"},
+               {"top_n_cache_read_command", "records"},
+               {"log", "recordsLog"},
+               {"sw_span_attached_event_record", "recordsTrace"},
+       }
+
+       for _, streamData := range streams {
+               stream := &databasev1.Stream{
+                       Metadata: &commonv1.Metadata{
+                               Name:  streamData.name,
+                               Group: streamData.group,
+                       },
+                       TagFamilies: []*databasev1.TagFamilySpec{
+                               {
+                                       Name: "searchable",
+                                       Tags: []*databasev1.TagSpec{
+                                               {
+                                                       Name: "timestamp",
+                                                       Type: databasev1.TagType_TAG_TYPE_INT,
+                                               },
+                                       },
+                               },
+                       },
+                       Entity: &databasev1.Entity{
+                               TagNames: []string{"timestamp"},
+                       },
+               }
+               _, err := registry.CreateStream(context.Background(), stream)
+               tester.NoError(err)
+       }
+
+       // Test that listing streams for "records" group only returns streams from "records" group
+       recordsStreams, err := registry.ListStream(context.TODO(), schema.ListOpt{Group: "records"})
+       tester.NoError(err)
+       tester.Equal(2, len(recordsStreams), "Should only return streams from 'records' group")
+
+       // Verify the returned streams are from the correct group
+       for _, stream := range recordsStreams {
+               tester.Equal("records", stream.Metadata.Group, "All returned streams should be from 'records' group")
+       }
+
+       // Test that listing streams for "recordsTrace" group only returns streams from "recordsTrace" group
+       recordsTraceStreams, err := registry.ListStream(context.TODO(), schema.ListOpt{Group: "recordsTrace"})
+       tester.NoError(err)
+       tester.Equal(1, len(recordsTraceStreams), "Should only return streams from 'recordsTrace' group")
+       tester.Equal("recordsTrace", recordsTraceStreams[0].Metadata.Group)
+       tester.Equal("sw_span_attached_event_record", recordsTraceStreams[0].Metadata.Name)
+
+       // Test that listing streams for "recordsLog" group only returns streams from "recordsLog" group
+       recordsLogStreams, err := registry.ListStream(context.TODO(), schema.ListOpt{Group: "recordsLog"})
+       tester.NoError(err)
+       tester.Equal(1, len(recordsLogStreams), "Should only return streams from 'recordsLog' group")
+       tester.Equal("recordsLog", recordsLogStreams[0].Metadata.Group)
+       tester.Equal("log", recordsLogStreams[0].Metadata.Name)
+}
+
+func Test_Etcd_Measure_GroupPrefixMatching(t *testing.T) {
+       tester := assert.New(t)
+       registry, closer := initServerAndRegister(t)
+       defer closer()
+
+       // Create groups with similar names to test prefix matching
+       groups := []string{"metrics", "metricsTrace", "metricsLog"}
+       for _, groupName := range groups {
+               group := &commonv1.Group{
+                       Metadata: &commonv1.Metadata{
+                               Name: groupName,
+                       },
+                       Catalog: commonv1.Catalog_CATALOG_MEASURE,
+                       ResourceOpts: &commonv1.ResourceOpts{
+                               ShardNum: 2,
+                               SegmentInterval: &commonv1.IntervalRule{
+                                       Unit: commonv1.IntervalRule_UNIT_DAY,
+                                       Num:  1,
+                               },
+                               Ttl: &commonv1.IntervalRule{
+                                       Unit: commonv1.IntervalRule_UNIT_DAY,
+                                       Num:  7,
+                               },
+                       },
+               }
+               err := registry.CreateGroup(context.TODO(), group)
+               tester.NoError(err)
+       }
+
+       // Create measures in each group
+       measures := []struct {
+               name  string
+               group string
+       }{
+               {"service_cpm", "metrics"},
+               {"service_resp_time", "metrics"},
+               {"trace_metrics", "metricsTrace"},
+               {"log_metrics", "metricsLog"},
+       }
+
+       for _, measureData := range measures {
+               measure := &databasev1.Measure{
+                       Metadata: &commonv1.Metadata{
+                               Name:  measureData.name,
+                               Group: measureData.group,
+                       },
+                       TagFamilies: []*databasev1.TagFamilySpec{
+                               {
+                                       Name: "default",
+                                       Tags: []*databasev1.TagSpec{
+                                               {
+                                                       Name: "service_id",
+                                                       Type: databasev1.TagType_TAG_TYPE_STRING,
+                                               },
+                                       },
+                               },
+                       },
+                       Fields: []*databasev1.FieldSpec{
+                               {
+                                       Name:              "value",
+                                       FieldType:         databasev1.FieldType_FIELD_TYPE_INT,
+                                       EncodingMethod:    databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+                                       CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+                               },
+                       },
+                       Entity: &databasev1.Entity{
+                               TagNames: []string{"service_id"},
+                       },
+                       Interval: "1m",
+               }
+               _, err := registry.CreateMeasure(context.Background(), measure)
+               tester.NoError(err)
+       }
+
+       // Test that listing measures for "metrics" group only returns measures from "metrics" group
+       metricsResults, err := registry.ListMeasure(context.TODO(), schema.ListOpt{Group: "metrics"})
+       tester.NoError(err)
+       tester.Equal(2, len(metricsResults), "Should only return measures from 'metrics' group")
+
+       // Verify the returned measures are from the correct group
+       for _, measure := range metricsResults {
+               tester.Equal("metrics", measure.Metadata.Group, "All returned measures should be from 'metrics' group")
+       }
+
+       // Test that listing measures for "metricsTrace" group only returns measures from "metricsTrace" group
+       metricsTraceResults, err := registry.ListMeasure(context.TODO(), schema.ListOpt{Group: "metricsTrace"})
+       tester.NoError(err)
+       tester.Equal(1, len(metricsTraceResults), "Should only return measures from 'metricsTrace' group")
+       tester.Equal("metricsTrace", metricsTraceResults[0].Metadata.Group)
+       tester.Equal("trace_metrics", metricsTraceResults[0].Metadata.Name)
+
+       // Test that listing measures for "metricsLog" group only returns measures from "metricsLog" group
+       metricsLogResults, err := registry.ListMeasure(context.TODO(), schema.ListOpt{Group: "metricsLog"})
+       tester.NoError(err)
+       tester.Equal(1, len(metricsLogResults), "Should only return measures from 'metricsLog' group")
+       tester.Equal("metricsLog", metricsLogResults[0].Metadata.Group)
+       tester.Equal("log_metrics", metricsLogResults[0].Metadata.Name)
+}
+
+func Test_Etcd_GroupPrefixMatching_EdgeCases(t *testing.T) {
+       tester := assert.New(t)
+       registry, closer := initServerAndRegister(t)
+       defer closer()
+
+       // Test edge cases with various group name patterns
+       groups := []string{
+               "a",
+               "ab",
+               "abc",
+               "a_",
+               "a_b",
+               "test",
+               "testGroup",
+               "testGroupSuffix",
+       }
+
+       for _, groupName := range groups {
+               group := &commonv1.Group{
+                       Metadata: &commonv1.Metadata{
+                               Name: groupName,
+                       },
+                       Catalog: commonv1.Catalog_CATALOG_STREAM,
+                       ResourceOpts: &commonv1.ResourceOpts{
+                               ShardNum: 2,
+                               SegmentInterval: &commonv1.IntervalRule{
+                                       Unit: commonv1.IntervalRule_UNIT_DAY,
+                                       Num:  1,
+                               },
+                               Ttl: &commonv1.IntervalRule{
+                                       Unit: commonv1.IntervalRule_UNIT_DAY,
+                                       Num:  7,
+                               },
+                       },
+               }
+               err := registry.CreateGroup(context.TODO(), group)
+               tester.NoError(err)
+
+               // Create one stream per group
+               stream := &databasev1.Stream{
+                       Metadata: &commonv1.Metadata{
+                               Name:  "stream_" + groupName,
+                               Group: groupName,
+                       },
+                       TagFamilies: []*databasev1.TagFamilySpec{
+                               {
+                                       Name: "default",
+                                       Tags: []*databasev1.TagSpec{
+                                               {
+                                                       Name: "id",
+                                                       Type: databasev1.TagType_TAG_TYPE_STRING,
+                                               },
+                                       },
+                               },
+                       },
+                       Entity: &databasev1.Entity{
+                               TagNames: []string{"id"},
+                       },
+               }
+               _, err = registry.CreateStream(context.Background(), stream)
+               tester.NoError(err)
+       }
+
+       // Test each group individually to ensure exact matching
+       for _, groupName := range groups {
+               streams, err := registry.ListStream(context.TODO(), schema.ListOpt{Group: groupName})
+               tester.NoError(err)
+               tester.Equal(1, len(streams), "Group '%s' should have exactly 1 stream", groupName)
+               tester.Equal(groupName, streams[0].Metadata.Group, "Stream should belong to group '%s'", groupName)
+               tester.Equal("stream_"+groupName, streams[0].Metadata.Name, "Stream name should match expected pattern")
+       }
+
+       // Specifically test the problematic case from the GitHub issue
+       // Ensure "a" group doesn't return streams from "ab" or "abc" groups
+       aStreams, err := registry.ListStream(context.TODO(), schema.ListOpt{Group: "a"})
+       tester.NoError(err)
+       tester.Equal(1, len(aStreams), "Group 'a' should have exactly 1 stream")
+       tester.Equal("a", aStreams[0].Metadata.Group)
+
+       // Ensure "test" group doesn't return streams from "testGroup" or "testGroupSuffix" groups
+       testStreams, err := registry.ListStream(context.TODO(), schema.ListOpt{Group: "test"})
+       tester.NoError(err)
+       tester.Equal(1, len(testStreams), "Group 'test' should have exactly 1 stream")
+       tester.Equal("test", testStreams[0].Metadata.Group)
+}
diff --git a/banyand/metadata/schema/prefix_test.go b/banyand/metadata/schema/prefix_test.go
new file mode 100644
index 00000000..f2c5cd73
--- /dev/null
+++ b/banyand/metadata/schema/prefix_test.go
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func Test_listPrefixesForEntity(t *testing.T) {
+       tests := []struct {
+               name         string
+               group        string
+               entityPrefix string
+               expected     string
+       }{
+               {
+                       name:         "records group with streams prefix",
+                       group:        "records",
+                       entityPrefix: "/streams/",
+                       expected:     "/streams/records/",
+               },
+               {
+                       name:         "recordsTrace group with streams prefix",
+                       group:        "recordsTrace",
+                       entityPrefix: "/streams/",
+                       expected:     "/streams/recordsTrace/",
+               },
+               {
+                       name:         "recordsLog group with streams prefix",
+                       group:        "recordsLog",
+                       entityPrefix: "/streams/",
+                       expected:     "/streams/recordsLog/",
+               },
+               {
+                       name:         "metrics group with measures prefix",
+                       group:        "metrics",
+                       entityPrefix: "/measures/",
+                       expected:     "/measures/metrics/",
+               },
+               {
+                       name:         "single character group",
+                       group:        "a",
+                       entityPrefix: "/streams/",
+                       expected:     "/streams/a/",
+               },
+               {
+                       name:         "empty group",
+                       group:        "",
+                       entityPrefix: "/streams/",
+                       expected:     "/streams/",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := listPrefixesForEntity(tt.group, tt.entityPrefix)
+                       assert.Equal(t, tt.expected, result, "listPrefixesForEntity(%q, %q) should return %q", tt.group, tt.entityPrefix, tt.expected)
+               })
+       }
+}
+
+func Test_listPrefixesForEntity_FixValidation(t *testing.T) {
+       // This test validates that the fix prevents the original GitHub issue
+       // where group "records" would match groups "recordsTrace" and "recordsLog"
+
+       recordsPrefix := listPrefixesForEntity("records", "/streams/")
+       recordsTracePrefix := listPrefixesForEntity("recordsTrace", "/streams/")
+       recordsLogPrefix := listPrefixesForEntity("recordsLog", "/streams/")
+
+       // After the fix, each prefix should be unique and not be a prefix of another
+       assert.Equal(t, "/streams/records/", recordsPrefix)
+       assert.Equal(t, "/streams/recordsTrace/", recordsTracePrefix)
+       assert.Equal(t, "/streams/recordsLog/", recordsLogPrefix)
+
+       // Verify that "records" prefix doesn't match the other two
+       // (This would fail before the fix)
+       assert.False(t, hasPrefix(recordsTracePrefix, recordsPrefix), "recordsTrace prefix should not start with records prefix")
+       assert.False(t, hasPrefix(recordsLogPrefix, recordsPrefix), "recordsLog prefix should not start with records prefix")
+
+       // Verify that they are all distinct
+       assert.NotEqual(t, recordsPrefix, recordsTracePrefix)
+       assert.NotEqual(t, recordsPrefix, recordsLogPrefix)
+       assert.NotEqual(t, recordsTracePrefix, recordsLogPrefix)
+}
+
+func hasPrefix(s, prefix string) bool {
+       return len(s) >= len(prefix) && s[:len(prefix)] == prefix
+}

Reply via email to