Copilot commented on code in PR #873:
URL: 
https://github.com/apache/skywalking-banyandb/pull/873#discussion_r2617666218


##########
banyand/measure/topn.go:
##########
@@ -604,19 +607,21 @@ func (manager *topNProcessorManager) 
buildMapper(fieldName string, groupByNames
 type groupTagsLocator []partition.TagLocator
 
 // newGroupLocator generates a groupTagsLocator which strictly preserve the 
order of groupByNames.
-func newGroupLocator(m *databasev1.Measure, groupByNames []string) 
(groupTagsLocator, error) {
+func newGroupLocator(m *databasev1.Measure, groupByNames []string) 
(groupTagsLocator, []string) {
        groupTags := make([]partition.TagLocator, 0, len(groupByNames))
+       var removedTags []string
        for _, groupByName := range groupByNames {
                fIdx, tIdx, spec := pbv1.FindTagByName(m.GetTagFamilies(), 
groupByName)
                if spec == nil {
-                       return nil, fmt.Errorf("tag %s is not found", 
groupByName)
+                       removedTags = append(removedTags, groupByName)
+                       continue
                }
                groupTags = append(groupTags, partition.TagLocator{
                        FamilyOffset: fIdx,
                        TagOffset:    tIdx,
                })
        }
-       return groupTags, nil
+       return groupTags, removedTags

Review Comment:
   The function `newGroupLocator` has inconsistent error handling compared to 
similar functions in the codebase. It now returns the missing tags as a 
string slice instead of an error, but its doc comment was not updated to 
explain this behavior. The caller in `buildMapper` logs warnings for removed 
tags but continues processing, which could lead to incorrect TopN aggregation 
results if critical groupBy tags are missing. Consider returning an error if 
any groupBy tag is not found, as groupBy tags are essential for the 
aggregation to work correctly.



##########
banyand/trace/query.go:
##########
@@ -134,6 +147,41 @@ func validateTraceQueryOptions(tqo 
model.TraceQueryOptions) error {
        return nil
 }
 
+func (t *trace) filterTagProjection(tagProjection *model.TagProjection) 
*model.TagProjection {
+       if tagProjection == nil || len(tagProjection.Names) == 0 {
+               return tagProjection
+       }
+
+       is := t.indexSchema.Load()
+       if is == nil {
+               return tagProjection
+       }
+       tagMap := is.(indexSchema).tagMap
+       if len(tagMap) == 0 {
+               return tagProjection
+       }
+
+       filteredNames := make([]string, 0, len(tagProjection.Names))
+       for _, name := range tagProjection.Names {
+               if _, exists := tagMap[name]; exists {
+                       filteredNames = append(filteredNames, name)
+               }
+       }
+
+       if len(filteredNames) == len(tagProjection.Names) {
+               return tagProjection
+       }
+
+       if len(filteredNames) == 0 {
+               return nil
+       }
+
+       return &model.TagProjection{
+               Family: tagProjection.Family,
+               Names:  filteredNames,
+       }
+}

Review Comment:
   The function `filterTagProjection` has a potential concurrency issue. The 
`indexSchema` is loaded atomically, but once loaded, the `tagMap` field is 
accessed directly without synchronization. If the schema is updated while this 
function is executing, there could be a race condition. While atomic.Value 
protects against reading partially-written pointers, the map itself could be 
modified concurrently. Consider documenting that the indexSchema should be 
immutable once stored, or implement copy-on-write semantics for schema updates.



##########
banyand/metadata/schema/stream.go:
##########
@@ -90,26 +90,45 @@ func (e *etcdSchemaRegistry) UpdateStream(ctx 
context.Context, stream *databasev
        })
 }
 
-func validateEqualExceptAppendTags(prevStream, newStream *databasev1.Stream) 
error {
+func validateStreamUpdate(prevStream, newStream *databasev1.Stream) error {
        if prevStream.GetEntity().String() != newStream.GetEntity().String() {
                return fmt.Errorf("entity is different: %s != %s", 
prevStream.GetEntity().String(), newStream.GetEntity().String())
        }
-       if len(prevStream.GetTagFamilies()) > len(newStream.GetTagFamilies()) {
-               return fmt.Errorf("number of tag families is less in the new 
stream")
-       }
-       for i, tagFamily := range prevStream.GetTagFamilies() {
-               if tagFamily.Name != newStream.GetTagFamilies()[i].Name {
-                       return fmt.Errorf("tag family name is different: %s != 
%s", tagFamily.Name, newStream.GetTagFamilies()[i].Name)
+
+       entityTagSet := make(map[string]struct{})
+       for _, tagName := range newStream.GetEntity().GetTagNames() {
+               entityTagSet[tagName] = struct{}{}
+       }
+       newTagFamilyMap := make(map[string]map[string]*databasev1.TagSpec)
+       for _, tf := range newStream.GetTagFamilies() {
+               tagMap := make(map[string]*databasev1.TagSpec)
+               for _, tag := range tf.GetTags() {
+                       tagMap[tag.GetName()] = tag
                }
-               if len(tagFamily.Tags) > 
len(newStream.GetTagFamilies()[i].Tags) {
-                       return fmt.Errorf("number of tags in tag family %s is 
less in the new stream", tagFamily.Name)
+               newTagFamilyMap[tf.GetName()] = tagMap
+       }
+
+       for _, prevTagFamily := range prevStream.GetTagFamilies() {
+               newTagMap, familyExists := 
newTagFamilyMap[prevTagFamily.GetName()]
+               if !familyExists {
+                       for _, tag := range prevTagFamily.GetTags() {
+                               if _, isEntity := entityTagSet[tag.GetName()]; 
isEntity {
+                                       return fmt.Errorf("cannot delete tag 
family %s: it contains entity tag %s", prevTagFamily.GetName(), tag.GetName())
+                               }
+                       }
+                       continue
                }

Review Comment:
   The validation logic for stream updates allows deleting entire tag families 
if they don't contain entity tags. However, there is no validation to ensure 
that after deletion, the stream still has at least one tag family or that 
required tag families (like "data") are preserved. This could potentially 
result in streams with no queryable data after schema updates.



##########
banyand/metadata/schema/measure.go:
##########
@@ -139,30 +139,56 @@ func validateEqualExceptAppendTagsAndFields(prevMeasure, 
newMeasure *databasev1.
        if prevMeasure.GetIndexMode() != newMeasure.GetIndexMode() {
                return fmt.Errorf("index mode is different: %v != %v", 
prevMeasure.GetIndexMode(), newMeasure.GetIndexMode())
        }
-       if len(prevMeasure.GetTagFamilies()) > len(newMeasure.GetTagFamilies()) 
{
-               return fmt.Errorf("number of tag families is less in the new 
measure")
-       }
-       if len(prevMeasure.GetFields()) > len(newMeasure.GetFields()) {
-               return fmt.Errorf("number of fields is less in the new measure")
-       }
-       for i, tagFamily := range prevMeasure.GetTagFamilies() {
-               if tagFamily.Name != newMeasure.GetTagFamilies()[i].Name {
-                       return fmt.Errorf("tag family name is different: %s != 
%s", tagFamily.Name, newMeasure.GetTagFamilies()[i].Name)
+
+       entityTagSet := make(map[string]struct{})
+       for _, tagName := range newMeasure.GetEntity().GetTagNames() {
+               entityTagSet[tagName] = struct{}{}
+       }
+       newTagFamilyMap := make(map[string]map[string]*databasev1.TagSpec)
+       for _, tf := range newMeasure.GetTagFamilies() {
+               tagMap := make(map[string]*databasev1.TagSpec)
+               for _, tag := range tf.GetTags() {
+                       tagMap[tag.GetName()] = tag
                }
-               if len(tagFamily.Tags) > 
len(newMeasure.GetTagFamilies()[i].Tags) {
-                       return fmt.Errorf("number of tags in tag family %s is 
less in the new measure", tagFamily.Name)
+               newTagFamilyMap[tf.GetName()] = tagMap
+       }
+
+       for _, prevTagFamily := range prevMeasure.GetTagFamilies() {
+               newTagMap, familyExists := 
newTagFamilyMap[prevTagFamily.GetName()]
+               if !familyExists {
+                       for _, tag := range prevTagFamily.GetTags() {
+                               if _, isEntity := entityTagSet[tag.GetName()]; 
isEntity {
+                                       return fmt.Errorf("cannot delete tag 
family %s: it contains entity tag %s", prevTagFamily.GetName(), tag.GetName())
+                               }
+                       }
+                       continue
                }

Review Comment:
   The validation logic for measure updates allows deleting entire tag families 
if they don't contain entity tags. Similar to streams, there is no validation 
to ensure that after deletion, the measure still has at least one tag family. 
This could potentially result in measures with an incomplete schema after updates.



##########
banyand/stream/block.go:
##########
@@ -526,8 +534,13 @@ func (bc *blockCursor) copyTo(r *model.StreamResult) {
                        logger.Panicf("unexpected number of tags: got %d; want 
%d", len(r.TagFamilies[i].Tags), len(bc.tagProjection[i].Names))
                }
                for i2, c := range cf.tags {
+                       schemaType, hasSchemaType := bc.schemaTagTypes[c.name]
                        if len(c.values) > bc.idx {
-                               r.TagFamilies[i].Tags[i2].Values = 
append(r.TagFamilies[i].Tags[i2].Values, mustDecodeTagValue(c.valueType, 
c.values[bc.idx]))
+                               if !hasSchemaType || c.valueType == schemaType {
+                                       r.TagFamilies[i].Tags[i2].Values = 
append(r.TagFamilies[i].Tags[i2].Values, mustDecodeTagValue(c.valueType, 
c.values[bc.idx]))
+                               } else {
+                                       r.TagFamilies[i].Tags[i2].Values = 
append(r.TagFamilies[i].Tags[i2].Values, pbv1.NullTagValue)
+                               }
                        } else {
                                r.TagFamilies[i].Tags[i2].Values = 
append(r.TagFamilies[i].Tags[i2].Values, pbv1.NullTagValue)
                        }

Review Comment:
   The loop variable name `i2` is unclear and doesn't follow Go naming 
conventions. Consider renaming it to `tagIdx` or `j` for better readability, as 
it represents the index of a tag within a tag family.



##########
pkg/pb/v1/value.go:
##########
@@ -89,6 +89,26 @@ func MustTagValueSpecToValueType(tag databasev1.TagType) 
ValueType {
        }
 }
 
+// TagValueSpecToValueType converts databasev1.TagType to ValueType.
+func TagValueSpecToValueType(tag databasev1.TagType) ValueType {
+       switch tag {
+       case databasev1.TagType_TAG_TYPE_STRING:
+               return ValueTypeStr
+       case databasev1.TagType_TAG_TYPE_INT:
+               return ValueTypeInt64
+       case databasev1.TagType_TAG_TYPE_DATA_BINARY:
+               return ValueTypeBinaryData
+       case databasev1.TagType_TAG_TYPE_STRING_ARRAY:
+               return ValueTypeStrArr
+       case databasev1.TagType_TAG_TYPE_INT_ARRAY:
+               return ValueTypeInt64Arr
+       case databasev1.TagType_TAG_TYPE_TIMESTAMP:
+               return ValueTypeTimestamp
+       default:
+               return ValueTypeUnknown
+       }
+}

Review Comment:
   The function `TagValueSpecToValueType` duplicates the functionality of 
`MustTagValueSpecToValueType`. The only difference is that this function 
returns `ValueTypeUnknown` on unknown types while `MustTagValueSpecToValueType` 
panics. Consider adding a comment explaining when to use this non-panicking 
version versus the panicking version, or consolidating them into a single 
function with a boolean return value indicating success.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to