This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new d14d599e Fix bugs and add docs (#548)
d14d599e is described below

commit d14d599ecaa58a36c18ee3bd01c2d8567206f28b
Author: Gao Hongtao <[email protected]>
AuthorDate: Sun Oct 6 20:39:41 2024 +0800

    Fix bugs and add docs (#548)
    
    
    
    * Add doc
    
    * Append different tag and field structure
    
    * Fix the bug that accesses restored data
    
    ---------
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                         |   1 +
 banyand/Dockerfile                                 |   3 +
 banyand/measure/block.go                           | 216 ++++++++++---
 banyand/measure/block_test.go                      | 356 +++++++++++++++++++++
 banyand/measure/query.go                           |  48 ++-
 banyand/measure/write.go                           |   8 +-
 banyand/metadata/schema/etcd.go                    |   6 +-
 banyand/queue/sub/sub.go                           |   9 +-
 banyand/stream/block.go                            | 137 ++++++--
 banyand/stream/block_test.go                       | 214 +++++++++++++
 banyand/stream/query.go                            |  39 ++-
 banyand/stream/write.go                            |   8 +-
 docs/concept/tsdb.md                               |  25 +-
 docs/menu.yml                                      |   2 +-
 pkg/index/inverted/inverted.go                     |   2 +-
 pkg/index/inverted/inverted_series.go              |   9 +-
 .../logical/stream/stream_plan_indexscan_local.go  |   5 +
 test/docker/Dockerfile                             |   2 +
 18 files changed, 976 insertions(+), 114 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 652d735f..bb78d51d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -47,6 +47,7 @@ Release Notes.
 - Fix panic when reading a disorder block of measure. This block's versions 
are not sorted in descending order.
 - Fix the bug that the etcd client doesn't reconnect when facing the context 
timeout in the startup phase.
 - Fix the bug that the long running query doesn't stop when the context is 
canceled.
+- Fix the bug that merge block with different tags or fields.
 
 ### Documentation
 
diff --git a/banyand/Dockerfile b/banyand/Dockerfile
index 42db44e5..5f21348f 100644
--- a/banyand/Dockerfile
+++ b/banyand/Dockerfile
@@ -34,6 +34,9 @@ COPY build/bin/windows/${TARGETARCH}/banyand-server-static 
"/banyand"
 
 FROM build-${TARGETOS} AS final
 
+ENV GRPC_GO_LOG_SEVERITY_LEVEL=ERROR
+ENV GRPC_GO_LOG_FORMATTER=json
+
 EXPOSE 17912
 EXPOSE 17913
 EXPOSE 6060
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index d6808bd3..282c0b22 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -18,6 +18,7 @@
 package measure
 
 import (
+       "fmt"
        "slices"
        "sort"
 
@@ -787,63 +788,206 @@ func (bi *blockPointer) appendAll(b *blockPointer) {
        bi.append(b, len(b.timestamps))
 }
 
+var log = logger.GetLogger("measure").Named("block")
+
 func (bi *blockPointer) append(b *blockPointer, offset int) {
        if offset <= b.idx {
                return
        }
        if len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0 {
-               for _, tf := range b.tagFamilies {
-                       tagFamily := columnFamily{name: tf.name}
-                       for _, c := range tf.columns {
-                               col := column{name: c.name, valueType: 
c.valueType}
-                               assertIdxAndOffset(col.name, len(c.values), 
b.idx, offset)
-                               col.values = append(col.values, 
c.values[b.idx:offset]...)
-                               tagFamily.columns = append(tagFamily.columns, 
col)
+               fullTagAppend(bi, b, offset)
+       } else {
+               if err := fastTagAppend(bi, b, offset); err != nil {
+                       if log.Debug().Enabled() {
+                               log.Debug().Msgf("fastTagMerge failed: %v; 
falling back to fullTagMerge", err)
                        }
-                       bi.tagFamilies = append(bi.tagFamilies, tagFamily)
+                       fullTagAppend(bi, b, offset)
                }
+       }
+
+       if len(bi.field.columns) == 0 && len(b.field.columns) > 0 {
+               fullFieldAppend(bi, b, offset)
        } else {
-               if len(bi.tagFamilies) != len(b.tagFamilies) {
-                       logger.Panicf("unexpected number of tag families: got 
%d; want %d", len(bi.tagFamilies), len(b.tagFamilies))
+               if err := fastFieldAppend(bi, b, offset); err != nil {
+                       if log.Debug().Enabled() {
+                               log.Debug().Msgf("fastFieldAppend failed: %v; 
falling back to fullFieldAppend", err)
+                       }
+                       fullFieldAppend(bi, b, offset)
+               }
+       }
+
+       assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
+       bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
+       assertIdxAndOffset("versions", len(b.versions), bi.idx, offset)
+       bi.versions = append(bi.versions, b.versions[b.idx:offset]...)
+}
+
+func fastTagAppend(bi, b *blockPointer, offset int) error {
+       if len(bi.tagFamilies) != len(b.tagFamilies) {
+               return fmt.Errorf("unexpected number of tag families: got %d; 
want %d", len(b.tagFamilies), len(bi.tagFamilies))
+       }
+       for i := range bi.tagFamilies {
+               if bi.tagFamilies[i].name != b.tagFamilies[i].name {
+                       return fmt.Errorf("unexpected tag family name: got %q; 
want %q", b.tagFamilies[i].name, bi.tagFamilies[i].name)
                }
-               for i := range bi.tagFamilies {
-                       if bi.tagFamilies[i].name != b.tagFamilies[i].name {
-                               logger.Panicf("unexpected tag family name: got 
%q; want %q", bi.tagFamilies[i].name, b.tagFamilies[i].name)
+               if len(bi.tagFamilies[i].columns) != 
len(b.tagFamilies[i].columns) {
+                       return fmt.Errorf("unexpected number of tags for tag 
family %q: got %d; want %d",
+                               bi.tagFamilies[i].name, 
len(b.tagFamilies[i].columns), len(bi.tagFamilies[i].columns))
+               }
+               for j := range bi.tagFamilies[i].columns {
+                       if bi.tagFamilies[i].columns[j].name != 
b.tagFamilies[i].columns[j].name {
+                               return fmt.Errorf("unexpected tag name for tag 
family %q: got %q; want %q",
+                                       bi.tagFamilies[i].name, 
b.tagFamilies[i].columns[j].name, bi.tagFamilies[i].columns[j].name)
                        }
-                       if len(bi.tagFamilies[i].columns) != 
len(b.tagFamilies[i].columns) {
-                               logger.Panicf("unexpected number of tags for 
tag family %q: got %d; want %d", bi.tagFamilies[i].name, 
len(bi.tagFamilies[i].columns), len(b.tagFamilies[i].columns))
+                       assertIdxAndOffset(b.tagFamilies[i].columns[j].name, 
len(b.tagFamilies[i].columns[j].values), b.idx, offset)
+                       bi.tagFamilies[i].columns[j].values = 
append(bi.tagFamilies[i].columns[j].values, 
b.tagFamilies[i].columns[j].values[b.idx:offset]...)
+               }
+       }
+       return nil
+}
+
+func fullTagAppend(bi, b *blockPointer, offset int) {
+       existDataSize := len(bi.timestamps)
+       appendTagFamilies := func(tf columnFamily) {
+               tagFamily := columnFamily{name: tf.name}
+               for i := range tf.columns {
+                       assertIdxAndOffset(tf.columns[i].name, 
len(tf.columns[i].values), b.idx, offset)
+                       col := column{name: tf.columns[i].name, valueType: 
tf.columns[i].valueType}
+                       for j := 0; j < existDataSize; j++ {
+                               col.values = append(col.values, nil)
+                       }
+                       col.values = append(col.values, 
tf.columns[i].values[b.idx:offset]...)
+                       tagFamily.columns = append(tagFamily.columns, col)
+               }
+               bi.tagFamilies = append(bi.tagFamilies, tagFamily)
+       }
+       if len(bi.tagFamilies) == 0 {
+               for _, tf := range b.tagFamilies {
+                       appendTagFamilies(tf)
+               }
+               return
+       }
+
+       tagFamilyMap := make(map[string]*columnFamily)
+       for i := range bi.tagFamilies {
+               tagFamilyMap[bi.tagFamilies[i].name] = &bi.tagFamilies[i]
+       }
+
+       for _, tf := range b.tagFamilies {
+               if existingTagFamily, exists := tagFamilyMap[tf.name]; exists {
+                       columnMap := make(map[string]*column)
+                       for i := range existingTagFamily.columns {
+                               columnMap[existingTagFamily.columns[i].name] = 
&existingTagFamily.columns[i]
                        }
-                       for j := range bi.tagFamilies[i].columns {
-                               if bi.tagFamilies[i].columns[j].name != 
b.tagFamilies[i].columns[j].name {
-                                       logger.Panicf("unexpected tag name for 
tag family %q: got %q; want %q", bi.tagFamilies[i].name, 
bi.tagFamilies[i].columns[j].name, b.tagFamilies[i].columns[j].name)
+
+                       for _, c := range tf.columns {
+                               if existingColumn, exists := columnMap[c.name]; 
exists {
+                                       assertIdxAndOffset(c.name, 
len(c.values), b.idx, offset)
+                                       existingColumn.values = 
append(existingColumn.values, c.values[b.idx:offset]...)
+                               } else {
+                                       assertIdxAndOffset(c.name, 
len(c.values), b.idx, offset)
+                                       col := column{name: c.name, valueType: 
c.valueType}
+                                       for j := 0; j < existDataSize; j++ {
+                                               col.values = append(col.values, 
nil)
+                                       }
+                                       col.values = append(col.values, 
c.values[b.idx:offset]...)
+                                       existingTagFamily.columns = 
append(existingTagFamily.columns, col)
                                }
-                               
assertIdxAndOffset(b.tagFamilies[i].columns[j].name, 
len(b.tagFamilies[i].columns[j].values), b.idx, offset)
-                               bi.tagFamilies[i].columns[j].values = 
append(bi.tagFamilies[i].columns[j].values, 
b.tagFamilies[i].columns[j].values[b.idx:offset]...)
                        }
+               } else {
+                       appendTagFamilies(tf)
                }
        }
+       for k := range tagFamilyMap {
+               delete(tagFamilyMap, k)
+       }
+       for i := range b.tagFamilies {
+               tagFamilyMap[b.tagFamilies[i].name] = &b.tagFamilies[i]
+       }
+       emptySize := offset - b.idx
+       for _, tf := range bi.tagFamilies {
+               if _, exists := tagFamilyMap[tf.name]; !exists {
+                       for i := range tf.columns {
+                               for j := 0; j < emptySize; j++ {
+                                       tf.columns[i].values = 
append(tf.columns[i].values, nil)
+                               }
+                       }
+               } else {
+                       existingTagFamily := tagFamilyMap[tf.name]
+                       columnMap := make(map[string]*column)
+                       for i := range existingTagFamily.columns {
+                               columnMap[existingTagFamily.columns[i].name] = 
&existingTagFamily.columns[i]
+                       }
+                       for i := range tf.columns {
+                               if _, exists := columnMap[tf.columns[i].name]; 
!exists {
+                                       for j := 0; j < emptySize; j++ {
+                                               tf.columns[i].values = 
append(tf.columns[i].values, nil)
+                                       }
+                               }
+                       }
+               }
+       }
+}
 
-       if len(bi.field.columns) == 0 && len(b.field.columns) > 0 {
-               for _, c := range b.field.columns {
-                       col := column{name: c.name, valueType: c.valueType}
-                       assertIdxAndOffset(col.name, len(c.values), b.idx, 
offset)
-                       col.values = append(col.values, 
c.values[b.idx:offset]...)
-                       bi.field.columns = append(bi.field.columns, col)
+func fastFieldAppend(bi, b *blockPointer, offset int) error {
+       if len(bi.field.columns) != len(b.field.columns) {
+               return fmt.Errorf("unexpected number of fields: got %d; want 
%d", len(bi.field.columns), len(b.field.columns))
+       }
+       for i := range bi.field.columns {
+               if bi.field.columns[i].name != b.field.columns[i].name {
+                       return fmt.Errorf("unexpected field name: got %q; want 
%q", b.field.columns[i].name, bi.field.columns[i].name)
                }
-       } else {
-               if len(bi.field.columns) != len(b.field.columns) {
-                       logger.Panicf("unexpected number of fields: got %d; 
want %d", len(bi.field.columns), len(b.field.columns))
+               assertIdxAndOffset(b.field.columns[i].name, 
len(b.field.columns[i].values), b.idx, offset)
+               bi.field.columns[i].values = append(bi.field.columns[i].values, 
b.field.columns[i].values[b.idx:offset]...)
+       }
+       return nil
+}
+
+func fullFieldAppend(bi, b *blockPointer, offset int) {
+       existDataSize := len(bi.timestamps)
+       appendFields := func(c column) {
+               col := column{name: c.name, valueType: c.valueType}
+               for j := 0; j < existDataSize; j++ {
+                       col.values = append(col.values, nil)
                }
-               for i := range bi.field.columns {
-                       assertIdxAndOffset(b.field.columns[i].name, 
len(b.field.columns[i].values), b.idx, offset)
-                       bi.field.columns[i].values = 
append(bi.field.columns[i].values, b.field.columns[i].values[b.idx:offset]...)
+               col.values = append(col.values, c.values[b.idx:offset]...)
+               bi.field.columns = append(bi.field.columns, col)
+       }
+       if len(bi.field.columns) == 0 {
+               for _, c := range b.field.columns {
+                       appendFields(c)
                }
+               return
        }
 
-       assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
-       bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
-       assertIdxAndOffset("versions", len(b.versions), bi.idx, offset)
-       bi.versions = append(bi.versions, b.versions[b.idx:offset]...)
+       fieldMap := make(map[string]*column)
+       for i := range bi.field.columns {
+               fieldMap[bi.field.columns[i].name] = &bi.field.columns[i]
+       }
+
+       for _, c := range b.field.columns {
+               if existingField, exists := fieldMap[c.name]; exists {
+                       assertIdxAndOffset(c.name, len(c.values), b.idx, offset)
+                       existingField.values = append(existingField.values, 
c.values[b.idx:offset]...)
+               } else {
+                       appendFields(c)
+               }
+       }
+       for k := range fieldMap {
+               delete(fieldMap, k)
+       }
+       for i := range b.field.columns {
+               fieldMap[b.field.columns[i].name] = &b.field.columns[i]
+       }
+
+       emptySize := offset - b.idx
+       for i := range bi.field.columns {
+               if _, exists := fieldMap[bi.field.columns[i].name]; !exists {
+                       for j := 0; j < emptySize; j++ {
+                               bi.field.columns[i].values = 
append(bi.field.columns[i].values, nil)
+                       }
+               }
+       }
 }
 
 func assertIdxAndOffset(name string, length int, idx int, offset int) {
diff --git a/banyand/measure/block_test.go b/banyand/measure/block_test.go
index 5d41f236..cabaea10 100644
--- a/banyand/measure/block_test.go
+++ b/banyand/measure/block_test.go
@@ -716,6 +716,362 @@ func Test_blockPointer_append(t *testing.T) {
                                idx: 0,
                        },
                },
+               {
+                       name: "Test append with missing tag family",
+                       fields: fields{
+                               timestamps: []int64{1, 2},
+                               versions:   []int64{1, 1},
+                               tagFamilies: []columnFamily{
+                                       {
+                                               name: "arrTag",
+                                               columns: []column{
+                                                       {
+                                                               name: 
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                               values: 
[][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), 
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+                                                       },
+                                               },
+                                       },
+                               },
+                               field: columnFamily{
+                                       columns: []column{
+                                               {name: "strField", valueType: 
pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}},
+                                       },
+                               },
+                               partID: 1,
+                       },
+                       args: args{
+                               b: &blockPointer{
+                                       block: block{
+                                               timestamps:  []int64{4, 5},
+                                               versions:    []int64{1, 1},
+                                               tagFamilies: []columnFamily{},
+                                               field: columnFamily{
+                                                       columns: []column{
+                                                               {name: 
"strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"), 
[]byte("field6")}},
+                                                       },
+                                               },
+                                       },
+                                       idx: 0,
+                               },
+                               offset: 2,
+                       },
+                       want: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{1, 2, 4, 5},
+                                       versions:   []int64{1, 1, 1, 1},
+                                       tagFamilies: []columnFamily{
+                                               {
+                                                       name: "arrTag",
+                                                       columns: []column{
+                                                               {
+                                                                       name: 
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                                       values: 
[][]byte{
+                                                                               
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+                                                                               
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+                                                                               
nil, nil,
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       field: columnFamily{
+                                               columns: []column{
+                                                       {name: "strField", 
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), 
[]byte("field4"), []byte("field5"), []byte("field6")}},
+                                               },
+                                       },
+                               },
+                               idx: 0,
+                       },
+               },
+               {
+                       name: "Test append with additional tag family",
+                       fields: fields{
+                               timestamps:  []int64{1, 2},
+                               versions:    []int64{1, 1},
+                               tagFamilies: []columnFamily{},
+                               field: columnFamily{
+                                       columns: []column{
+                                               {name: "strField", valueType: 
pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}},
+                                       },
+                               },
+                               partID: 1,
+                       },
+                       args: args{
+                               b: &blockPointer{
+                                       block: block{
+                                               timestamps: []int64{4, 5},
+                                               versions:   []int64{1, 1},
+                                               tagFamilies: []columnFamily{
+                                                       {
+                                                               name: "arrTag",
+                                                               columns: 
[]column{
+                                                                       {
+                                                                               
name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                                               
values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), 
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                               field: columnFamily{
+                                                       columns: []column{
+                                                               {name: 
"strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"), 
[]byte("field6")}},
+                                                       },
+                                               },
+                                       },
+                                       idx: 0,
+                               },
+                               offset: 2,
+                       },
+                       want: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{1, 2, 4, 5},
+                                       versions:   []int64{1, 1, 1, 1},
+                                       tagFamilies: []columnFamily{
+                                               {
+                                                       name: "arrTag",
+                                                       columns: []column{
+                                                               {
+                                                                       name: 
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                                       values: 
[][]byte{
+                                                                               
nil, nil,
+                                                                               
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+                                                                               
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       field: columnFamily{
+                                               columns: []column{
+                                                       {name: "strField", 
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), 
[]byte("field4"), []byte("field5"), []byte("field6")}},
+                                               },
+                                       },
+                               },
+                               idx: 0,
+                       },
+               },
+               {
+                       name: "Test append with missing tag",
+                       fields: fields{
+                               timestamps: []int64{1, 2},
+                               versions:   []int64{1, 1},
+                               tagFamilies: []columnFamily{
+                                       {
+                                               name: "arrTag",
+                                               columns: []column{
+                                                       {
+                                                               name: 
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                               values: 
[][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), 
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+                                                       },
+                                               },
+                                       },
+                               },
+                               field: columnFamily{
+                                       columns: []column{
+                                               {name: "strField", valueType: 
pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}},
+                                       },
+                               },
+                               partID: 1,
+                       },
+                       args: args{
+                               b: &blockPointer{
+                                       block: block{
+                                               timestamps: []int64{4, 5},
+                                               versions:   []int64{1, 1},
+                                               tagFamilies: []columnFamily{
+                                                       {
+                                                               name:    
"arrTag",
+                                                               columns: 
[]column{},
+                                                       },
+                                               },
+                                               field: columnFamily{
+                                                       columns: []column{
+                                                               {name: 
"strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"), 
[]byte("field6")}},
+                                                       },
+                                               },
+                                       },
+                                       idx: 0,
+                               },
+                               offset: 2,
+                       },
+                       want: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{1, 2, 4, 5},
+                                       versions:   []int64{1, 1, 1, 1},
+                                       tagFamilies: []columnFamily{
+                                               {
+                                                       name: "arrTag",
+                                                       columns: []column{
+                                                               {
+                                                                       name: 
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                                       values: 
[][]byte{
+                                                                               
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+                                                                               
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+                                                                               
nil, nil,
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       field: columnFamily{
+                                               columns: []column{
+                                                       {name: "strField", 
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), 
[]byte("field4"), []byte("field5"), []byte("field6")}},
+                                               },
+                                       },
+                               },
+                               idx: 0,
+                       },
+               },
+               {
+                       name: "Test append with additional tag",
+                       fields: fields{
+                               timestamps: []int64{1, 2},
+                               versions:   []int64{1, 1},
+                               tagFamilies: []columnFamily{
+                                       {
+                                               name:    "arrTag",
+                                               columns: []column{},
+                                       },
+                               },
+                               field: columnFamily{
+                                       columns: []column{
+                                               {name: "strField", valueType: 
pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}},
+                                       },
+                               },
+                               partID: 1,
+                       },
+                       args: args{
+                               b: &blockPointer{
+                                       block: block{
+                                               timestamps: []int64{4, 5},
+                                               versions:   []int64{1, 1},
+                                               tagFamilies: []columnFamily{
+                                                       {
+                                                               name: "arrTag",
+                                                               columns: 
[]column{
+                                                                       {
+                                                                               
name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                                               
values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), 
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                               field: columnFamily{
+                                                       columns: []column{
+                                                               {name: 
"strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"), 
[]byte("field6")}},
+                                                       },
+                                               },
+                                       },
+                                       idx: 0,
+                               },
+                               offset: 2,
+                       },
+                       want: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{1, 2, 4, 5},
+                                       versions:   []int64{1, 1, 1, 1},
+                                       tagFamilies: []columnFamily{
+                                               {
+                                                       name: "arrTag",
+                                                       columns: []column{
+                                                               {
+                                                                       name: 
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                                       values: 
[][]byte{
+                                                                               
nil, nil,
+                                                                               
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+                                                                               
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       field: columnFamily{
+                                               columns: []column{
+                                                       {name: "strField", 
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), 
[]byte("field4"), []byte("field5"), []byte("field6")}},
+                                               },
+                                       },
+                               },
+                               idx: 0,
+                       },
+               },
+               {
+                       name: "Test append with missing field",
+                       fields: fields{
+                               timestamps:  []int64{1, 2},
+                               versions:    []int64{1, 1},
+                               tagFamilies: []columnFamily{},
+                               field: columnFamily{
+                                       columns: []column{
+                                               {name: "strField", valueType: 
pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}},
+                                       },
+                               },
+                               partID: 1,
+                       },
+                       args: args{
+                               b: &blockPointer{
+                                       block: block{
+                                               timestamps:  []int64{4, 5},
+                                               versions:    []int64{1, 1},
+                                               tagFamilies: []columnFamily{},
+                                               field:       columnFamily{},
+                                       },
+                                       idx: 0,
+                               },
+                               offset: 2,
+                       },
+                       want: &blockPointer{
+                               block: block{
+                                       timestamps:  []int64{1, 2, 4, 5},
+                                       versions:    []int64{1, 1, 1, 1},
+                                       tagFamilies: []columnFamily{},
+                                       field: columnFamily{
+                                               columns: []column{
+                                                       {name: "strField", 
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), 
[]byte("field4"), nil, nil}},
+                                               },
+                                       },
+                               },
+                               idx: 0,
+                       },
+               },
+               {
+                       name: "Test append with additional field",
+                       fields: fields{
+                               timestamps:  []int64{1, 2},
+                               versions:    []int64{1, 1},
+                               tagFamilies: []columnFamily{},
+                               field:       columnFamily{},
+                               partID:      1,
+                       },
+                       args: args{
+                               b: &blockPointer{
+                                       block: block{
+                                               timestamps:  []int64{4, 5},
+                                               versions:    []int64{1, 1},
+                                               tagFamilies: []columnFamily{},
+                                               field: columnFamily{
+                                                       columns: []column{
+                                                               {name: 
"strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"), 
[]byte("field6")}},
+                                                       },
+                                               },
+                                       },
+                                       idx: 0,
+                               },
+                               offset: 2,
+                       },
+                       want: &blockPointer{
+                               block: block{
+                                       timestamps:  []int64{1, 2, 4, 5},
+                                       versions:    []int64{1, 1, 1, 1},
+                                       tagFamilies: []columnFamily{},
+                                       field: columnFamily{
+                                               columns: []column{
+                                                       {name: "strField", 
valueType: pbv1.ValueTypeStr, values: [][]byte{nil, nil, []byte("field5"), 
[]byte("field6")}},
+                                               },
+                                       },
+                               },
+                               idx: 0,
+                       },
+               },
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index ac1ccfa0..299402fe 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -41,7 +41,8 @@ import (
 )
 
 const (
-       preloadSize = 100
+       preloadSize    = 100
+       checkDoneEvery = 128
 )
 
 // Query allow to retrieve measure data points.
@@ -252,12 +253,16 @@ func (s *measure) searchBlocks(ctx context.Context, 
result *queryResult, sids []
        if tstIter.Error() != nil {
                return fmt.Errorf("cannot init tstIter: %w", tstIter.Error())
        }
+       var hit int
        for tstIter.nextBlock() {
-               select {
-               case <-ctx.Done():
-                       return errors.WithMessagef(ctx.Err(), "interrupt: 
scanned %d blocks, remained %d/%d parts to scan", len(result.data), 
len(tstIter.piHeap), len(tstIter.piPool))
-               default:
+               if hit%checkDoneEvery == 0 {
+                       select {
+                       case <-ctx.Done():
+                               return errors.WithMessagef(ctx.Err(), 
"interrupt: scanned %d blocks, remained %d/%d parts to scan", len(result.data), 
len(tstIter.piHeap), len(tstIter.piPool))
+                       default:
+                       }
                }
+               hit++
                bc := generateBlockCursor()
                p := tstIter.piHeap[0]
                bc.init(p.p, p.curBlock, qo)
@@ -432,12 +437,20 @@ type queryResult struct {
        data             []*blockCursor
        snapshots        []*snapshot
        segments         []storage.Segment[*tsTable, option]
+       hit              int
        loaded           bool
        orderByTS        bool
        ascTS            bool
 }
 
 func (qr *queryResult) Pull() *model.MeasureResult {
+       select {
+       case <-qr.ctx.Done():
+               return &model.MeasureResult{
+                       Error: errors.WithMessagef(qr.ctx.Err(), "interrupt: 
hit %d", qr.hit),
+               }
+       default:
+       }
        if !qr.loaded {
                if len(qr.data) == 0 {
                        return nil
@@ -446,6 +459,12 @@ func (qr *queryResult) Pull() *model.MeasureResult {
                cursorChan := make(chan int, len(qr.data))
                for i := 0; i < len(qr.data); i++ {
                        go func(i int) {
+                               select {
+                               case <-qr.ctx.Done():
+                                       cursorChan <- i
+                                       return
+                               default:
+                               }
                                tmpBlock := generateBlock()
                                defer releaseBlock(tmpBlock)
                                if !qr.data[i].loadData(tmpBlock) {
@@ -461,17 +480,18 @@ func (qr *queryResult) Pull() *model.MeasureResult {
 
                blankCursorList := []int{}
                for completed := 0; completed < len(qr.data); completed++ {
-                       select {
-                       case <-qr.ctx.Done():
-                               return &model.MeasureResult{
-                                       Error: 
errors.WithMessagef(qr.ctx.Err(), "interrupt: loaded %d/%d cursors", completed, 
len(qr.data)),
-                               }
-                       case result := <-cursorChan:
-                               if result != -1 {
-                                       blankCursorList = 
append(blankCursorList, result)
-                               }
+                       result := <-cursorChan
+                       if result != -1 {
+                               blankCursorList = append(blankCursorList, 
result)
                        }
                }
+               select {
+               case <-qr.ctx.Done():
+                       return &model.MeasureResult{
+                               Error: errors.WithMessagef(qr.ctx.Err(), 
"interrupt: blank/total=%d/%d", len(blankCursorList), len(qr.data)),
+                       }
+               default:
+               }
                sort.Slice(blankCursorList, func(i, j int) bool {
                        return blankCursorList[i] > blankCursorList[j]
                })
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index da5ba404..64362ab2 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -238,7 +238,7 @@ func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, 
option], dpg *dataPoi
        return dpt, nil
 }
 
-func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp 
bus.Message) {
+func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp 
bus.Message) {
        events, ok := message.Data().([]any)
        if !ok {
                w.l.Warn().Msg("invalid event data type")
@@ -250,12 +250,6 @@ func (w *writeCallback) Rev(ctx context.Context, message 
bus.Message) (resp bus.
        }
        groups := make(map[string]*dataPointsInGroup)
        for i := range events {
-               select {
-               case <-ctx.Done():
-                       w.l.Warn().Msgf("context is done, handled %d events", i)
-                       break
-               default:
-               }
                var writeEvent *measurev1.InternalWriteRequest
                switch e := events[i].(type) {
                case *measurev1.InternalWriteRequest:
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index b9a7e601..0e112039 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -521,10 +521,8 @@ func (e *etcdSchemaRegistry) revokeLease(lease 
*clientv3.LeaseGrantResponse) {
        ctx, cancel := context.WithTimeout(context.Background(), leaseDuration)
        defer cancel()
        _, err := e.client.Lease.Revoke(ctx, lease.ID)
-       if err != nil {
-               if !errors.Is(err, context.DeadlineExceeded) {
-                       e.l.Error().Err(err).Msgf("failed to revoke lease %d", 
lease.ID)
-               }
+       if err != nil && e.l.Debug().Enabled() {
+               e.l.Debug().Err(err).Msgf("failed to revoke lease %d", lease.ID)
        }
 }
 
diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go
index 0b3f2ae9..46db4174 100644
--- a/banyand/queue/sub/sub.go
+++ b/banyand/queue/sub/sub.go
@@ -39,12 +39,11 @@ func (s *server) Send(stream clusterv1.Service_SendServer) 
error {
        reply := func(writeEntity *clusterv1.SendRequest, err error, message 
string) {
                s.log.Error().Stringer("request", 
writeEntity).Err(err).Msg(message)
                s.metrics.totalMsgReceivedErr.Inc(1, writeEntity.Topic)
-               s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
                if errResp := stream.Send(&clusterv1.SendResponse{
                        MessageId: writeEntity.MessageId,
                        Error:     message,
                }); errResp != nil {
-                       s.log.Err(errResp).AnErr("original", 
err).Stringer("request", writeEntity).Msg("failed to send error response")
+                       s.log.Error().Err(errResp).AnErr("original", 
err).Stringer("request", writeEntity).Msg("failed to send error response")
                        s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
                }
        }
@@ -153,6 +152,12 @@ func (s *server) Send(stream clusterv1.Service_SendServer) 
error {
                case proto.Message:
                        message = d
                case common.Error:
+                       select {
+                       case <-ctx.Done():
+                               s.metrics.totalMsgReceivedErr.Inc(1, 
writeEntity.Topic)
+                               return ctx.Err()
+                       default:
+                       }
                        reply(writeEntity, nil, d.Msg())
                        continue
                default:
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index 3b65e88e..956c1bf2 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -18,6 +18,7 @@
 package stream
 
 import (
+       "fmt"
        "sort"
 
        "golang.org/x/exp/slices"
@@ -603,45 +604,133 @@ func (bi *blockPointer) appendAll(b *blockPointer) {
        bi.append(b, len(b.timestamps))
 }
 
+var log = logger.GetLogger("stream").Named("block")
+
 func (bi *blockPointer) append(b *blockPointer, offset int) {
        if offset <= b.idx {
                return
        }
        if len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0 {
+               fullTagAppend(bi, b, offset)
+       } else {
+               if err := fastTagAppend(bi, b, offset); err != nil {
+                       if log.Debug().Enabled() {
+                               log.Debug().Msgf("fastTagMerge failed: %v; 
falling back to fullTagMerge", err)
+                       }
+                       fullTagAppend(bi, b, offset)
+               }
+       }
+
+       assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
+       bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
+       bi.elementIDs = append(bi.elementIDs, b.elementIDs[b.idx:offset]...)
+}
+
+func fastTagAppend(bi, b *blockPointer, offset int) error {
+       if len(bi.tagFamilies) != len(b.tagFamilies) {
+               return fmt.Errorf("unexpected number of tag families: got %d; 
want %d", len(b.tagFamilies), len(bi.tagFamilies))
+       }
+       for i := range bi.tagFamilies {
+               if bi.tagFamilies[i].name != b.tagFamilies[i].name {
+                       return fmt.Errorf("unexpected tag family name: got %q; 
want %q", b.tagFamilies[i].name, bi.tagFamilies[i].name)
+               }
+               if len(bi.tagFamilies[i].tags) != len(b.tagFamilies[i].tags) {
+                       return fmt.Errorf("unexpected number of tags for tag 
family %q: got %d; want %d",
+                               bi.tagFamilies[i].name, 
len(b.tagFamilies[i].tags), len(bi.tagFamilies[i].tags))
+               }
+               for j := range bi.tagFamilies[i].tags {
+                       if bi.tagFamilies[i].tags[j].name != 
b.tagFamilies[i].tags[j].name {
+                               return fmt.Errorf("unexpected tag name for tag 
family %q: got %q; want %q",
+                                       bi.tagFamilies[i].name, 
b.tagFamilies[i].tags[j].name, bi.tagFamilies[i].tags[j].name)
+                       }
+                       assertIdxAndOffset(b.tagFamilies[i].tags[j].name, 
len(b.tagFamilies[i].tags[j].values), b.idx, offset)
+                       bi.tagFamilies[i].tags[j].values = 
append(bi.tagFamilies[i].tags[j].values, 
b.tagFamilies[i].tags[j].values[b.idx:offset]...)
+               }
+       }
+       return nil
+}
+
+func fullTagAppend(bi, b *blockPointer, offset int) {
+       existDataSize := len(bi.timestamps)
+       appendTagFamilies := func(tf tagFamily) {
+               tfv := tagFamily{name: tf.name}
+               for i := range tf.tags {
+                       assertIdxAndOffset(tf.tags[i].name, 
len(tf.tags[i].values), b.idx, offset)
+                       col := tag{name: tf.tags[i].name, valueType: 
tf.tags[i].valueType}
+                       for j := 0; j < existDataSize; j++ {
+                               col.values = append(col.values, nil)
+                       }
+                       col.values = append(col.values, 
tf.tags[i].values[b.idx:offset]...)
+                       tfv.tags = append(tfv.tags, col)
+               }
+               bi.tagFamilies = append(bi.tagFamilies, tfv)
+       }
+       if len(bi.tagFamilies) == 0 {
                for _, tf := range b.tagFamilies {
-                       tFamily := tagFamily{name: tf.name}
+                       appendTagFamilies(tf)
+               }
+               return
+       }
+
+       tagFamilyMap := make(map[string]*tagFamily)
+       for i := range bi.tagFamilies {
+               tagFamilyMap[bi.tagFamilies[i].name] = &bi.tagFamilies[i]
+       }
+
+       for _, tf := range b.tagFamilies {
+               if existingTagFamily, exists := tagFamilyMap[tf.name]; exists {
+                       columnMap := make(map[string]*tag)
+                       for i := range existingTagFamily.tags {
+                               columnMap[existingTagFamily.tags[i].name] = 
&existingTagFamily.tags[i]
+                       }
+
                        for _, c := range tf.tags {
-                               col := tag{name: c.name, valueType: c.valueType}
-                               assertIdxAndOffset(col.name, len(c.values), 
b.idx, offset)
-                               col.values = append(col.values, 
c.values[b.idx:offset]...)
-                               tFamily.tags = append(tFamily.tags, col)
+                               if existingColumn, exists := columnMap[c.name]; 
exists {
+                                       assertIdxAndOffset(c.name, 
len(c.values), b.idx, offset)
+                                       existingColumn.values = 
append(existingColumn.values, c.values[b.idx:offset]...)
+                               } else {
+                                       assertIdxAndOffset(c.name, 
len(c.values), b.idx, offset)
+                                       col := tag{name: c.name, valueType: 
c.valueType}
+                                       for j := 0; j < existDataSize; j++ {
+                                               col.values = append(col.values, 
nil)
+                                       }
+                                       col.values = append(col.values, 
c.values[b.idx:offset]...)
+                                       existingTagFamily.tags = 
append(existingTagFamily.tags, col)
+                               }
                        }
-                       bi.tagFamilies = append(bi.tagFamilies, tFamily)
+               } else {
+                       appendTagFamilies(tf)
                }
-       } else {
-               if len(bi.tagFamilies) != len(b.tagFamilies) {
-                       logger.Panicf("unexpected number of tag families: got 
%d; want %d", len(bi.tagFamilies), len(b.tagFamilies))
-               }
-               for i := range bi.tagFamilies {
-                       if bi.tagFamilies[i].name != b.tagFamilies[i].name {
-                               logger.Panicf("unexpected tag family name: got 
%q; want %q", bi.tagFamilies[i].name, b.tagFamilies[i].name)
+       }
+       for k := range tagFamilyMap {
+               delete(tagFamilyMap, k)
+       }
+       for i := range b.tagFamilies {
+               tagFamilyMap[b.tagFamilies[i].name] = &b.tagFamilies[i]
+       }
+       emptySize := offset - b.idx
+       for _, tf := range bi.tagFamilies {
+               if _, exists := tagFamilyMap[tf.name]; !exists {
+                       for i := range tf.tags {
+                               for j := 0; j < emptySize; j++ {
+                                       tf.tags[i].values = 
append(tf.tags[i].values, nil)
+                               }
                        }
-                       if len(bi.tagFamilies[i].tags) != 
len(b.tagFamilies[i].tags) {
-                               logger.Panicf("unexpected number of tags for 
tag family %q: got %d; want %d", bi.tagFamilies[i].name, 
len(bi.tagFamilies[i].tags), len(b.tagFamilies[i].tags))
+               } else {
+                       existingTagFamily := tagFamilyMap[tf.name]
+                       columnMap := make(map[string]*tag)
+                       for i := range existingTagFamily.tags {
+                               columnMap[existingTagFamily.tags[i].name] = 
&existingTagFamily.tags[i]
                        }
-                       for j := range bi.tagFamilies[i].tags {
-                               if bi.tagFamilies[i].tags[j].name != 
b.tagFamilies[i].tags[j].name {
-                                       logger.Panicf("unexpected tag name for 
tag family %q: got %q; want %q", bi.tagFamilies[i].name, 
bi.tagFamilies[i].tags[j].name, b.tagFamilies[i].tags[j].name)
+                       for i := range tf.tags {
+                               if _, exists := columnMap[tf.tags[i].name]; 
!exists {
+                                       for j := 0; j < emptySize; j++ {
+                                               tf.tags[i].values = 
append(tf.tags[i].values, nil)
+                                       }
                                }
-                               
assertIdxAndOffset(b.tagFamilies[i].tags[j].name, 
len(b.tagFamilies[i].tags[j].values), b.idx, offset)
-                               bi.tagFamilies[i].tags[j].values = 
append(bi.tagFamilies[i].tags[j].values, 
b.tagFamilies[i].tags[j].values[b.idx:offset]...)
                        }
                }
        }
-
-       assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
-       bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
-       bi.elementIDs = append(bi.elementIDs, b.elementIDs[b.idx:offset]...)
 }
 
 func assertIdxAndOffset(name string, length int, idx int, offset int) {
diff --git a/banyand/stream/block_test.go b/banyand/stream/block_test.go
index ed57f48d..3632d638 100644
--- a/banyand/stream/block_test.go
+++ b/banyand/stream/block_test.go
@@ -609,6 +609,220 @@ func Test_blockPointer_append(t *testing.T) {
                                idx: 0,
                        },
                },
+               {
+                       name: "Test append with missing tag family",
+                       fields: fields{
+                               timestamps: []int64{1, 2},
+                               elementIDs: []uint64{0, 1},
+                               tagFamilies: []tagFamily{
+                                       {
+                                               name: "arrTag",
+                                               tags: []tag{
+                                                       {
+                                                               name: 
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                               values: 
[][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), 
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+                                                       },
+                                               },
+                                       },
+                               },
+                       },
+                       args: args{
+                               b: &blockPointer{
+                                       block: block{
+                                               timestamps:  []int64{4, 5},
+                                               elementIDs:  []uint64{2, 3},
+                                               tagFamilies: []tagFamily{},
+                                       },
+                                       idx: 0,
+                               },
+                               offset: 2,
+                       },
+                       want: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{1, 2, 4, 5},
+                                       elementIDs: []uint64{0, 1, 2, 3},
+                                       tagFamilies: []tagFamily{
+                                               {
+                                                       name: "arrTag",
+                                                       tags: []tag{
+                                                               {
+                                                                       name: 
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                                       values: 
[][]byte{
+                                                                               
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+                                                                               
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+                                                                               
nil, nil,
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                               },
+                               idx: 0,
+                       },
+               },
+               {
+                       name: "Test append with additional tag family",
+                       fields: fields{
+                               timestamps:  []int64{1, 2},
+                               elementIDs:  []uint64{0, 1},
+                               tagFamilies: []tagFamily{},
+                       },
+                       args: args{
+                               b: &blockPointer{
+                                       block: block{
+                                               timestamps: []int64{4, 5},
+                                               elementIDs: []uint64{2, 3},
+                                               tagFamilies: []tagFamily{
+                                                       {
+                                                               name: "arrTag",
+                                                               tags: []tag{
+                                                                       {
+                                                                               
name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                                               
values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), 
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       idx: 0,
+                               },
+                               offset: 2,
+                       },
+                       want: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{1, 2, 4, 5},
+                                       elementIDs: []uint64{0, 1, 2, 3},
+                                       tagFamilies: []tagFamily{
+                                               {
+                                                       name: "arrTag",
+                                                       tags: []tag{
+                                                               {
+                                                                       name: 
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                                       values: 
[][]byte{
+                                                                               
nil, nil,
+                                                                               
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+                                                                               
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                               },
+                               idx: 0,
+                       },
+               },
+               {
+                       name: "Test append with missing tag",
+                       fields: fields{
+                               timestamps: []int64{1, 2},
+                               elementIDs: []uint64{0, 1},
+                               tagFamilies: []tagFamily{
+                                       {
+                                               name: "arrTag",
+                                               tags: []tag{
+                                                       {
+                                                               name: 
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                               values: 
[][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), 
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+                                                       },
+                                               },
+                                       },
+                               },
+                       },
+                       args: args{
+                               b: &blockPointer{
+                                       block: block{
+                                               timestamps: []int64{4, 5},
+                                               elementIDs: []uint64{2, 3},
+                                               tagFamilies: []tagFamily{
+                                                       {
+                                                               name: "arrTag",
+                                                               tags: []tag{},
+                                                       },
+                                               },
+                                       },
+                                       idx: 0,
+                               },
+                               offset: 2,
+                       },
+                       want: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{1, 2, 4, 5},
+                                       elementIDs: []uint64{0, 1, 2, 3},
+                                       tagFamilies: []tagFamily{
+                                               {
+                                                       name: "arrTag",
+                                                       tags: []tag{
+                                                               {
+                                                                       name: 
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                                       values: 
[][]byte{
+                                                                               
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+                                                                               
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+                                                                               
nil, nil,
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                               },
+                               idx: 0,
+                       },
+               },
+               {
+                       name: "Test append with additional tag",
+                       fields: fields{
+                               timestamps: []int64{1, 2},
+                               elementIDs: []uint64{0, 1},
+                               tagFamilies: []tagFamily{
+                                       {
+                                               name: "arrTag",
+                                               tags: []tag{},
+                                       },
+                               },
+                       },
+                       args: args{
+                               b: &blockPointer{
+                                       block: block{
+                                               timestamps: []int64{4, 5},
+                                               elementIDs: []uint64{2, 3},
+                                               tagFamilies: []tagFamily{
+                                                       {
+                                                               name: "arrTag",
+                                                               tags: []tag{
+                                                                       {
+                                                                               
name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                                               
values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), 
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})},
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       idx: 0,
+                               },
+                               offset: 2,
+                       },
+                       want: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{1, 2, 4, 5},
+                                       elementIDs: []uint64{0, 1, 2, 3},
+                                       tagFamilies: []tagFamily{
+                                               {
+                                                       name: "arrTag",
+                                                       tags: []tag{
+                                                               {
+                                                                       name: 
"strArrTag", valueType: pbv1.ValueTypeStrArr,
+                                                                       values: 
[][]byte{
+                                                                               
nil, nil,
+                                                                               
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+                                                                               
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                               },
+                               idx: 0,
+                       },
+               },
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index ee335c3c..d8e14b28 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -42,6 +42,8 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/model"
 )
 
+const checkDoneEvery = 128
+
 func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr 
model.StreamQueryResult, err error) {
        if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
                return nil, errors.New("invalid query options: timeRange and 
series are required")
@@ -198,12 +200,16 @@ func (qr *queryResult) scanParts(ctx context.Context, qo 
queryOptions) error {
        if ti.Error() != nil {
                return fmt.Errorf("cannot init tstIter: %w", ti.Error())
        }
+       var hit int
        for ti.nextBlock() {
-               select {
-               case <-ctx.Done():
-                       return errors.WithMessagef(ctx.Err(), "interrupt: 
scanned %d blocks, remained %d/%d parts to scan", len(qr.data), len(ti.piHeap), 
len(ti.piPool))
-               default:
+               if hit%checkDoneEvery == 0 {
+                       select {
+                       case <-ctx.Done():
+                               return errors.WithMessagef(ctx.Err(), 
"interrupt: scanned %d blocks, remained %d/%d parts to scan", len(qr.data), 
len(ti.piHeap), len(ti.piPool))
+                       default:
+                       }
                }
+               hit++
                bc := generateBlockCursor()
                p := ti.piHeap[0]
                bc.init(p.p, p.curBlock, qo)
@@ -229,6 +235,12 @@ func (qr *queryResult) load(ctx context.Context, qo 
queryOptions) *model.StreamR
                cursorChan := make(chan int, len(qr.data))
                for i := 0; i < len(qr.data); i++ {
                        go func(i int) {
+                               select {
+                               case <-ctx.Done():
+                                       cursorChan <- i
+                                       return
+                               default:
+                               }
                                tmpBlock := generateBlock()
                                defer releaseBlock(tmpBlock)
                                if !qr.data[i].loadData(tmpBlock) {
@@ -284,16 +296,17 @@ func (qr *queryResult) load(ctx context.Context, qo 
queryOptions) *model.StreamR
 
                blankCursorList := []int{}
                for completed := 0; completed < len(qr.data); completed++ {
-                       select {
-                       case <-ctx.Done():
-                               return &model.StreamResult{
-                                       Error: errors.WithMessagef(ctx.Err(), 
"interrupt: loaded %d/%d cursors", completed, len(qr.data)),
-                               }
-                       case result := <-cursorChan:
-                               if result != -1 {
-                                       blankCursorList = 
append(blankCursorList, result)
-                               }
+                       result := <-cursorChan
+                       if result != -1 {
+                               blankCursorList = append(blankCursorList, 
result)
+                       }
+               }
+               select {
+               case <-ctx.Done():
+                       return &model.StreamResult{
+                               Error: errors.WithMessagef(ctx.Err(), 
"interrupt: blank/total=%d/%d", len(blankCursorList), len(qr.data)),
                        }
+               default:
                }
                sort.Slice(blankCursorList, func(i, j int) bool {
                        return blankCursorList[i] > blankCursorList[j]
diff --git a/banyand/stream/write.go b/banyand/stream/write.go
index 1349eeca..610a6781 100644
--- a/banyand/stream/write.go
+++ b/banyand/stream/write.go
@@ -219,7 +219,7 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
        return dst, nil
 }
 
-func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp 
bus.Message) {
+func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp 
bus.Message) {
        events, ok := message.Data().([]any)
        if !ok {
                w.l.Warn().Msg("invalid event data type")
@@ -232,12 +232,6 @@ func (w *writeCallback) Rev(ctx context.Context, message 
bus.Message) (resp bus.
        groups := make(map[string]*elementsInGroup)
        var builder strings.Builder
        for i := range events {
-               select {
-               case <-ctx.Done():
-                       w.l.Warn().Msgf("context is done, handled %d events", i)
-                       break
-               default:
-               }
                var writeEvent *streamv1.InternalWriteRequest
                switch e := events[i].(type) {
                case *streamv1.InternalWriteRequest:
diff --git a/docs/concept/tsdb.md b/docs/concept/tsdb.md
index 2aa9cbd5..02477323 100644
--- a/docs/concept/tsdb.md
+++ b/docs/concept/tsdb.md
@@ -1,10 +1,10 @@
-# TimeSeries Database(TSDB)
+# TimeSeries Database(TSDB) v1.1.0
 
 TSDB is a time-series storage engine designed to store and query large volumes 
of time-series data. One of the key features of TSDB is its ability to 
automatically manage data storage over time, optimize performance and ensure 
that the system can scale to handle large workloads. TSDB empowers `Measure` 
and `Stream` relevant data.
 
 In TSDB, the data in a group is partitioned base on the time range of the 
data. The segment size is determined by the `segment_interval` of a group. The 
number of segments in a group is determined by the `ttl` of a group. A new 
segment is created when the written data exceeds the time range of the current 
segment. The expired segment will be deleted after the `ttl` of the group.
 
-![tsdb](https://skywalking.apache.org/doc-graph/banyandb/v0.7.0/tsdb.png)
+![tsdb](https://skywalking.apache.org/doc-graph/banyandb/v0.7.0/tsdb-hierarchy.png)
 
 ## Segment
 
@@ -16,10 +16,29 @@ In each segment, the data is spread into shards based on 
`entity`. The series in
 
 Each shard is assigned to a specific set of storage nodes, and those nodes 
store and process the data within that shard. This allows BanyanDB to scale 
horizontally by adding more storage nodes to the cluster as needed.
 
-Each shard is composed of multiple [parts](#Part). Whenever SkyWalking sends a 
batch of data, BanyanDB writes this batch of data into a new part. For data of 
the `Stream` type, the inverted indexes generated based on the indexing rules 
are also stored in the segment. Since BanyanDB adopts a snapshot approach for 
data read and write operations, the segment also needs to maintain additional 
snapshot information to record the validity of the parts.
+Each shard is composed of multiple [parts](#Part). Whenever SkyWalking sends a 
batch of data, BanyanDB writes this batch of data into a new part. For data of 
the `Stream` type, the inverted indexes generated based on the indexing rules 
are also stored in the segment.
+
+Since BanyanDB adopts a snapshot approach for data read and write operations, 
the segment also needs to maintain additional snapshot information to record 
the validity of the parts. The shard contains `xxxxxxx.snp` to record the 
validity of parts. In the chart, `0000000000000001` is removed from the 
snapshot file, which means the part is invalid. It will be cleaned up in the 
next flush or merge operation.
 
 ![shard](https://skywalking.apache.org/doc-graph/banyandb/v0.7.0/shard.png)
 
+## Inverted Index
+
+The inverted index is used to locate the data in the shard. For `measure`, it 
is a mapping from the term to the series id. For `stream`, it is a mapping from 
the term to the timestamp.
+
+The inverted index stores `snapshot` file `xxxxxxx.snp` to record the validity 
of segments. In the chart, `0000000000000001.seg` is removed from the snapshot 
file, which means the segment is invalid. It will be cleaned up in the next 
flush or merge operation.
+
+The segment file `xxxxxxxx.seg` contains the inverted index data. It includes 
four parts:
+
+- **Tags**: The mapping from the tag name to the dictionary location.
+- **Dictionary**: It's a FST(Finite State Transducer) dictionary to map tag 
value to the posting list.
+- **Posting List**: The mapping from the tag value to the series id or 
timestamp. It also contains a location info to the stored tag value.
+- **Stored Tag Value**: The stored tag value. If you set tag spec 
`indexed_only=true`, the tag value will not be stored here.
+
+![inverted-index](https://skywalking.apache.org/doc-graph/banyandb/v0.7.0/inverted-index.png)
+
+If you want to search `Tag1=Value1`, the index will first search the `Tags` 
part to find the dictionary location of `Tag1`. Then, it will search the 
`Dictionary` part to find the posting list location of `Value1`. Finally, it 
will search the `Posting List` part to find the series id or timestamp. If you 
want to fetch the tag value, it will search the `Stored Tag Value` part to find 
the tag value.
+
 ## Part
 
 Within a part, data is split into multiple files in a columnar manner. The 
timestamps are stored in the `timestamps.bin` file, tags are organized in 
persistent tag families as various files with the `.tf` suffix, and fields are 
stored separately in the `fields.bin` file. 
diff --git a/docs/menu.yml b/docs/menu.yml
index bc31ebd2..19ec1733 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -126,7 +126,7 @@ catalog:
   - name: "File Format"
     catalog:
       - name: "v1.1.0"
-        path: ""
+        path: "/concept/tsdb"
   - name: "Concepts"
     catalog:
       - name: "Clustering"
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 4987ddb5..5d35d6aa 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -380,7 +380,7 @@ func (bmi *blugeMatchIterator) Next() bool {
                bmi.err = io.EOF
                return false
        }
-       bmi.hit++
+       bmi.hit = match.HitNumber
        for i := range bmi.current.Values {
                bmi.current.Values[i] = nil
        }
diff --git a/pkg/index/inverted/inverted_series.go 
b/pkg/index/inverted/inverted_series.go
index f9b700f4..7288dd31 100644
--- a/pkg/index/inverted/inverted_series.go
+++ b/pkg/index/inverted/inverted_series.go
@@ -113,12 +113,17 @@ func (s *store) Search(ctx context.Context,
 func parseResult(dmi search.DocumentMatchIterator, loadedFields 
[]index.FieldKey) ([]index.SeriesDocument, error) {
        result := make([]index.SeriesDocument, 0, 10)
        next, err := dmi.Next()
+       if err != nil {
+               return nil, errors.WithMessage(err, "iterate document match 
iterator")
+       }
        docIDMap := make(map[uint64]struct{})
        fields := make([]string, 0, len(loadedFields))
        for i := range loadedFields {
                fields = append(fields, loadedFields[i].Marshal())
        }
+       var hitNumber int
        for err == nil && next != nil {
+               hitNumber = next.HitNumber
                var doc index.SeriesDocument
                if len(loadedFields) > 0 {
                        doc.Fields = make(map[string][]byte)
@@ -144,7 +149,7 @@ func parseResult(dmi search.DocumentMatchIterator, 
loadedFields []index.FieldKey
                        return true
                })
                if err != nil {
-                       return nil, errors.WithMessagef(err, "visit stored 
fields, hit: %d", len(result))
+                       return nil, errors.WithMessagef(err, "visit stored 
fields, hit: %d", hitNumber)
                }
                if doc.Key.ID > 0 {
                        result = append(result, doc)
@@ -152,7 +157,7 @@ func parseResult(dmi search.DocumentMatchIterator, 
loadedFields []index.FieldKey
                next, err = dmi.Next()
        }
        if err != nil {
-               return nil, errors.WithMessagef(err, "iterate document match 
iterator, hit: %d", len(result))
+               return nil, errors.WithMessagef(err, "iterate document match 
iterator, hit: %d", hitNumber)
        }
        return result, nil
 }
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go 
b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index 49723340..f7db768a 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -73,6 +73,11 @@ func (i *localIndexScan) Sort(order *logical.OrderBy) {
 }
 
 func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, 
error) {
+       select {
+       case <-ctx.Done():
+               return nil, ctx.Err()
+       default:
+       }
        if i.result != nil {
                return BuildElementsFromStreamResult(ctx, i.result), nil
        }
diff --git a/test/docker/Dockerfile b/test/docker/Dockerfile
index a8b068c4..4ce236a5 100644
--- a/test/docker/Dockerfile
+++ b/test/docker/Dockerfile
@@ -26,6 +26,8 @@ COPY 
banyand/build/bin/linux/${TARGETARCH}/banyand-server-static /banyand
 COPY bydbctl/build/bin/linux/${TARGETARCH}/bydbctl-cli-static /bydbctl
 COPY --from=certs /etc/ssl/certs /etc/ssl/certs
 
+ENV GRPC_GO_LOG_SEVERITY_LEVEL=INFO
+ENV GRPC_GO_LOG_FORMATTER=json
 
 EXPOSE 17912
 EXPOSE 17913

Reply via email to