hanahmily commented on code in PR #1126:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1126#discussion_r3223659090


##########
banyand/measure/block.go:
##########
@@ -956,6 +968,10 @@ func fastTagAppend(bi, b *blockPointer, offset int) error {
                                return fmt.Errorf("unexpected tag name for tag 
family %q: got %q; want %q",
                                        bi.tagFamilies[i].name, 
b.tagFamilies[i].columns[j].name, bi.tagFamilies[i].columns[j].name)
                        }
+               }
+       }
+       for i := range bi.tagFamilies {

Review Comment:
   The original `fastTagAppend` performed the name check and the append inside 
the same loop. The change splits it into two full passes over the same tag 
families × columns with no functional reason — renaming happens on the source 
block before this function is called, so column names are stable here. The 
split is extra work and creates two places that must stay in sync. The same 
issue applies to `banyand/stream/block.go`.



##########
banyand/measure/merger.go:
##########
@@ -359,6 +366,101 @@ func mergeBlocks(closeCh <-chan struct{}, bw 
*blockWriter, br *blockReader) (*pa
        return &result, nil
 }
 
+func renameConflictColumns(b *block, conflictColumns 
map[string]map[string]struct{}) {
+       if len(conflictColumns) == 0 {
+               return
+       }
+       for i := range b.tagFamilies {
+               columns := conflictColumns[b.tagFamilies[i].name]
+               if columns == nil {
+                       continue
+               }
+               cc := b.tagFamilies[i].columns
+               for j := range cc {
+                       if _, ok := columns[cc[j].name]; ok {
+                               cc[j].name = encodeTypedColumn(cc[j].name, 
cc[j].valueType)
+                       }
+               }
+       }
+}
+
+func collectConflictColumns(parts []*partWrapper) 
map[string]map[string]struct{} {
+       familyColumnTypes := 
make(map[string]map[string]map[pbv1.ValueType]struct{})
+       for _, pw := range parts {
+               partTypes := readPartColumnTypes(pw.p)
+               for cf, cc := range partTypes {
+                       columnTypes := familyColumnTypes[cf]
+                       if columnTypes == nil {
+                               columnTypes = 
make(map[string]map[pbv1.ValueType]struct{})
+                               familyColumnTypes[cf] = columnTypes
+                       }
+                       for name, vt := range cc {
+                               decoded := decodeTypedColumn(name)
+                               valueTypes := columnTypes[decoded]
+                               if valueTypes == nil {
+                                       valueTypes = 
make(map[pbv1.ValueType]struct{})
+                                       columnTypes[decoded] = valueTypes
+                               }
+                               valueTypes[vt] = struct{}{}
+                       }
+               }
+       }
+       var conflictColumns map[string]map[string]struct{}
+       for cf, columnTypes := range familyColumnTypes {
+               for name, valueTypes := range columnTypes {
+                       if len(valueTypes) <= 1 {
+                               continue
+                       }
+                       if conflictColumns == nil {
+                               conflictColumns = 
make(map[string]map[string]struct{})
+                       }
+                       if conflictColumns[cf] == nil {
+                               conflictColumns[cf] = make(map[string]struct{})
+                       }
+                       conflictColumns[cf][name] = struct{}{}
+               }
+       }
+       return conflictColumns
+}
+
+func readPartColumnTypes(p *part) map[string]map[string]pbv1.ValueType {
+       if len(p.tagFamilyMetadata) == 0 || len(p.primaryBlockMetadata) == 0 {
+               return nil
+       }
+       pmi := generatePartMergeIter()
+       defer releasePartMergeIter(pmi)
+       pmi.mustInitFromPart(p)
+
+       result := make(map[string]map[string]pbv1.ValueType)
+       bb := bigValuePool.Generate()
+       defer bigValuePool.Release(bb)
+       for pmi.nextBlockMetadata() {
+               for cf, db := range pmi.block.bm.tagFamilies {
+                       reader, ok := p.tagFamilyMetadata[cf]
+                       if !ok {
+                               continue
+                       }
+                       bb.Buf = bytes.ResizeExact(bb.Buf, int(db.size))
+                       fs.MustReadData(reader, int64(db.offset), bb.Buf)
+                       cfm := generateColumnFamilyMetadata()
+                       if _, err := cfm.unmarshal(bb.Buf); err != nil {

Review Comment:
   Unmarshal failure is silently swallowed. If a part's column-family metadata 
block is corrupted, conflict detection skips the column, the merger doesn't 
rename it, and the resulting merged part ends up with the exact duplicate-typed 
columns this change is meant to prevent — without any signal that something 
went wrong. At minimum this should log a warning. Same in 
`banyand/stream/merger.go`.



##########
banyand/measure/column.go:
##########
@@ -27,6 +29,57 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
+const typedColumnSeparator = "#"

Review Comment:
   This introduces `#` as a reserved separator in on-disk column names. Please 
confirm that the schema validator forbids `#` in user-provided tag names. If a 
tag named `foo#int` is allowed, `decodeTypedColumn` will silently strip the 
suffix to `foo`, which then causes `collectConflictColumns` to manufacture a 
phantom conflict and the merger to rename a non-conflicting column. Same 
applies to `banyand/stream/tag.go`.



##########
banyand/measure/column.go:
##########
@@ -27,6 +29,57 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
+const typedColumnSeparator = "#"
+
+var (
+       valueTypeToSuffix = map[pbv1.ValueType]string{
+               pbv1.ValueTypeStr:        "str",
+               pbv1.ValueTypeInt64:      "int",
+               pbv1.ValueTypeFloat64:    "float",
+               pbv1.ValueTypeBinaryData: "bin",
+               pbv1.ValueTypeStrArr:     "str_arr",
+               pbv1.ValueTypeInt64Arr:   "int_arr",
+               pbv1.ValueTypeTimestamp:  "ts",
+               pbv1.ValueTypeUnknown:    "",
+       }
+       suffixToValueType = map[string]pbv1.ValueType{
+               "str":     pbv1.ValueTypeStr,
+               "int":     pbv1.ValueTypeInt64,
+               "float":   pbv1.ValueTypeFloat64,
+               "bin":     pbv1.ValueTypeBinaryData,
+               "str_arr": pbv1.ValueTypeStrArr,
+               "int_arr": pbv1.ValueTypeInt64Arr,
+               "ts":      pbv1.ValueTypeTimestamp,
+       }
+)
+
+func encodeTypedColumn(name string, vt pbv1.ValueType) string {
+       suffix, ok := valueTypeToSuffix[vt]
+       if !ok || suffix == "" {
+               return name
+       }
+       return name + typedColumnSeparator + suffix
+}
+
+func decodeTypedColumn(key string) string {
+       for suffix := range suffixToValueType {
+               sepSuffix := typedColumnSeparator + suffix
+               if strings.HasSuffix(key, sepSuffix) {
+                       return key[:len(key)-len(sepSuffix)]
+               }
+       }
+       return key
+}
+
+func hasTypeSuffix(key string) bool {

Review Comment:
   `hasTypeSuffix` (and its unit test) is added here but is never called from 
`banyand/measure` — same for the stream counterpart in `banyand/stream/tag.go`. 
The only callers live in `banyand/trace` and `banyand/internal/sidx`. Either 
remove the dead copies from measure/stream or wire them into the reader the way 
trace does.



##########
banyand/measure/block.go:
##########
@@ -225,13 +226,24 @@ func (b *block) unmarshalTagFamily(decoder 
*encoding.BytesBlockDecoder, tfIndex
        cc := b.tagFamilies[tfIndex].resizeColumns(len(tagProjection))
 NEXT:
        for j := range tagProjection {
+               tp := tagProjection[j]
+               if schemaType, ok := schemaTagTypes[tp]; ok {
+                       typedName := encodeTypedColumn(tp, schemaType)
+                       for i := range cfm.columnMetadata {
+                               if cfm.columnMetadata[i].name == typedName {
+                                       cc[j].mustReadValues(decoder, 
valueReader, cfm.columnMetadata[i], uint64(b.Len()))
+                                       cc[j].name = tp
+                                       continue NEXT
+                               }
+                       }
+               }
                for i := range cfm.columnMetadata {

Review Comment:
   After the typed-name lookup misses, the fallback matches on the plain name 
without checking that the on-disk `valueType` agrees with the schema type. The 
four downstream output sites in this file currently filter out wrong-typed 
columns with `c.valueType == schemaType` guards, so the user-visible result is 
correct today — but the contract here is fragile: every future read site that 
consumes `cf.columns[i].values` must remember the rule. Compare 
`banyand/trace/block.go` where the projection planner gates the match on 
`bm.tagType[typedTag] == schemaType` once at read time, so wrong-typed columns 
simply don't appear in the cursor. The same issue applies to 
`banyand/stream/block.go`.



##########
banyand/measure/merger.go:
##########
@@ -359,6 +366,101 @@ func mergeBlocks(closeCh <-chan struct{}, bw 
*blockWriter, br *blockReader) (*pa
        return &result, nil
 }
 
+func renameConflictColumns(b *block, conflictColumns 
map[string]map[string]struct{}) {
+       if len(conflictColumns) == 0 {
+               return
+       }
+       for i := range b.tagFamilies {
+               columns := conflictColumns[b.tagFamilies[i].name]
+               if columns == nil {
+                       continue
+               }
+               cc := b.tagFamilies[i].columns
+               for j := range cc {
+                       if _, ok := columns[cc[j].name]; ok {
+                               cc[j].name = encodeTypedColumn(cc[j].name, 
cc[j].valueType)
+                       }
+               }
+       }
+}
+
+func collectConflictColumns(parts []*partWrapper) 
map[string]map[string]struct{} {
+       familyColumnTypes := 
make(map[string]map[string]map[pbv1.ValueType]struct{})
+       for _, pw := range parts {
+               partTypes := readPartColumnTypes(pw.p)
+               for cf, cc := range partTypes {
+                       columnTypes := familyColumnTypes[cf]
+                       if columnTypes == nil {
+                               columnTypes = 
make(map[string]map[pbv1.ValueType]struct{})
+                               familyColumnTypes[cf] = columnTypes
+                       }
+                       for name, vt := range cc {
+                               decoded := decodeTypedColumn(name)
+                               valueTypes := columnTypes[decoded]
+                               if valueTypes == nil {
+                                       valueTypes = 
make(map[pbv1.ValueType]struct{})
+                                       columnTypes[decoded] = valueTypes
+                               }
+                               valueTypes[vt] = struct{}{}
+                       }
+               }
+       }
+       var conflictColumns map[string]map[string]struct{}
+       for cf, columnTypes := range familyColumnTypes {
+               for name, valueTypes := range columnTypes {
+                       if len(valueTypes) <= 1 {
+                               continue
+                       }
+                       if conflictColumns == nil {
+                               conflictColumns = 
make(map[string]map[string]struct{})
+                       }
+                       if conflictColumns[cf] == nil {
+                               conflictColumns[cf] = make(map[string]struct{})
+                       }
+                       conflictColumns[cf][name] = struct{}{}
+               }
+       }
+       return conflictColumns
+}
+
+func readPartColumnTypes(p *part) map[string]map[string]pbv1.ValueType {

Review Comment:
   This function is called from `collectConflictColumns` for every input part 
on every merge. It walks every block metadata in the part and, for each block, 
issues `fs.MustReadData` for each tag family and unmarshals the column-family 
metadata — purely to detect whether the merge has a type conflict. The very 
next pass (`mergeBlocks`) then re-reads the same metadata when blocks are 
loaded. In the common case where the schema hasn't changed, the result is an 
empty conflict set and the entire scan is wasted IO. Worth thinking about 
whether the per-part column→type summary can be persisted (the trace and sidx 
variants already do this — see `banyand/trace/merger.go` where `pw.p.tagType` 
is read directly) or at least cached on the `part` after open. The same 
issue applies to `banyand/stream/merger.go`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to