This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new d14d599e Fix bugs and add docs (#548)
d14d599e is described below
commit d14d599ecaa58a36c18ee3bd01c2d8567206f28b
Author: Gao Hongtao <[email protected]>
AuthorDate: Sun Oct 6 20:39:41 2024 +0800
Fix bugs and add docs (#548)
* Add docs
* Support appending blocks whose tag and field structures differ
* Fix a bug when accessing restored data
---------
Signed-off-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 1 +
banyand/Dockerfile | 3 +
banyand/measure/block.go | 216 ++++++++++---
banyand/measure/block_test.go | 356 +++++++++++++++++++++
banyand/measure/query.go | 48 ++-
banyand/measure/write.go | 8 +-
banyand/metadata/schema/etcd.go | 6 +-
banyand/queue/sub/sub.go | 9 +-
banyand/stream/block.go | 137 ++++++--
banyand/stream/block_test.go | 214 +++++++++++++
banyand/stream/query.go | 39 ++-
banyand/stream/write.go | 8 +-
docs/concept/tsdb.md | 25 +-
docs/menu.yml | 2 +-
pkg/index/inverted/inverted.go | 2 +-
pkg/index/inverted/inverted_series.go | 9 +-
.../logical/stream/stream_plan_indexscan_local.go | 5 +
test/docker/Dockerfile | 2 +
18 files changed, 976 insertions(+), 114 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 652d735f..bb78d51d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -47,6 +47,7 @@ Release Notes.
- Fix panic when reading a disorder block of measure. This block's versions
are not sorted in descending order.
- Fix the bug that the etcd client doesn't reconnect when facing the context
timeout in the startup phase.
- Fix the bug that the long running query doesn't stop when the context is
canceled.
+- Fix panic when merging blocks with different tags or fields.
### Documentation
diff --git a/banyand/Dockerfile b/banyand/Dockerfile
index 42db44e5..5f21348f 100644
--- a/banyand/Dockerfile
+++ b/banyand/Dockerfile
@@ -34,6 +34,9 @@ COPY build/bin/windows/${TARGETARCH}/banyand-server-static
"/banyand"
FROM build-${TARGETOS} AS final
+ENV GRPC_GO_LOG_SEVERITY_LEVEL=ERROR
+ENV GRPC_GO_LOG_FORMATTER=json
+
EXPOSE 17912
EXPOSE 17913
EXPOSE 6060
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index d6808bd3..282c0b22 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -18,6 +18,7 @@
package measure
import (
+ "fmt"
"slices"
"sort"
@@ -787,63 +788,206 @@ func (bi *blockPointer) appendAll(b *blockPointer) {
bi.append(b, len(b.timestamps))
}
+var log = logger.GetLogger("measure").Named("block")
+
func (bi *blockPointer) append(b *blockPointer, offset int) {
if offset <= b.idx {
return
}
if len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0 {
- for _, tf := range b.tagFamilies {
- tagFamily := columnFamily{name: tf.name}
- for _, c := range tf.columns {
- col := column{name: c.name, valueType:
c.valueType}
- assertIdxAndOffset(col.name, len(c.values),
b.idx, offset)
- col.values = append(col.values,
c.values[b.idx:offset]...)
- tagFamily.columns = append(tagFamily.columns,
col)
+ fullTagAppend(bi, b, offset)
+ } else {
+ if err := fastTagAppend(bi, b, offset); err != nil {
+ if log.Debug().Enabled() {
+ log.Debug().Msgf("fastTagMerge failed: %v;
falling back to fullTagMerge", err)
}
- bi.tagFamilies = append(bi.tagFamilies, tagFamily)
+ fullTagAppend(bi, b, offset)
}
+ }
+
+ if len(bi.field.columns) == 0 && len(b.field.columns) > 0 {
+ fullFieldAppend(bi, b, offset)
} else {
- if len(bi.tagFamilies) != len(b.tagFamilies) {
- logger.Panicf("unexpected number of tag families: got
%d; want %d", len(bi.tagFamilies), len(b.tagFamilies))
+ if err := fastFieldAppend(bi, b, offset); err != nil {
+ if log.Debug().Enabled() {
+ log.Debug().Msgf("fastFieldAppend failed: %v;
falling back to fullFieldAppend", err)
+ }
+ fullFieldAppend(bi, b, offset)
+ }
+ }
+
+ assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
+ bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
+ assertIdxAndOffset("versions", len(b.versions), bi.idx, offset)
+ bi.versions = append(bi.versions, b.versions[b.idx:offset]...)
+}
+
+func fastTagAppend(bi, b *blockPointer, offset int) error {
+ if len(bi.tagFamilies) != len(b.tagFamilies) {
+ return fmt.Errorf("unexpected number of tag families: got %d;
want %d", len(b.tagFamilies), len(bi.tagFamilies))
+ }
+ for i := range bi.tagFamilies {
+ if bi.tagFamilies[i].name != b.tagFamilies[i].name {
+ return fmt.Errorf("unexpected tag family name: got %q;
want %q", b.tagFamilies[i].name, bi.tagFamilies[i].name)
}
- for i := range bi.tagFamilies {
- if bi.tagFamilies[i].name != b.tagFamilies[i].name {
- logger.Panicf("unexpected tag family name: got
%q; want %q", bi.tagFamilies[i].name, b.tagFamilies[i].name)
+ if len(bi.tagFamilies[i].columns) !=
len(b.tagFamilies[i].columns) {
+ return fmt.Errorf("unexpected number of tags for tag
family %q: got %d; want %d",
+ bi.tagFamilies[i].name,
len(b.tagFamilies[i].columns), len(bi.tagFamilies[i].columns))
+ }
+ for j := range bi.tagFamilies[i].columns {
+ if bi.tagFamilies[i].columns[j].name !=
b.tagFamilies[i].columns[j].name {
+ return fmt.Errorf("unexpected tag name for tag
family %q: got %q; want %q",
+ bi.tagFamilies[i].name,
b.tagFamilies[i].columns[j].name, bi.tagFamilies[i].columns[j].name)
}
- if len(bi.tagFamilies[i].columns) !=
len(b.tagFamilies[i].columns) {
- logger.Panicf("unexpected number of tags for
tag family %q: got %d; want %d", bi.tagFamilies[i].name,
len(bi.tagFamilies[i].columns), len(b.tagFamilies[i].columns))
+ assertIdxAndOffset(b.tagFamilies[i].columns[j].name,
len(b.tagFamilies[i].columns[j].values), b.idx, offset)
+ bi.tagFamilies[i].columns[j].values =
append(bi.tagFamilies[i].columns[j].values,
b.tagFamilies[i].columns[j].values[b.idx:offset]...)
+ }
+ }
+ return nil
+}
+
+func fullTagAppend(bi, b *blockPointer, offset int) {
+ existDataSize := len(bi.timestamps)
+ appendTagFamilies := func(tf columnFamily) {
+ tagFamily := columnFamily{name: tf.name}
+ for i := range tf.columns {
+ assertIdxAndOffset(tf.columns[i].name,
len(tf.columns[i].values), b.idx, offset)
+ col := column{name: tf.columns[i].name, valueType:
tf.columns[i].valueType}
+ for j := 0; j < existDataSize; j++ {
+ col.values = append(col.values, nil)
+ }
+ col.values = append(col.values,
tf.columns[i].values[b.idx:offset]...)
+ tagFamily.columns = append(tagFamily.columns, col)
+ }
+ bi.tagFamilies = append(bi.tagFamilies, tagFamily)
+ }
+ if len(bi.tagFamilies) == 0 {
+ for _, tf := range b.tagFamilies {
+ appendTagFamilies(tf)
+ }
+ return
+ }
+
+ tagFamilyMap := make(map[string]*columnFamily)
+ for i := range bi.tagFamilies {
+ tagFamilyMap[bi.tagFamilies[i].name] = &bi.tagFamilies[i]
+ }
+
+ for _, tf := range b.tagFamilies {
+ if existingTagFamily, exists := tagFamilyMap[tf.name]; exists {
+ columnMap := make(map[string]*column)
+ for i := range existingTagFamily.columns {
+ columnMap[existingTagFamily.columns[i].name] =
&existingTagFamily.columns[i]
}
- for j := range bi.tagFamilies[i].columns {
- if bi.tagFamilies[i].columns[j].name !=
b.tagFamilies[i].columns[j].name {
- logger.Panicf("unexpected tag name for
tag family %q: got %q; want %q", bi.tagFamilies[i].name,
bi.tagFamilies[i].columns[j].name, b.tagFamilies[i].columns[j].name)
+
+ for _, c := range tf.columns {
+ if existingColumn, exists := columnMap[c.name];
exists {
+ assertIdxAndOffset(c.name,
len(c.values), b.idx, offset)
+ existingColumn.values =
append(existingColumn.values, c.values[b.idx:offset]...)
+ } else {
+ assertIdxAndOffset(c.name,
len(c.values), b.idx, offset)
+ col := column{name: c.name, valueType:
c.valueType}
+ for j := 0; j < existDataSize; j++ {
+ col.values = append(col.values,
nil)
+ }
+ col.values = append(col.values,
c.values[b.idx:offset]...)
+ existingTagFamily.columns =
append(existingTagFamily.columns, col)
}
-
assertIdxAndOffset(b.tagFamilies[i].columns[j].name,
len(b.tagFamilies[i].columns[j].values), b.idx, offset)
- bi.tagFamilies[i].columns[j].values =
append(bi.tagFamilies[i].columns[j].values,
b.tagFamilies[i].columns[j].values[b.idx:offset]...)
}
+ } else {
+ appendTagFamilies(tf)
}
}
+ for k := range tagFamilyMap {
+ delete(tagFamilyMap, k)
+ }
+ for i := range b.tagFamilies {
+ tagFamilyMap[b.tagFamilies[i].name] = &b.tagFamilies[i]
+ }
+ emptySize := offset - b.idx
+ for _, tf := range bi.tagFamilies {
+ if _, exists := tagFamilyMap[tf.name]; !exists {
+ for i := range tf.columns {
+ for j := 0; j < emptySize; j++ {
+ tf.columns[i].values =
append(tf.columns[i].values, nil)
+ }
+ }
+ } else {
+ existingTagFamily := tagFamilyMap[tf.name]
+ columnMap := make(map[string]*column)
+ for i := range existingTagFamily.columns {
+ columnMap[existingTagFamily.columns[i].name] =
&existingTagFamily.columns[i]
+ }
+ for i := range tf.columns {
+ if _, exists := columnMap[tf.columns[i].name];
!exists {
+ for j := 0; j < emptySize; j++ {
+ tf.columns[i].values =
append(tf.columns[i].values, nil)
+ }
+ }
+ }
+ }
+ }
+}
- if len(bi.field.columns) == 0 && len(b.field.columns) > 0 {
- for _, c := range b.field.columns {
- col := column{name: c.name, valueType: c.valueType}
- assertIdxAndOffset(col.name, len(c.values), b.idx,
offset)
- col.values = append(col.values,
c.values[b.idx:offset]...)
- bi.field.columns = append(bi.field.columns, col)
+func fastFieldAppend(bi, b *blockPointer, offset int) error {
+ if len(bi.field.columns) != len(b.field.columns) {
+ return fmt.Errorf("unexpected number of fields: got %d; want
%d", len(bi.field.columns), len(b.field.columns))
+ }
+ for i := range bi.field.columns {
+ if bi.field.columns[i].name != b.field.columns[i].name {
+ return fmt.Errorf("unexpected field name: got %q; want
%q", b.field.columns[i].name, bi.field.columns[i].name)
}
- } else {
- if len(bi.field.columns) != len(b.field.columns) {
- logger.Panicf("unexpected number of fields: got %d;
want %d", len(bi.field.columns), len(b.field.columns))
+ assertIdxAndOffset(b.field.columns[i].name,
len(b.field.columns[i].values), b.idx, offset)
+ bi.field.columns[i].values = append(bi.field.columns[i].values,
b.field.columns[i].values[b.idx:offset]...)
+ }
+ return nil
+}
+
+func fullFieldAppend(bi, b *blockPointer, offset int) {
+ existDataSize := len(bi.timestamps)
+ appendFields := func(c column) {
+ col := column{name: c.name, valueType: c.valueType}
+ for j := 0; j < existDataSize; j++ {
+ col.values = append(col.values, nil)
}
- for i := range bi.field.columns {
- assertIdxAndOffset(b.field.columns[i].name,
len(b.field.columns[i].values), b.idx, offset)
- bi.field.columns[i].values =
append(bi.field.columns[i].values, b.field.columns[i].values[b.idx:offset]...)
+ col.values = append(col.values, c.values[b.idx:offset]...)
+ bi.field.columns = append(bi.field.columns, col)
+ }
+ if len(bi.field.columns) == 0 {
+ for _, c := range b.field.columns {
+ appendFields(c)
}
+ return
}
- assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
- bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
- assertIdxAndOffset("versions", len(b.versions), bi.idx, offset)
- bi.versions = append(bi.versions, b.versions[b.idx:offset]...)
+ fieldMap := make(map[string]*column)
+ for i := range bi.field.columns {
+ fieldMap[bi.field.columns[i].name] = &bi.field.columns[i]
+ }
+
+ for _, c := range b.field.columns {
+ if existingField, exists := fieldMap[c.name]; exists {
+ assertIdxAndOffset(c.name, len(c.values), b.idx, offset)
+ existingField.values = append(existingField.values,
c.values[b.idx:offset]...)
+ } else {
+ appendFields(c)
+ }
+ }
+ for k := range fieldMap {
+ delete(fieldMap, k)
+ }
+ for i := range b.field.columns {
+ fieldMap[b.field.columns[i].name] = &b.field.columns[i]
+ }
+
+ emptySize := offset - b.idx
+ for i := range bi.field.columns {
+ if _, exists := fieldMap[bi.field.columns[i].name]; !exists {
+ for j := 0; j < emptySize; j++ {
+ bi.field.columns[i].values =
append(bi.field.columns[i].values, nil)
+ }
+ }
+ }
}
func assertIdxAndOffset(name string, length int, idx int, offset int) {
diff --git a/banyand/measure/block_test.go b/banyand/measure/block_test.go
index 5d41f236..cabaea10 100644
--- a/banyand/measure/block_test.go
+++ b/banyand/measure/block_test.go
@@ -716,6 +716,362 @@ func Test_blockPointer_append(t *testing.T) {
idx: 0,
},
},
+ {
+ name: "Test append with missing tag family",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField", valueType:
pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}},
+ },
+ },
+ partID: 1,
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{
+ columns: []column{
+ {name:
"strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"),
[]byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ versions: []int64{1, 1, 1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+
nil, nil,
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField",
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"),
[]byte("field4"), []byte("field5"), []byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with additional tag family",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{
+ columns: []column{
+ {name: "strField", valueType:
pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}},
+ },
+ },
+ partID: 1,
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns:
[]column{
+ {
+
name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
+
values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name:
"strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"),
[]byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ versions: []int64{1, 1, 1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
nil, nil,
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField",
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"),
[]byte("field4"), []byte("field5"), []byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with missing tag",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField", valueType:
pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}},
+ },
+ },
+ partID: 1,
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name:
"arrTag",
+ columns:
[]column{},
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name:
"strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"),
[]byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ versions: []int64{1, 1, 1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+
nil, nil,
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField",
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"),
[]byte("field4"), []byte("field5"), []byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with additional tag",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{},
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField", valueType:
pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}},
+ },
+ },
+ partID: 1,
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns:
[]column{
+ {
+
name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
+
values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name:
"strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"),
[]byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ versions: []int64{1, 1, 1, 1},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
nil, nil,
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField",
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"),
[]byte("field4"), []byte("field5"), []byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with missing field",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{
+ columns: []column{
+ {name: "strField", valueType:
pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}},
+ },
+ },
+ partID: 1,
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{},
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ versions: []int64{1, 1, 1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{
+ columns: []column{
+ {name: "strField",
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"),
[]byte("field4"), nil, nil}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with additional field",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{},
+ partID: 1,
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{
+ columns: []column{
+ {name:
"strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"),
[]byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ versions: []int64{1, 1, 1, 1},
+ tagFamilies: []columnFamily{},
+ field: columnFamily{
+ columns: []column{
+ {name: "strField",
valueType: pbv1.ValueTypeStr, values: [][]byte{nil, nil, []byte("field5"),
[]byte("field6")}},
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index ac1ccfa0..299402fe 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -41,7 +41,8 @@ import (
)
const (
- preloadSize = 100
+ preloadSize = 100
+ checkDoneEvery = 128
)
// Query allow to retrieve measure data points.
@@ -252,12 +253,16 @@ func (s *measure) searchBlocks(ctx context.Context,
result *queryResult, sids []
if tstIter.Error() != nil {
return fmt.Errorf("cannot init tstIter: %w", tstIter.Error())
}
+ var hit int
for tstIter.nextBlock() {
- select {
- case <-ctx.Done():
- return errors.WithMessagef(ctx.Err(), "interrupt:
scanned %d blocks, remained %d/%d parts to scan", len(result.data),
len(tstIter.piHeap), len(tstIter.piPool))
- default:
+ if hit%checkDoneEvery == 0 {
+ select {
+ case <-ctx.Done():
+ return errors.WithMessagef(ctx.Err(),
"interrupt: scanned %d blocks, remained %d/%d parts to scan", len(result.data),
len(tstIter.piHeap), len(tstIter.piPool))
+ default:
+ }
}
+ hit++
bc := generateBlockCursor()
p := tstIter.piHeap[0]
bc.init(p.p, p.curBlock, qo)
@@ -432,12 +437,20 @@ type queryResult struct {
data []*blockCursor
snapshots []*snapshot
segments []storage.Segment[*tsTable, option]
+ hit int
loaded bool
orderByTS bool
ascTS bool
}
func (qr *queryResult) Pull() *model.MeasureResult {
+ select {
+ case <-qr.ctx.Done():
+ return &model.MeasureResult{
+ Error: errors.WithMessagef(qr.ctx.Err(), "interrupt:
hit %d", qr.hit),
+ }
+ default:
+ }
if !qr.loaded {
if len(qr.data) == 0 {
return nil
@@ -446,6 +459,12 @@ func (qr *queryResult) Pull() *model.MeasureResult {
cursorChan := make(chan int, len(qr.data))
for i := 0; i < len(qr.data); i++ {
go func(i int) {
+ select {
+ case <-qr.ctx.Done():
+ cursorChan <- i
+ return
+ default:
+ }
tmpBlock := generateBlock()
defer releaseBlock(tmpBlock)
if !qr.data[i].loadData(tmpBlock) {
@@ -461,17 +480,18 @@ func (qr *queryResult) Pull() *model.MeasureResult {
blankCursorList := []int{}
for completed := 0; completed < len(qr.data); completed++ {
- select {
- case <-qr.ctx.Done():
- return &model.MeasureResult{
- Error:
errors.WithMessagef(qr.ctx.Err(), "interrupt: loaded %d/%d cursors", completed,
len(qr.data)),
- }
- case result := <-cursorChan:
- if result != -1 {
- blankCursorList =
append(blankCursorList, result)
- }
+ result := <-cursorChan
+ if result != -1 {
+ blankCursorList = append(blankCursorList,
result)
}
}
+ select {
+ case <-qr.ctx.Done():
+ return &model.MeasureResult{
+ Error: errors.WithMessagef(qr.ctx.Err(),
"interrupt: blank/total=%d/%d", len(blankCursorList), len(qr.data)),
+ }
+ default:
+ }
sort.Slice(blankCursorList, func(i, j int) bool {
return blankCursorList[i] > blankCursorList[j]
})
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index da5ba404..64362ab2 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -238,7 +238,7 @@ func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable,
option], dpg *dataPoi
return dpt, nil
}
-func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp
bus.Message) {
+func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp
bus.Message) {
events, ok := message.Data().([]any)
if !ok {
w.l.Warn().Msg("invalid event data type")
@@ -250,12 +250,6 @@ func (w *writeCallback) Rev(ctx context.Context, message
bus.Message) (resp bus.
}
groups := make(map[string]*dataPointsInGroup)
for i := range events {
- select {
- case <-ctx.Done():
- w.l.Warn().Msgf("context is done, handled %d events", i)
- break
- default:
- }
var writeEvent *measurev1.InternalWriteRequest
switch e := events[i].(type) {
case *measurev1.InternalWriteRequest:
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index b9a7e601..0e112039 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -521,10 +521,8 @@ func (e *etcdSchemaRegistry) revokeLease(lease
*clientv3.LeaseGrantResponse) {
ctx, cancel := context.WithTimeout(context.Background(), leaseDuration)
defer cancel()
_, err := e.client.Lease.Revoke(ctx, lease.ID)
- if err != nil {
- if !errors.Is(err, context.DeadlineExceeded) {
- e.l.Error().Err(err).Msgf("failed to revoke lease %d",
lease.ID)
- }
+ if err != nil && e.l.Debug().Enabled() {
+ e.l.Debug().Err(err).Msgf("failed to revoke lease %d", lease.ID)
}
}
diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go
index 0b3f2ae9..46db4174 100644
--- a/banyand/queue/sub/sub.go
+++ b/banyand/queue/sub/sub.go
@@ -39,12 +39,11 @@ func (s *server) Send(stream clusterv1.Service_SendServer)
error {
reply := func(writeEntity *clusterv1.SendRequest, err error, message
string) {
s.log.Error().Stringer("request",
writeEntity).Err(err).Msg(message)
s.metrics.totalMsgReceivedErr.Inc(1, writeEntity.Topic)
- s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
if errResp := stream.Send(&clusterv1.SendResponse{
MessageId: writeEntity.MessageId,
Error: message,
}); errResp != nil {
- s.log.Err(errResp).AnErr("original",
err).Stringer("request", writeEntity).Msg("failed to send error response")
+ s.log.Error().Err(errResp).AnErr("original",
err).Stringer("request", writeEntity).Msg("failed to send error response")
s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
}
}
@@ -153,6 +152,12 @@ func (s *server) Send(stream clusterv1.Service_SendServer)
error {
case proto.Message:
message = d
case common.Error:
+ select {
+ case <-ctx.Done():
+ s.metrics.totalMsgReceivedErr.Inc(1,
writeEntity.Topic)
+ return ctx.Err()
+ default:
+ }
reply(writeEntity, nil, d.Msg())
continue
default:
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index 3b65e88e..956c1bf2 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -18,6 +18,7 @@
package stream
import (
+ "fmt"
"sort"
"golang.org/x/exp/slices"
@@ -603,45 +604,133 @@ func (bi *blockPointer) appendAll(b *blockPointer) {
bi.append(b, len(b.timestamps))
}
+var log = logger.GetLogger("stream").Named("block")
+
func (bi *blockPointer) append(b *blockPointer, offset int) {
if offset <= b.idx {
return
}
if len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0 {
+ fullTagAppend(bi, b, offset)
+ } else {
+ if err := fastTagAppend(bi, b, offset); err != nil {
+ if log.Debug().Enabled() {
+ log.Debug().Msgf("fastTagMerge failed: %v;
falling back to fullTagMerge", err)
+ }
+ fullTagAppend(bi, b, offset)
+ }
+ }
+
+ assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
+ bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
+ bi.elementIDs = append(bi.elementIDs, b.elementIDs[b.idx:offset]...)
+}
+
+func fastTagAppend(bi, b *blockPointer, offset int) error {
+ if len(bi.tagFamilies) != len(b.tagFamilies) {
+ return fmt.Errorf("unexpected number of tag families: got %d;
want %d", len(b.tagFamilies), len(bi.tagFamilies))
+ }
+ for i := range bi.tagFamilies {
+ if bi.tagFamilies[i].name != b.tagFamilies[i].name {
+ return fmt.Errorf("unexpected tag family name: got %q;
want %q", b.tagFamilies[i].name, bi.tagFamilies[i].name)
+ }
+ if len(bi.tagFamilies[i].tags) != len(b.tagFamilies[i].tags) {
+ return fmt.Errorf("unexpected number of tags for tag
family %q: got %d; want %d",
+ bi.tagFamilies[i].name,
len(b.tagFamilies[i].tags), len(bi.tagFamilies[i].tags))
+ }
+ for j := range bi.tagFamilies[i].tags {
+ if bi.tagFamilies[i].tags[j].name !=
b.tagFamilies[i].tags[j].name {
+ return fmt.Errorf("unexpected tag name for tag
family %q: got %q; want %q",
+ bi.tagFamilies[i].name,
b.tagFamilies[i].tags[j].name, bi.tagFamilies[i].tags[j].name)
+ }
+ assertIdxAndOffset(b.tagFamilies[i].tags[j].name,
len(b.tagFamilies[i].tags[j].values), b.idx, offset)
+ bi.tagFamilies[i].tags[j].values =
append(bi.tagFamilies[i].tags[j].values,
b.tagFamilies[i].tags[j].values[b.idx:offset]...)
+ }
+ }
+ return nil
+}
+
+func fullTagAppend(bi, b *blockPointer, offset int) {
+ existDataSize := len(bi.timestamps)
+ appendTagFamilies := func(tf tagFamily) {
+ tfv := tagFamily{name: tf.name}
+ for i := range tf.tags {
+ assertIdxAndOffset(tf.tags[i].name,
len(tf.tags[i].values), b.idx, offset)
+ col := tag{name: tf.tags[i].name, valueType:
tf.tags[i].valueType}
+ for j := 0; j < existDataSize; j++ {
+ col.values = append(col.values, nil)
+ }
+ col.values = append(col.values,
tf.tags[i].values[b.idx:offset]...)
+ tfv.tags = append(tfv.tags, col)
+ }
+ bi.tagFamilies = append(bi.tagFamilies, tfv)
+ }
+ if len(bi.tagFamilies) == 0 {
for _, tf := range b.tagFamilies {
- tFamily := tagFamily{name: tf.name}
+ appendTagFamilies(tf)
+ }
+ return
+ }
+
+ tagFamilyMap := make(map[string]*tagFamily)
+ for i := range bi.tagFamilies {
+ tagFamilyMap[bi.tagFamilies[i].name] = &bi.tagFamilies[i]
+ }
+
+ for _, tf := range b.tagFamilies {
+ if existingTagFamily, exists := tagFamilyMap[tf.name]; exists {
+ columnMap := make(map[string]*tag)
+ for i := range existingTagFamily.tags {
+ columnMap[existingTagFamily.tags[i].name] =
&existingTagFamily.tags[i]
+ }
+
for _, c := range tf.tags {
- col := tag{name: c.name, valueType: c.valueType}
- assertIdxAndOffset(col.name, len(c.values),
b.idx, offset)
- col.values = append(col.values,
c.values[b.idx:offset]...)
- tFamily.tags = append(tFamily.tags, col)
+ if existingColumn, exists := columnMap[c.name];
exists {
+ assertIdxAndOffset(c.name,
len(c.values), b.idx, offset)
+ existingColumn.values =
append(existingColumn.values, c.values[b.idx:offset]...)
+ } else {
+ assertIdxAndOffset(c.name,
len(c.values), b.idx, offset)
+ col := tag{name: c.name, valueType:
c.valueType}
+ for j := 0; j < existDataSize; j++ {
+ col.values = append(col.values,
nil)
+ }
+ col.values = append(col.values,
c.values[b.idx:offset]...)
+ existingTagFamily.tags =
append(existingTagFamily.tags, col)
+ }
}
- bi.tagFamilies = append(bi.tagFamilies, tFamily)
+ } else {
+ appendTagFamilies(tf)
}
- } else {
- if len(bi.tagFamilies) != len(b.tagFamilies) {
- logger.Panicf("unexpected number of tag families: got
%d; want %d", len(bi.tagFamilies), len(b.tagFamilies))
- }
- for i := range bi.tagFamilies {
- if bi.tagFamilies[i].name != b.tagFamilies[i].name {
- logger.Panicf("unexpected tag family name: got
%q; want %q", bi.tagFamilies[i].name, b.tagFamilies[i].name)
+ }
+ for k := range tagFamilyMap {
+ delete(tagFamilyMap, k)
+ }
+ for i := range b.tagFamilies {
+ tagFamilyMap[b.tagFamilies[i].name] = &b.tagFamilies[i]
+ }
+ emptySize := offset - b.idx
+ for _, tf := range bi.tagFamilies {
+ if _, exists := tagFamilyMap[tf.name]; !exists {
+ for i := range tf.tags {
+ for j := 0; j < emptySize; j++ {
+ tf.tags[i].values =
append(tf.tags[i].values, nil)
+ }
}
- if len(bi.tagFamilies[i].tags) !=
len(b.tagFamilies[i].tags) {
- logger.Panicf("unexpected number of tags for
tag family %q: got %d; want %d", bi.tagFamilies[i].name,
len(bi.tagFamilies[i].tags), len(b.tagFamilies[i].tags))
+ } else {
+ existingTagFamily := tagFamilyMap[tf.name]
+ columnMap := make(map[string]*tag)
+ for i := range existingTagFamily.tags {
+ columnMap[existingTagFamily.tags[i].name] =
&existingTagFamily.tags[i]
}
- for j := range bi.tagFamilies[i].tags {
- if bi.tagFamilies[i].tags[j].name !=
b.tagFamilies[i].tags[j].name {
- logger.Panicf("unexpected tag name for
tag family %q: got %q; want %q", bi.tagFamilies[i].name,
bi.tagFamilies[i].tags[j].name, b.tagFamilies[i].tags[j].name)
+ for i := range tf.tags {
+ if _, exists := columnMap[tf.tags[i].name];
!exists {
+ for j := 0; j < emptySize; j++ {
+ tf.tags[i].values =
append(tf.tags[i].values, nil)
+ }
}
-
assertIdxAndOffset(b.tagFamilies[i].tags[j].name,
len(b.tagFamilies[i].tags[j].values), b.idx, offset)
- bi.tagFamilies[i].tags[j].values =
append(bi.tagFamilies[i].tags[j].values,
b.tagFamilies[i].tags[j].values[b.idx:offset]...)
}
}
}
-
- assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
- bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
- bi.elementIDs = append(bi.elementIDs, b.elementIDs[b.idx:offset]...)
}
func assertIdxAndOffset(name string, length int, idx int, offset int) {
diff --git a/banyand/stream/block_test.go b/banyand/stream/block_test.go
index ed57f48d..3632d638 100644
--- a/banyand/stream/block_test.go
+++ b/banyand/stream/block_test.go
@@ -609,6 +609,220 @@ func Test_blockPointer_append(t *testing.T) {
idx: 0,
},
},
+ {
+ name: "Test append with missing tag family",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ elementIDs: []uint64{0, 1},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ elementIDs: []uint64{2, 3},
+ tagFamilies: []tagFamily{},
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ elementIDs: []uint64{0, 1, 2, 3},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+
nil, nil,
+ },
+ },
+ },
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with additional tag family",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ elementIDs: []uint64{0, 1},
+ tagFamilies: []tagFamily{},
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ elementIDs: []uint64{2, 3},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+
name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
+
values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ elementIDs: []uint64{0, 1, 2, 3},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
nil, nil,
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+ },
+ },
+ },
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with missing tag",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ elementIDs: []uint64{0, 1},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ elementIDs: []uint64{2, 3},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{},
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ elementIDs: []uint64{0, 1, 2, 3},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+
nil, nil,
+ },
+ },
+ },
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
+ {
+ name: "Test append with additional tag",
+ fields: fields{
+ timestamps: []int64{1, 2},
+ elementIDs: []uint64{0, 1},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{},
+ },
+ },
+ },
+ args: args{
+ b: &blockPointer{
+ block: block{
+ timestamps: []int64{4, 5},
+ elementIDs: []uint64{2, 3},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+
name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
+
values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+ },
+ },
+ },
+ },
+ },
+ idx: 0,
+ },
+ offset: 2,
+ },
+ want: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2, 4, 5},
+ elementIDs: []uint64{0, 1, 2, 3},
+ tagFamilies: []tagFamily{
+ {
+ name: "arrTag",
+ tags: []tag{
+ {
+ name:
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values:
[][]byte{
+
nil, nil,
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+ },
+ },
+ },
+ },
+ },
+ },
+ idx: 0,
+ },
+ },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index ee335c3c..d8e14b28 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -42,6 +42,8 @@ import (
"github.com/apache/skywalking-banyandb/pkg/query/model"
)
+const checkDoneEvery = 128
+
func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr
model.StreamQueryResult, err error) {
if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
return nil, errors.New("invalid query options: timeRange and
series are required")
@@ -198,12 +200,16 @@ func (qr *queryResult) scanParts(ctx context.Context, qo
queryOptions) error {
if ti.Error() != nil {
return fmt.Errorf("cannot init tstIter: %w", ti.Error())
}
+ var hit int
for ti.nextBlock() {
- select {
- case <-ctx.Done():
- return errors.WithMessagef(ctx.Err(), "interrupt:
scanned %d blocks, remained %d/%d parts to scan", len(qr.data), len(ti.piHeap),
len(ti.piPool))
- default:
+ if hit%checkDoneEvery == 0 {
+ select {
+ case <-ctx.Done():
+ return errors.WithMessagef(ctx.Err(),
"interrupt: scanned %d blocks, remained %d/%d parts to scan", len(qr.data),
len(ti.piHeap), len(ti.piPool))
+ default:
+ }
}
+ hit++
bc := generateBlockCursor()
p := ti.piHeap[0]
bc.init(p.p, p.curBlock, qo)
@@ -229,6 +235,12 @@ func (qr *queryResult) load(ctx context.Context, qo
queryOptions) *model.StreamR
cursorChan := make(chan int, len(qr.data))
for i := 0; i < len(qr.data); i++ {
go func(i int) {
+ select {
+ case <-ctx.Done():
+ cursorChan <- i
+ return
+ default:
+ }
tmpBlock := generateBlock()
defer releaseBlock(tmpBlock)
if !qr.data[i].loadData(tmpBlock) {
@@ -284,16 +296,17 @@ func (qr *queryResult) load(ctx context.Context, qo
queryOptions) *model.StreamR
blankCursorList := []int{}
for completed := 0; completed < len(qr.data); completed++ {
- select {
- case <-ctx.Done():
- return &model.StreamResult{
- Error: errors.WithMessagef(ctx.Err(),
"interrupt: loaded %d/%d cursors", completed, len(qr.data)),
- }
- case result := <-cursorChan:
- if result != -1 {
- blankCursorList =
append(blankCursorList, result)
- }
+ result := <-cursorChan
+ if result != -1 {
+ blankCursorList = append(blankCursorList,
result)
+ }
+ }
+ select {
+ case <-ctx.Done():
+ return &model.StreamResult{
+ Error: errors.WithMessagef(ctx.Err(),
"interrupt: blank/total=%d/%d", len(blankCursorList), len(qr.data)),
}
+ default:
}
sort.Slice(blankCursorList, func(i, j int) bool {
return blankCursorList[i] > blankCursorList[j]
diff --git a/banyand/stream/write.go b/banyand/stream/write.go
index 1349eeca..610a6781 100644
--- a/banyand/stream/write.go
+++ b/banyand/stream/write.go
@@ -219,7 +219,7 @@ func (w *writeCallback) handle(dst
map[string]*elementsInGroup, writeEvent *stre
return dst, nil
}
-func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp
bus.Message) {
+func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp
bus.Message) {
events, ok := message.Data().([]any)
if !ok {
w.l.Warn().Msg("invalid event data type")
@@ -232,12 +232,6 @@ func (w *writeCallback) Rev(ctx context.Context, message
bus.Message) (resp bus.
groups := make(map[string]*elementsInGroup)
var builder strings.Builder
for i := range events {
- select {
- case <-ctx.Done():
- w.l.Warn().Msgf("context is done, handled %d events", i)
- break
- default:
- }
var writeEvent *streamv1.InternalWriteRequest
switch e := events[i].(type) {
case *streamv1.InternalWriteRequest:
diff --git a/docs/concept/tsdb.md b/docs/concept/tsdb.md
index 2aa9cbd5..02477323 100644
--- a/docs/concept/tsdb.md
+++ b/docs/concept/tsdb.md
@@ -1,10 +1,10 @@
-# TimeSeries Database(TSDB)
+# TimeSeries Database (TSDB) v1.1.0
TSDB is a time-series storage engine designed to store and query large volumes
of time-series data. One of the key features of TSDB is its ability to
automatically manage data storage over time, optimize performance and ensure
that the system can scale to handle large workloads. TSDB empowers `Measure`
and `Stream` relevant data.
In TSDB, the data in a group is partitioned base on the time range of the
data. The segment size is determined by the `segment_interval` of a group. The
number of segments in a group is determined by the `ttl` of a group. A new
segment is created when the written data exceeds the time range of the current
segment. The expired segment will be deleted after the `ttl` of the group.
-
+
## Segment
@@ -16,10 +16,29 @@ In each segment, the data is spread into shards based on
`entity`. The series in
Each shard is assigned to a specific set of storage nodes, and those nodes
store and process the data within that shard. This allows BanyanDB to scale
horizontally by adding more storage nodes to the cluster as needed.
-Each shard is composed of multiple [parts](#Part). Whenever SkyWalking sends a
batch of data, BanyanDB writes this batch of data into a new part. For data of
the `Stream` type, the inverted indexes generated based on the indexing rules
are also stored in the segment. Since BanyanDB adopts a snapshot approach for
data read and write operations, the segment also needs to maintain additional
snapshot information to record the validity of the parts.
+Each shard is composed of multiple [parts](#Part). Whenever SkyWalking sends a
batch of data, BanyanDB writes this batch of data into a new part. For data of
the `Stream` type, the inverted indexes generated based on the indexing rules
are also stored in the segment.
+
+Since BanyanDB adopts a snapshot approach for data read and write operations, the segment also needs to maintain additional snapshot information to record the validity of the parts. The shard contains a snapshot file, `xxxxxxx.snp`, that records which parts are valid. In the chart, part `0000000000000001` has been removed from the snapshot file, which means the part is no longer valid; it will be cleaned up during the next flush or merge operation.

+## Inverted Index
+
+The inverted index is used to locate the data in the shard. For `measure`, it
is a mapping from the term to the series id. For `stream`, it is a mapping from
the term to the timestamp.
+
+The inverted index stores a snapshot file, `xxxxxxx.snp`, to record which segments are valid. In the chart, `0000000000000001.seg` has been removed from the snapshot file, which means the segment is no longer valid; it will be cleaned up during the next flush or merge operation.
+
+The segment file `xxxxxxxx.seg` contains the inverted index data. It includes
four parts:
+
+- **Tags**: The mapping from the tag name to the dictionary location.
+- **Dictionary**: An FST (Finite State Transducer) dictionary that maps each tag value to its posting list.
+- **Posting List**: The mapping from the tag value to the series IDs or timestamps. It also contains the location of the stored tag value.
+- **Stored Tag Value**: The stored tag value. If the tag spec sets `indexed_only=true`, the tag value is not stored here.
+
+
+
+If you want to search `Tag1=Value1`, the index will first search the `Tags`
part to find the dictionary location of `Tag1`. Then, it will search the
`Dictionary` part to find the posting list location of `Value1`. Finally, it
will search the `Posting List` part to find the series id or timestamp. If you
want to fetch the tag value, it will search the `Stored Tag Value` part to find
the tag value.
+
## Part
Within a part, data is split into multiple files in a columnar manner. The
timestamps are stored in the `timestamps.bin` file, tags are organized in
persistent tag families as various files with the `.tf` suffix, and fields are
stored separately in the `fields.bin` file.
diff --git a/docs/menu.yml b/docs/menu.yml
index bc31ebd2..19ec1733 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -126,7 +126,7 @@ catalog:
- name: "File Format"
catalog:
- name: "v1.1.0"
- path: ""
+ path: "/concept/tsdb"
- name: "Concepts"
catalog:
- name: "Clustering"
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 4987ddb5..5d35d6aa 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -380,7 +380,7 @@ func (bmi *blugeMatchIterator) Next() bool {
bmi.err = io.EOF
return false
}
- bmi.hit++
+ bmi.hit = match.HitNumber
for i := range bmi.current.Values {
bmi.current.Values[i] = nil
}
diff --git a/pkg/index/inverted/inverted_series.go
b/pkg/index/inverted/inverted_series.go
index f9b700f4..7288dd31 100644
--- a/pkg/index/inverted/inverted_series.go
+++ b/pkg/index/inverted/inverted_series.go
@@ -113,12 +113,17 @@ func (s *store) Search(ctx context.Context,
func parseResult(dmi search.DocumentMatchIterator, loadedFields
[]index.FieldKey) ([]index.SeriesDocument, error) {
result := make([]index.SeriesDocument, 0, 10)
next, err := dmi.Next()
+ if err != nil {
+ return nil, errors.WithMessage(err, "iterate document match
iterator")
+ }
docIDMap := make(map[uint64]struct{})
fields := make([]string, 0, len(loadedFields))
for i := range loadedFields {
fields = append(fields, loadedFields[i].Marshal())
}
+ var hitNumber int
for err == nil && next != nil {
+ hitNumber = next.HitNumber
var doc index.SeriesDocument
if len(loadedFields) > 0 {
doc.Fields = make(map[string][]byte)
@@ -144,7 +149,7 @@ func parseResult(dmi search.DocumentMatchIterator,
loadedFields []index.FieldKey
return true
})
if err != nil {
- return nil, errors.WithMessagef(err, "visit stored
fields, hit: %d", len(result))
+ return nil, errors.WithMessagef(err, "visit stored
fields, hit: %d", hitNumber)
}
if doc.Key.ID > 0 {
result = append(result, doc)
@@ -152,7 +157,7 @@ func parseResult(dmi search.DocumentMatchIterator,
loadedFields []index.FieldKey
next, err = dmi.Next()
}
if err != nil {
- return nil, errors.WithMessagef(err, "iterate document match
iterator, hit: %d", len(result))
+ return nil, errors.WithMessagef(err, "iterate document match
iterator, hit: %d", hitNumber)
}
return result, nil
}
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go
b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index 49723340..f7db768a 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -73,6 +73,11 @@ func (i *localIndexScan) Sort(order *logical.OrderBy) {
}
func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element,
error) {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ default:
+ }
if i.result != nil {
return BuildElementsFromStreamResult(ctx, i.result), nil
}
diff --git a/test/docker/Dockerfile b/test/docker/Dockerfile
index a8b068c4..4ce236a5 100644
--- a/test/docker/Dockerfile
+++ b/test/docker/Dockerfile
@@ -26,6 +26,8 @@ COPY
banyand/build/bin/linux/${TARGETARCH}/banyand-server-static /banyand
COPY bydbctl/build/bin/linux/${TARGETARCH}/bydbctl-cli-static /bydbctl
COPY --from=certs /etc/ssl/certs /etc/ssl/certs
+ENV GRPC_GO_LOG_SEVERITY_LEVEL=INFO
+ENV GRPC_GO_LOG_FORMATTER=json
EXPOSE 17912
EXPOSE 17913