This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch doc in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/doc by this push:
new 55cf02c6 Append different tag and field structure
55cf02c6 is described below
commit 55cf02c658b9f4898b03736a567c7d2c42468361
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Oct 5 06:16:09 2024 +0000
Append different tag and field structure
Signed-off-by: Gao Hongtao <[email protected]>
---
banyand/measure/block.go | 216 ++++++++++++++++++++-----
banyand/measure/block_test.go | 356 ++++++++++++++++++++++++++++++++++++++++++
banyand/measure/query.go | 2 +-
banyand/stream/block.go | 137 +++++++++++++---
banyand/stream/block_test.go | 214 +++++++++++++++++++++++++
5 files changed, 864 insertions(+), 61 deletions(-)
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index d6808bd3..282c0b22 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -18,6 +18,7 @@
package measure
import (
+ "fmt"
"slices"
"sort"
@@ -787,63 +788,206 @@ func (bi *blockPointer) appendAll(b *blockPointer) {
bi.append(b, len(b.timestamps))
}
+var log = logger.GetLogger("measure").Named("block")
+
func (bi *blockPointer) append(b *blockPointer, offset int) {
if offset <= b.idx {
return
}
if len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0 {
- for _, tf := range b.tagFamilies {
- tagFamily := columnFamily{name: tf.name}
- for _, c := range tf.columns {
- col := column{name: c.name, valueType:
c.valueType}
- assertIdxAndOffset(col.name, len(c.values),
b.idx, offset)
- col.values = append(col.values,
c.values[b.idx:offset]...)
- tagFamily.columns = append(tagFamily.columns,
col)
+ fullTagAppend(bi, b, offset)
+ } else {
+ if err := fastTagAppend(bi, b, offset); err != nil {
+ if log.Debug().Enabled() {
+ log.Debug().Msgf("fastTagMerge failed: %v;
falling back to fullTagMerge", err)
}
- bi.tagFamilies = append(bi.tagFamilies, tagFamily)
+ fullTagAppend(bi, b, offset)
}
+ }
+
+ if len(bi.field.columns) == 0 && len(b.field.columns) > 0 {
+ fullFieldAppend(bi, b, offset)
} else {
- if len(bi.tagFamilies) != len(b.tagFamilies) {
- logger.Panicf("unexpected number of tag families: got
%d; want %d", len(bi.tagFamilies), len(b.tagFamilies))
+ if err := fastFieldAppend(bi, b, offset); err != nil {
+ if log.Debug().Enabled() {
+ log.Debug().Msgf("fastFieldAppend failed: %v;
falling back to fullFieldAppend", err)
+ }
+ fullFieldAppend(bi, b, offset)
+ }
+ }
+
+ assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
+ bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
+ assertIdxAndOffset("versions", len(b.versions), bi.idx, offset)
+ bi.versions = append(bi.versions, b.versions[b.idx:offset]...)
+}
+
+func fastTagAppend(bi, b *blockPointer, offset int) error {
+ if len(bi.tagFamilies) != len(b.tagFamilies) {
+ return fmt.Errorf("unexpected number of tag families: got %d;
want %d", len(b.tagFamilies), len(bi.tagFamilies))
+ }
+ for i := range bi.tagFamilies {
+ if bi.tagFamilies[i].name != b.tagFamilies[i].name {
+ return fmt.Errorf("unexpected tag family name: got %q;
want %q", b.tagFamilies[i].name, bi.tagFamilies[i].name)
}
- for i := range bi.tagFamilies {
- if bi.tagFamilies[i].name != b.tagFamilies[i].name {
- logger.Panicf("unexpected tag family name: got
%q; want %q", bi.tagFamilies[i].name, b.tagFamilies[i].name)
+ if len(bi.tagFamilies[i].columns) !=
len(b.tagFamilies[i].columns) {
+ return fmt.Errorf("unexpected number of tags for tag
family %q: got %d; want %d",
+ bi.tagFamilies[i].name,
len(b.tagFamilies[i].columns), len(bi.tagFamilies[i].columns))
+ }
+ for j := range bi.tagFamilies[i].columns {
+ if bi.tagFamilies[i].columns[j].name !=
b.tagFamilies[i].columns[j].name {
+ return fmt.Errorf("unexpected tag name for tag
family %q: got %q; want %q",
+ bi.tagFamilies[i].name,
b.tagFamilies[i].columns[j].name, bi.tagFamilies[i].columns[j].name)
}
- if len(bi.tagFamilies[i].columns) !=
len(b.tagFamilies[i].columns) {
- logger.Panicf("unexpected number of tags for
tag family %q: got %d; want %d", bi.tagFamilies[i].name,
len(bi.tagFamilies[i].columns), len(b.tagFamilies[i].columns))
+ assertIdxAndOffset(b.tagFamilies[i].columns[j].name,
len(b.tagFamilies[i].columns[j].values), b.idx, offset)
+ bi.tagFamilies[i].columns[j].values =
append(bi.tagFamilies[i].columns[j].values,
b.tagFamilies[i].columns[j].values[b.idx:offset]...)
+ }
+ }
+ return nil
+}
+
+func fullTagAppend(bi, b *blockPointer, offset int) {
+ existDataSize := len(bi.timestamps)
+ appendTagFamilies := func(tf columnFamily) {
+ tagFamily := columnFamily{name: tf.name}
+ for i := range tf.columns {
+ assertIdxAndOffset(tf.columns[i].name,
len(tf.columns[i].values), b.idx, offset)
+ col := column{name: tf.columns[i].name, valueType:
tf.columns[i].valueType}
+ for j := 0; j < existDataSize; j++ {
+ col.values = append(col.values, nil)
+ }
+ col.values = append(col.values,
tf.columns[i].values[b.idx:offset]...)
+ tagFamily.columns = append(tagFamily.columns, col)
+ }
+ bi.tagFamilies = append(bi.tagFamilies, tagFamily)
+ }
+ if len(bi.tagFamilies) == 0 {
+ for _, tf := range b.tagFamilies {
+ appendTagFamilies(tf)
+ }
+ return
+ }
+
+ tagFamilyMap := make(map[string]*columnFamily)
+ for i := range bi.tagFamilies {
+ tagFamilyMap[bi.tagFamilies[i].name] = &bi.tagFamilies[i]
+ }
+
+ for _, tf := range b.tagFamilies {
+ if existingTagFamily, exists := tagFamilyMap[tf.name]; exists {
+ columnMap := make(map[string]*column)
+ for i := range existingTagFamily.columns {
+ columnMap[existingTagFamily.columns[i].name] =
&existingTagFamily.columns[i]
}
- for j := range bi.tagFamilies[i].columns {
- if bi.tagFamilies[i].columns[j].name !=
b.tagFamilies[i].columns[j].name {
- logger.Panicf("unexpected tag name for
tag family %q: got %q; want %q", bi.tagFamilies[i].name,
bi.tagFamilies[i].columns[j].name, b.tagFamilies[i].columns[j].name)
+
+ for _, c := range tf.columns {
+ if existingColumn, exists := columnMap[c.name];
exists {
+ assertIdxAndOffset(c.name,
len(c.values), b.idx, offset)
+ existingColumn.values =
append(existingColumn.values, c.values[b.idx:offset]...)
+ } else {
+ assertIdxAndOffset(c.name,
len(c.values), b.idx, offset)
+ col := column{name: c.name, valueType:
c.valueType}
+ for j := 0; j < existDataSize; j++ {
+ col.values = append(col.values,
nil)
+ }
+ col.values = append(col.values,
c.values[b.idx:offset]...)
+ existingTagFamily.columns =
append(existingTagFamily.columns, col)
}
-
assertIdxAndOffset(b.tagFamilies[i].columns[j].name,
len(b.tagFamilies[i].columns[j].values), b.idx, offset)
- bi.tagFamilies[i].columns[j].values =
append(bi.tagFamilies[i].columns[j].values,
b.tagFamilies[i].columns[j].values[b.idx:offset]...)
}
+ } else {
+ appendTagFamilies(tf)
}
}
+ for k := range tagFamilyMap {
+ delete(tagFamilyMap, k)
+ }
+ for i := range b.tagFamilies {
+ tagFamilyMap[b.tagFamilies[i].name] = &b.tagFamilies[i]
+ }
+ emptySize := offset - b.idx
+ for _, tf := range bi.tagFamilies {
+ if _, exists := tagFamilyMap[tf.name]; !exists {
+ for i := range tf.columns {
+ for j := 0; j < emptySize; j++ {
+ tf.columns[i].values =
append(tf.columns[i].values, nil)
+ }
+ }
+ } else {
+ existingTagFamily := tagFamilyMap[tf.name]
+ columnMap := make(map[string]*column)
+ for i := range existingTagFamily.columns {
+ columnMap[existingTagFamily.columns[i].name] =
&existingTagFamily.columns[i]
+ }
+ for i := range tf.columns {
+ if _, exists := columnMap[tf.columns[i].name];
!exists {
+ for j := 0; j < emptySize; j++ {
+ tf.columns[i].values =
append(tf.columns[i].values, nil)
+ }
+ }
+ }
+ }
+ }
+}
- if len(bi.field.columns) == 0 && len(b.field.columns) > 0 {
- for _, c := range b.field.columns {
- col := column{name: c.name, valueType: c.valueType}
- assertIdxAndOffset(col.name, len(c.values), b.idx,
offset)
- col.values = append(col.values,
c.values[b.idx:offset]...)
- bi.field.columns = append(bi.field.columns, col)
+func fastFieldAppend(bi, b *blockPointer, offset int) error {
+ if len(bi.field.columns) != len(b.field.columns) {
+ return fmt.Errorf("unexpected number of fields: got %d; want
%d", len(bi.field.columns), len(b.field.columns))
+ }
+ for i := range bi.field.columns {
+ if bi.field.columns[i].name != b.field.columns[i].name {
+ return fmt.Errorf("unexpected field name: got %q; want
%q", b.field.columns[i].name, bi.field.columns[i].name)
}
- } else {
- if len(bi.field.columns) != len(b.field.columns) {
- logger.Panicf("unexpected number of fields: got %d;
want %d", len(bi.field.columns), len(b.field.columns))
+ assertIdxAndOffset(b.field.columns[i].name,
len(b.field.columns[i].values), b.idx, offset)
+ bi.field.columns[i].values = append(bi.field.columns[i].values,
b.field.columns[i].values[b.idx:offset]...)
+ }
+ return nil
+}
+
+func fullFieldAppend(bi, b *blockPointer, offset int) {
+ existDataSize := len(bi.timestamps)
+ appendFields := func(c column) {
+ col := column{name: c.name, valueType: c.valueType}
+ for j := 0; j < existDataSize; j++ {
+ col.values = append(col.values, nil)
}
- for i := range bi.field.columns {
- assertIdxAndOffset(b.field.columns[i].name,
len(b.field.columns[i].values), b.idx, offset)
- bi.field.columns[i].values =
append(bi.field.columns[i].values, b.field.columns[i].values[b.idx:offset]...)
+ col.values = append(col.values, c.values[b.idx:offset]...)
+ bi.field.columns = append(bi.field.columns, col)
+ }
+ if len(bi.field.columns) == 0 {
+ for _, c := range b.field.columns {
+ appendFields(c)
}
+ return
}
- assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
- bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
- assertIdxAndOffset("versions", len(b.versions), bi.idx, offset)
- bi.versions = append(bi.versions, b.versions[b.idx:offset]...)
+ fieldMap := make(map[string]*column)
+ for i := range bi.field.columns {
+ fieldMap[bi.field.columns[i].name] = &bi.field.columns[i]
+ }
+
+ for _, c := range b.field.columns {
+ if existingField, exists := fieldMap[c.name]; exists {
+ assertIdxAndOffset(c.name, len(c.values), b.idx, offset)
+ existingField.values = append(existingField.values,
c.values[b.idx:offset]...)
+ } else {
+ appendFields(c)
+ }
+ }
+ for k := range fieldMap {
+ delete(fieldMap, k)
+ }
+ for i := range b.field.columns {
+ fieldMap[b.field.columns[i].name] = &b.field.columns[i]
+ }
+
+ emptySize := offset - b.idx
+ for i := range bi.field.columns {
+ if _, exists := fieldMap[bi.field.columns[i].name]; !exists {
+ for j := 0; j < emptySize; j++ {
+ bi.field.columns[i].values =
append(bi.field.columns[i].values, nil)
+ }
+ }
+ }
}
func assertIdxAndOffset(name string, length int, idx int, offset int) {
diff --git a/banyand/measure/block_test.go b/banyand/measure/block_test.go
index 5d41f236..cabaea10 100644
--- a/banyand/measure/block_test.go
+++ b/banyand/measure/block_test.go
@@ -716,6 +716,362 @@ func Test_blockPointer_append(t *testing.T) {
idx: 0,
},
},
+ {
+ name: "Test append with missing tag family",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField", valueType:
pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}},
+ },
+ },
+ partID: 1,
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{
+ columns: []column{
+ {name:
"strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"),
[]byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ versions: []int64{1, 1, 1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+
nil, nil,
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField",
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"),
[]byte("field4"), []byte("field5"), []byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with additional tag family",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{
+ columns: []column{
+ {name: "strField", valueType:
pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}},
+ },
+ },
+ partID: 1,
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns:
[]column{
+ {
+
name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
+
values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name:
"strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"),
[]byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ versions: []int64{1, 1, 1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
nil, nil,
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField",
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"),
[]byte("field4"), []byte("field5"), []byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with missing tag",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField", valueType:
pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}},
+ },
+ },
+ partID: 1,
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name:
"arrTag",
+ columns:
[]column{},
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name:
"strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"),
[]byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ versions: []int64{1, 1, 1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+
nil, nil,
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField",
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"),
[]byte("field4"), []byte("field5"), []byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with additional tag",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{},
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField", valueType:
pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}},
+ },
+ },
+ partID: 1,
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns:
[]column{
+ {
+
name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
+
values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name:
"strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"),
[]byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ versions: []int64{1, 1, 1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
nil, nil,
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField",
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"),
[]byte("field4"), []byte("field5"), []byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with missing field",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{
+ columns: []column{
+ {name: "strField", valueType:
pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}},
+ },
+ },
+ partID: 1,
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{},
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ versions: []int64{1, 1, 1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{
+ columns: []column{
+ {name: "strField",
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"),
[]byte("field4"), nil, nil}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with additional field",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{},
+ partID: 1,
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{
+ columns: []column{
+ {name:
"strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"),
[]byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ versions: []int64{1, 1, 1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{
+ columns: []column{
+ {name: "strField",
valueType: pbv1.ValueTypeStr, values: [][]byte{nil, nil, []byte("field5"),
[]byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index d9d19918..e4b1cb3d 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -431,13 +431,13 @@ func binaryDataFieldValue(value []byte)
*modelv1.FieldValue {
type queryResult struct {
ctx context.Context
- hit int
sidToIndex map[common.SeriesID]int
storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue
tagProjection []model.TagProjection
data []*blockCursor
snapshots []*snapshot
segments []storage.Segment[*tsTable, option]
+ hit int
loaded bool
orderByTS bool
ascTS bool
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index 3b65e88e..956c1bf2 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -18,6 +18,7 @@
package stream
import (
+ "fmt"
"sort"
"golang.org/x/exp/slices"
@@ -603,45 +604,133 @@ func (bi *blockPointer) appendAll(b *blockPointer) {
bi.append(b, len(b.timestamps))
}
+var log = logger.GetLogger("stream").Named("block")
+
func (bi *blockPointer) append(b *blockPointer, offset int) {
if offset <= b.idx {
return
}
if len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0 {
+ fullTagAppend(bi, b, offset)
+ } else {
+ if err := fastTagAppend(bi, b, offset); err != nil {
+ if log.Debug().Enabled() {
+ log.Debug().Msgf("fastTagMerge failed: %v;
falling back to fullTagMerge", err)
+ }
+ fullTagAppend(bi, b, offset)
+ }
+ }
+
+ assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
+ bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
+ bi.elementIDs = append(bi.elementIDs, b.elementIDs[b.idx:offset]...)
+}
+
+func fastTagAppend(bi, b *blockPointer, offset int) error {
+ if len(bi.tagFamilies) != len(b.tagFamilies) {
+ return fmt.Errorf("unexpected number of tag families: got %d;
want %d", len(b.tagFamilies), len(bi.tagFamilies))
+ }
+ for i := range bi.tagFamilies {
+ if bi.tagFamilies[i].name != b.tagFamilies[i].name {
+ return fmt.Errorf("unexpected tag family name: got %q;
want %q", b.tagFamilies[i].name, bi.tagFamilies[i].name)
+ }
+ if len(bi.tagFamilies[i].tags) != len(b.tagFamilies[i].tags) {
+ return fmt.Errorf("unexpected number of tags for tag
family %q: got %d; want %d",
+ bi.tagFamilies[i].name,
len(b.tagFamilies[i].tags), len(bi.tagFamilies[i].tags))
+ }
+ for j := range bi.tagFamilies[i].tags {
+ if bi.tagFamilies[i].tags[j].name !=
b.tagFamilies[i].tags[j].name {
+ return fmt.Errorf("unexpected tag name for tag
family %q: got %q; want %q",
+ bi.tagFamilies[i].name,
b.tagFamilies[i].tags[j].name, bi.tagFamilies[i].tags[j].name)
+ }
+ assertIdxAndOffset(b.tagFamilies[i].tags[j].name,
len(b.tagFamilies[i].tags[j].values), b.idx, offset)
+ bi.tagFamilies[i].tags[j].values =
append(bi.tagFamilies[i].tags[j].values,
b.tagFamilies[i].tags[j].values[b.idx:offset]...)
+ }
+ }
+ return nil
+}
+
+func fullTagAppend(bi, b *blockPointer, offset int) {
+ existDataSize := len(bi.timestamps)
+ appendTagFamilies := func(tf tagFamily) {
+ tfv := tagFamily{name: tf.name}
+ for i := range tf.tags {
+ assertIdxAndOffset(tf.tags[i].name,
len(tf.tags[i].values), b.idx, offset)
+ col := tag{name: tf.tags[i].name, valueType:
tf.tags[i].valueType}
+ for j := 0; j < existDataSize; j++ {
+ col.values = append(col.values, nil)
+ }
+ col.values = append(col.values,
tf.tags[i].values[b.idx:offset]...)
+ tfv.tags = append(tfv.tags, col)
+ }
+ bi.tagFamilies = append(bi.tagFamilies, tfv)
+ }
+ if len(bi.tagFamilies) == 0 {
for _, tf := range b.tagFamilies {
- tFamily := tagFamily{name: tf.name}
+ appendTagFamilies(tf)
+ }
+ return
+ }
+
+ tagFamilyMap := make(map[string]*tagFamily)
+ for i := range bi.tagFamilies {
+ tagFamilyMap[bi.tagFamilies[i].name] = &bi.tagFamilies[i]
+ }
+
+ for _, tf := range b.tagFamilies {
+ if existingTagFamily, exists := tagFamilyMap[tf.name]; exists {
+ columnMap := make(map[string]*tag)
+ for i := range existingTagFamily.tags {
+ columnMap[existingTagFamily.tags[i].name] =
&existingTagFamily.tags[i]
+ }
+
for _, c := range tf.tags {
- col := tag{name: c.name, valueType: c.valueType}
- assertIdxAndOffset(col.name, len(c.values),
b.idx, offset)
- col.values = append(col.values,
c.values[b.idx:offset]...)
- tFamily.tags = append(tFamily.tags, col)
+ if existingColumn, exists := columnMap[c.name];
exists {
+ assertIdxAndOffset(c.name,
len(c.values), b.idx, offset)
+ existingColumn.values =
append(existingColumn.values, c.values[b.idx:offset]...)
+ } else {
+ assertIdxAndOffset(c.name,
len(c.values), b.idx, offset)
+ col := tag{name: c.name, valueType:
c.valueType}
+ for j := 0; j < existDataSize; j++ {
+ col.values = append(col.values,
nil)
+ }
+ col.values = append(col.values,
c.values[b.idx:offset]...)
+ existingTagFamily.tags =
append(existingTagFamily.tags, col)
+ }
}
- bi.tagFamilies = append(bi.tagFamilies, tFamily)
+ } else {
+ appendTagFamilies(tf)
}
- } else {
- if len(bi.tagFamilies) != len(b.tagFamilies) {
- logger.Panicf("unexpected number of tag families: got
%d; want %d", len(bi.tagFamilies), len(b.tagFamilies))
- }
- for i := range bi.tagFamilies {
- if bi.tagFamilies[i].name != b.tagFamilies[i].name {
- logger.Panicf("unexpected tag family name: got
%q; want %q", bi.tagFamilies[i].name, b.tagFamilies[i].name)
+ }
+ for k := range tagFamilyMap {
+ delete(tagFamilyMap, k)
+ }
+ for i := range b.tagFamilies {
+ tagFamilyMap[b.tagFamilies[i].name] = &b.tagFamilies[i]
+ }
+ emptySize := offset - b.idx
+ for _, tf := range bi.tagFamilies {
+ if _, exists := tagFamilyMap[tf.name]; !exists {
+ for i := range tf.tags {
+ for j := 0; j < emptySize; j++ {
+ tf.tags[i].values =
append(tf.tags[i].values, nil)
+ }
}
- if len(bi.tagFamilies[i].tags) !=
len(b.tagFamilies[i].tags) {
- logger.Panicf("unexpected number of tags for
tag family %q: got %d; want %d", bi.tagFamilies[i].name,
len(bi.tagFamilies[i].tags), len(b.tagFamilies[i].tags))
+ } else {
+ existingTagFamily := tagFamilyMap[tf.name]
+ columnMap := make(map[string]*tag)
+ for i := range existingTagFamily.tags {
+ columnMap[existingTagFamily.tags[i].name] =
&existingTagFamily.tags[i]
}
- for j := range bi.tagFamilies[i].tags {
- if bi.tagFamilies[i].tags[j].name !=
b.tagFamilies[i].tags[j].name {
- logger.Panicf("unexpected tag name for
tag family %q: got %q; want %q", bi.tagFamilies[i].name,
bi.tagFamilies[i].tags[j].name, b.tagFamilies[i].tags[j].name)
+ for i := range tf.tags {
+ if _, exists := columnMap[tf.tags[i].name];
!exists {
+ for j := 0; j < emptySize; j++ {
+ tf.tags[i].values =
append(tf.tags[i].values, nil)
+ }
}
-
assertIdxAndOffset(b.tagFamilies[i].tags[j].name,
len(b.tagFamilies[i].tags[j].values), b.idx, offset)
- bi.tagFamilies[i].tags[j].values =
append(bi.tagFamilies[i].tags[j].values,
b.tagFamilies[i].tags[j].values[b.idx:offset]...)
}
}
}
-
- assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
- bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
- bi.elementIDs = append(bi.elementIDs, b.elementIDs[b.idx:offset]...)
}
func assertIdxAndOffset(name string, length int, idx int, offset int) {
diff --git a/banyand/stream/block_test.go b/banyand/stream/block_test.go
index ed57f48d..3632d638 100644
--- a/banyand/stream/block_test.go
+++ b/banyand/stream/block_test.go
@@ -609,6 +609,220 @@ func Test_blockPointer_append(t *testing.T) {
idx: 0,
},
},
+ {
+ name: "Test append with missing tag family",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ elementIDs: []uint64{0, 1},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ elementIDs: []uint64{2, 3},
+ tagFamilies: []tagFamily{},
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ elementIDs: []uint64{0, 1, 2, 3},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+
nil, nil,
+ },
+ },
+ },
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with additional tag family",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ elementIDs: []uint64{0, 1},
+ tagFamilies: []tagFamily{},
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ elementIDs: []uint64{2, 3},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+
name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
+
values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ elementIDs: []uint64{0, 1, 2, 3},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
nil, nil,
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+ },
+ },
+ },
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with missing tag",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ elementIDs: []uint64{0, 1},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ elementIDs: []uint64{2, 3},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{},
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ elementIDs: []uint64{0, 1, 2, 3},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+
nil, nil,
+ },
+ },
+ },
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with additional tag",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ elementIDs: []uint64{0, 1},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{},
+ },
+ },
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ elementIDs: []uint64{2, 3},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+
name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
+
values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ elementIDs: []uint64{0, 1, 2, 3},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
nil, nil,
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+ },
+ },
+ },
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
