hanahmily commented on code in PR #1126:
URL:
https://github.com/apache/skywalking-banyandb/pull/1126#discussion_r3223659090
##########
banyand/measure/block.go:
##########
@@ -956,6 +968,10 @@ func fastTagAppend(bi, b *blockPointer, offset int) error {
return fmt.Errorf("unexpected tag name for tag
family %q: got %q; want %q",
bi.tagFamilies[i].name,
b.tagFamilies[i].columns[j].name, bi.tagFamilies[i].columns[j].name)
}
+ }
+ }
+ for i := range bi.tagFamilies {
Review Comment:
The original `fastTagAppend` performed the name check and the append inside
the same loop. The change splits it into two full passes over the same tag
families × columns with no functional reason — renaming happens on the source
block before this function is called, so column names are stable here. The
split is extra work and creates two places that must stay in sync. The same
applies to `banyand/stream/block.go`.
##########
banyand/measure/merger.go:
##########
@@ -359,6 +366,101 @@ func mergeBlocks(closeCh <-chan struct{}, bw
*blockWriter, br *blockReader) (*pa
return &result, nil
}
+func renameConflictColumns(b *block, conflictColumns
map[string]map[string]struct{}) {
+ if len(conflictColumns) == 0 {
+ return
+ }
+ for i := range b.tagFamilies {
+ columns := conflictColumns[b.tagFamilies[i].name]
+ if columns == nil {
+ continue
+ }
+ cc := b.tagFamilies[i].columns
+ for j := range cc {
+ if _, ok := columns[cc[j].name]; ok {
+ cc[j].name = encodeTypedColumn(cc[j].name,
cc[j].valueType)
+ }
+ }
+ }
+}
+
+func collectConflictColumns(parts []*partWrapper)
map[string]map[string]struct{} {
+ familyColumnTypes :=
make(map[string]map[string]map[pbv1.ValueType]struct{})
+ for _, pw := range parts {
+ partTypes := readPartColumnTypes(pw.p)
+ for cf, cc := range partTypes {
+ columnTypes := familyColumnTypes[cf]
+ if columnTypes == nil {
+ columnTypes =
make(map[string]map[pbv1.ValueType]struct{})
+ familyColumnTypes[cf] = columnTypes
+ }
+ for name, vt := range cc {
+ decoded := decodeTypedColumn(name)
+ valueTypes := columnTypes[decoded]
+ if valueTypes == nil {
+ valueTypes =
make(map[pbv1.ValueType]struct{})
+ columnTypes[decoded] = valueTypes
+ }
+ valueTypes[vt] = struct{}{}
+ }
+ }
+ }
+ var conflictColumns map[string]map[string]struct{}
+ for cf, columnTypes := range familyColumnTypes {
+ for name, valueTypes := range columnTypes {
+ if len(valueTypes) <= 1 {
+ continue
+ }
+ if conflictColumns == nil {
+ conflictColumns =
make(map[string]map[string]struct{})
+ }
+ if conflictColumns[cf] == nil {
+ conflictColumns[cf] = make(map[string]struct{})
+ }
+ conflictColumns[cf][name] = struct{}{}
+ }
+ }
+ return conflictColumns
+}
+
+func readPartColumnTypes(p *part) map[string]map[string]pbv1.ValueType {
+ if len(p.tagFamilyMetadata) == 0 || len(p.primaryBlockMetadata) == 0 {
+ return nil
+ }
+ pmi := generatePartMergeIter()
+ defer releasePartMergeIter(pmi)
+ pmi.mustInitFromPart(p)
+
+ result := make(map[string]map[string]pbv1.ValueType)
+ bb := bigValuePool.Generate()
+ defer bigValuePool.Release(bb)
+ for pmi.nextBlockMetadata() {
+ for cf, db := range pmi.block.bm.tagFamilies {
+ reader, ok := p.tagFamilyMetadata[cf]
+ if !ok {
+ continue
+ }
+ bb.Buf = bytes.ResizeExact(bb.Buf, int(db.size))
+ fs.MustReadData(reader, int64(db.offset), bb.Buf)
+ cfm := generateColumnFamilyMetadata()
+ if _, err := cfm.unmarshal(bb.Buf); err != nil {
Review Comment:
Unmarshal failure is silently swallowed. If a part's column-family metadata
block is corrupted, conflict detection skips the column, the merger doesn't
rename it, and the resulting merged part ends up with the exact duplicate-typed
columns this change is meant to prevent — without any signal that something
went wrong. At minimum this should log a warning. The same applies to
`banyand/stream/merger.go`.
##########
banyand/measure/column.go:
##########
@@ -27,6 +29,57 @@ import (
"github.com/apache/skywalking-banyandb/pkg/pool"
)
+const typedColumnSeparator = "#"
Review Comment:
This introduces `#` as a reserved separator in on-disk column names. Please
confirm that the schema validator forbids `#` in user-provided tag names. If a
tag named `foo#int` is allowed, `decodeTypedColumn` will silently strip the
suffix to `foo`, which then causes `collectConflictColumns` to manufacture a
phantom conflict and the merger to rename a non-conflicting column. Same
applies to `banyand/stream/tag.go`.
##########
banyand/measure/column.go:
##########
@@ -27,6 +29,57 @@ import (
"github.com/apache/skywalking-banyandb/pkg/pool"
)
+const typedColumnSeparator = "#"
+
+var (
+ valueTypeToSuffix = map[pbv1.ValueType]string{
+ pbv1.ValueTypeStr: "str",
+ pbv1.ValueTypeInt64: "int",
+ pbv1.ValueTypeFloat64: "float",
+ pbv1.ValueTypeBinaryData: "bin",
+ pbv1.ValueTypeStrArr: "str_arr",
+ pbv1.ValueTypeInt64Arr: "int_arr",
+ pbv1.ValueTypeTimestamp: "ts",
+ pbv1.ValueTypeUnknown: "",
+ }
+ suffixToValueType = map[string]pbv1.ValueType{
+ "str": pbv1.ValueTypeStr,
+ "int": pbv1.ValueTypeInt64,
+ "float": pbv1.ValueTypeFloat64,
+ "bin": pbv1.ValueTypeBinaryData,
+ "str_arr": pbv1.ValueTypeStrArr,
+ "int_arr": pbv1.ValueTypeInt64Arr,
+ "ts": pbv1.ValueTypeTimestamp,
+ }
+)
+
+func encodeTypedColumn(name string, vt pbv1.ValueType) string {
+ suffix, ok := valueTypeToSuffix[vt]
+ if !ok || suffix == "" {
+ return name
+ }
+ return name + typedColumnSeparator + suffix
+}
+
+func decodeTypedColumn(key string) string {
+ for suffix := range suffixToValueType {
+ sepSuffix := typedColumnSeparator + suffix
+ if strings.HasSuffix(key, sepSuffix) {
+ return key[:len(key)-len(sepSuffix)]
+ }
+ }
+ return key
+}
+
+func hasTypeSuffix(key string) bool {
Review Comment:
`hasTypeSuffix` (and its unit test) is added here but is never called from
`banyand/measure` — same for the stream counterpart in `banyand/stream/tag.go`.
The only callers live in `banyand/trace` and `banyand/internal/sidx`. Either
remove the dead copies from measure/stream or wire them into the reader the way
trace does.
##########
banyand/measure/block.go:
##########
@@ -225,13 +226,24 @@ func (b *block) unmarshalTagFamily(decoder
*encoding.BytesBlockDecoder, tfIndex
cc := b.tagFamilies[tfIndex].resizeColumns(len(tagProjection))
NEXT:
for j := range tagProjection {
+ tp := tagProjection[j]
+ if schemaType, ok := schemaTagTypes[tp]; ok {
+ typedName := encodeTypedColumn(tp, schemaType)
+ for i := range cfm.columnMetadata {
+ if cfm.columnMetadata[i].name == typedName {
+ cc[j].mustReadValues(decoder,
valueReader, cfm.columnMetadata[i], uint64(b.Len()))
+ cc[j].name = tp
+ continue NEXT
+ }
+ }
+ }
for i := range cfm.columnMetadata {
Review Comment:
After the typed-name lookup misses, the fallback matches on the plain name
without checking that the on-disk `valueType` agrees with the schema type. The
four downstream output sites in this file currently filter out wrong-typed
columns with `c.valueType == schemaType` guards, so the user-visible result is
correct today — but the contract here is fragile: every future read site that
consumes `cf.columns[i].values` must remember the rule. Compare
`banyand/trace/block.go` where the projection planner gates the match on
`bm.tagType[typedTag] == schemaType` once at read time, so wrong-typed columns
simply don't appear in the cursor. The same applies to `banyand/stream/block.go`.
##########
banyand/measure/merger.go:
##########
@@ -359,6 +366,101 @@ func mergeBlocks(closeCh <-chan struct{}, bw
*blockWriter, br *blockReader) (*pa
return &result, nil
}
+func renameConflictColumns(b *block, conflictColumns
map[string]map[string]struct{}) {
+ if len(conflictColumns) == 0 {
+ return
+ }
+ for i := range b.tagFamilies {
+ columns := conflictColumns[b.tagFamilies[i].name]
+ if columns == nil {
+ continue
+ }
+ cc := b.tagFamilies[i].columns
+ for j := range cc {
+ if _, ok := columns[cc[j].name]; ok {
+ cc[j].name = encodeTypedColumn(cc[j].name,
cc[j].valueType)
+ }
+ }
+ }
+}
+
+func collectConflictColumns(parts []*partWrapper)
map[string]map[string]struct{} {
+ familyColumnTypes :=
make(map[string]map[string]map[pbv1.ValueType]struct{})
+ for _, pw := range parts {
+ partTypes := readPartColumnTypes(pw.p)
+ for cf, cc := range partTypes {
+ columnTypes := familyColumnTypes[cf]
+ if columnTypes == nil {
+ columnTypes =
make(map[string]map[pbv1.ValueType]struct{})
+ familyColumnTypes[cf] = columnTypes
+ }
+ for name, vt := range cc {
+ decoded := decodeTypedColumn(name)
+ valueTypes := columnTypes[decoded]
+ if valueTypes == nil {
+ valueTypes =
make(map[pbv1.ValueType]struct{})
+ columnTypes[decoded] = valueTypes
+ }
+ valueTypes[vt] = struct{}{}
+ }
+ }
+ }
+ var conflictColumns map[string]map[string]struct{}
+ for cf, columnTypes := range familyColumnTypes {
+ for name, valueTypes := range columnTypes {
+ if len(valueTypes) <= 1 {
+ continue
+ }
+ if conflictColumns == nil {
+ conflictColumns =
make(map[string]map[string]struct{})
+ }
+ if conflictColumns[cf] == nil {
+ conflictColumns[cf] = make(map[string]struct{})
+ }
+ conflictColumns[cf][name] = struct{}{}
+ }
+ }
+ return conflictColumns
+}
+
+func readPartColumnTypes(p *part) map[string]map[string]pbv1.ValueType {
Review Comment:
This function is called from `collectConflictColumns` for every input part
on every merge. It walks every block metadata in the part and, for each block,
issues `fs.MustReadData` for each tag family and unmarshals the column-family
metadata — purely to detect whether the merge has a type conflict. The very
next pass (`mergeBlocks`) then re-reads the same metadata when blocks are
loaded. In the common case where the schema hasn't changed, the result is an
empty conflict set and the entire scan is wasted IO. Worth thinking about
whether the per-part column→type summary can be persisted (the trace and sidx
variants already do this — see `banyand/trace/merger.go` where `pw.p.tagType`
is read directly) or at least cached on the `part` after open. The same
applies to `banyand/stream/merger.go`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]