This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new c802d25d Add version to timestamp file (#468)
c802d25d is described below
commit c802d25d16b6a111655ea159c849b331a6778717
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Jun 12 22:38:53 2024 +0800
Add version to timestamp file (#468)
---
CHANGES.md | 1 +
api/proto/banyandb/measure/v1/query.proto | 2 +
api/proto/banyandb/measure/v1/write.proto | 2 +
banyand/liaison/grpc/measure.go | 6 +
banyand/measure/block.go | 78 ++-
banyand/measure/block_metadata.go | 29 +-
banyand/measure/block_metadata_test.go | 92 ++-
banyand/measure/block_test.go | 88 +--
banyand/measure/block_writer.go | 4 +-
banyand/measure/datapoints.go | 2 +
banyand/measure/measure.go | 1 -
banyand/measure/merger.go | 2 +-
banyand/measure/merger_test.go | 11 +-
banyand/measure/part.go | 4 +-
banyand/measure/part_iter.go | 2 -
banyand/measure/part_test.go | 3 +
banyand/measure/query.go | 10 +-
banyand/measure/query_test.go | 744 ++++++++++++++++++++-
banyand/measure/tstable_test.go | 64 +-
banyand/measure/write.go | 1 +
banyand/stream/block.go | 2 +-
docs/api-reference.md | 2 +
pkg/encoding/encoding.go | 37 +-
pkg/encoding/int_list.go | 5 +-
pkg/pb/v1/metadata.go | 1 +
.../measure/measure_plan_indexscan_local.go | 1 +
test/cases/measure/data/data.go | 6 +
...service_instance_endpoint_cpm_minute_data1.json | 66 +-
28 files changed, 1098 insertions(+), 168 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 731c2316..1d0b47a2 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,7 @@ Release Notes.
### Bugs
- Fix the filtering of stream in descending order by timestamp.
+- Fix querying old data points when the data is in a newer part. A version
column is introduced to each data point and stored in the timestamp file.
## 0.6.1
diff --git a/api/proto/banyandb/measure/v1/query.proto
b/api/proto/banyandb/measure/v1/query.proto
index 6cb04417..49ef6a62 100644
--- a/api/proto/banyandb/measure/v1/query.proto
+++ b/api/proto/banyandb/measure/v1/query.proto
@@ -40,6 +40,8 @@ message DataPoint {
}
// fields contains fields selected in the projection
repeated Field fields = 3;
+ // version is the version of the data point
+ int64 version = 4;
}
// QueryResponse is the response for a query to the Query module.
diff --git a/api/proto/banyandb/measure/v1/write.proto
b/api/proto/banyandb/measure/v1/write.proto
index 3def9493..ea43dc59 100644
--- a/api/proto/banyandb/measure/v1/write.proto
+++ b/api/proto/banyandb/measure/v1/write.proto
@@ -36,6 +36,8 @@ message DataPointValue {
repeated model.v1.TagFamilyForWrite tag_families = 2
[(validate.rules).repeated.min_items = 1];
// the order of fields match the measure schema
repeated model.v1.FieldValue fields = 3;
+ // the version of the data point
+ int64 version = 4;
}
// WriteRequest is the request contract for write
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 13a8399b..6b480738 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -108,6 +108,12 @@ func (ms *measureService) Write(measure
measurev1.MeasureService_WriteServer) er
reply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure,
ms.sampled)
continue
}
+ if writeRequest.DataPoint.Version == 0 {
+ if writeRequest.MessageId == 0 {
+ writeRequest.MessageId =
uint64(time.Now().UnixNano())
+ }
+ writeRequest.DataPoint.Version =
int64(writeRequest.MessageId)
+ }
if ms.ingestionAccessLog != nil {
if errAccessLog :=
ms.ingestionAccessLog.Write(writeRequest); errAccessLog != nil {
ms.sampled.Error().Err(errAccessLog).RawJSON("written",
logger.Proto(writeRequest)).Msg("failed to write access log")
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index b4792d85..ec536df6 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -34,6 +34,8 @@ import (
type block struct {
timestamps []int64
+ versions []int64
+
tagFamilies []columnFamily
field columnFamily
@@ -41,6 +43,7 @@ type block struct {
func (b *block) reset() {
b.timestamps = b.timestamps[:0]
+ b.versions = b.versions[:0]
tff := b.tagFamilies
for i := range tff {
@@ -50,7 +53,7 @@ func (b *block) reset() {
b.field.reset()
}
-func (b *block) mustInitFromDataPoints(timestamps []int64, tagFamilies
[][]nameValues, fields []nameValues) {
+func (b *block) mustInitFromDataPoints(timestamps []int64, versions []int64,
tagFamilies [][]nameValues, fields []nameValues) {
b.reset()
size := len(timestamps)
if size == 0 {
@@ -65,13 +68,14 @@ func (b *block) mustInitFromDataPoints(timestamps []int64,
tagFamilies [][]nameV
assertTimestampsSorted(timestamps)
b.timestamps = append(b.timestamps, timestamps...)
+ b.versions = append(b.versions, versions...)
b.mustInitFromTagsAndFields(tagFamilies, fields)
}
func assertTimestampsSorted(timestamps []int64) {
for i := range timestamps {
if i > 0 && timestamps[i-1] > timestamps[i] {
- logger.Panicf("log entries must be sorted by timestamp;
got the previous entry with bigger timestamp %d than the current entry with
timestamp %d",
+ logger.Panicf("data points must be sorted by timestamp;
got the previous data point with bigger timestamp %d than the current data
point with timestamp %d",
timestamps[i-1], timestamps[i])
}
}
@@ -136,7 +140,7 @@ func (b *block) mustWriteTo(sid common.SeriesID, bm
*blockMetadata, ww *writers)
bm.uncompressedSizeBytes = b.uncompressedSizeBytes()
bm.count = uint64(b.Len())
- mustWriteTimestampsTo(&bm.timestamps, b.timestamps,
&ww.timestampsWriter)
+ mustWriteTimestampsTo(&bm.timestamps, b.timestamps, b.versions,
&ww.timestampsWriter)
for ti := range b.tagFamilies {
b.marshalTagFamily(b.tagFamilies[ti], bm, ww)
@@ -284,7 +288,7 @@ func (b *block) uncompressedSizeBytes() uint64 {
func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm
blockMetadata) {
b.reset()
- b.timestamps = mustReadTimestampsFrom(b.timestamps, &bm.timestamps,
int(bm.count), p.timestamps)
+ b.timestamps, b.versions = mustReadTimestampsFrom(b.timestamps,
b.versions, &bm.timestamps, int(bm.count), p.timestamps)
cc := b.field.resizeColumns(len(bm.field.columnMetadata))
for i := range cc {
@@ -307,7 +311,7 @@ func (b *block) mustReadFrom(decoder
*encoding.BytesBlockDecoder, p *part, bm bl
func (b *block) mustSeqReadFrom(decoder *encoding.BytesBlockDecoder,
seqReaders *seqReaders, bm blockMetadata) {
b.reset()
- b.timestamps = mustSeqReadTimestampsFrom(b.timestamps, &bm.timestamps,
int(bm.count), &seqReaders.timestamps)
+ b.timestamps, b.versions = mustSeqReadTimestampsFrom(b.timestamps,
b.versions, &bm.timestamps, int(bm.count), &seqReaders.timestamps)
cc := b.field.resizeColumns(len(bm.field.columnMetadata))
for i := range cc {
@@ -333,35 +337,35 @@ func (b *block) sortTagFamilies() {
})
}
-func mustWriteTimestampsTo(tm *timestampsMetadata, timestamps []int64,
timestampsWriter *writer) {
+func mustWriteTimestampsTo(tm *timestampsMetadata, timestamps, versions
[]int64, timestampsWriter *writer) {
tm.reset()
bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
bb.Buf, tm.encodeType, tm.min = encoding.Int64ListToBytes(bb.Buf[:0],
timestamps)
- if len(bb.Buf) > maxTimestampsBlockSize {
- logger.Panicf("too big block with timestamps: %d bytes; the
maximum supported size is %d bytes", len(bb.Buf), maxTimestampsBlockSize)
+ tm.encodeType = encoding.GetVersionType(tm.encodeType)
+ if tm.encodeType == encoding.EncodeTypeUnknown {
+ logger.Panicf("unexpected encodeType %d", tm.encodeType)
+ return
}
tm.max = timestamps[len(timestamps)-1]
tm.offset = timestampsWriter.bytesWritten
- tm.size = uint64(len(bb.Buf))
+ tm.versionOffset = uint64(len(bb.Buf))
+ timestampsWriter.MustWrite(bb.Buf)
+ bb.Buf, tm.versionEncodeType, tm.versionFirst =
encoding.Int64ListToBytes(bb.Buf[:0], versions)
+ tm.size = tm.versionOffset + uint64(len(bb.Buf))
timestampsWriter.MustWrite(bb.Buf)
}
-func mustReadTimestampsFrom(dst []int64, tm *timestampsMetadata, count int,
reader fs.Reader) []int64 {
+func mustReadTimestampsFrom(timestamps, versions []int64, tm
*timestampsMetadata, count int, reader fs.Reader) ([]int64, []int64) {
bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size))
fs.MustReadData(reader, int64(tm.offset), bb.Buf)
- var err error
- dst, err = encoding.BytesToInt64List(dst, bb.Buf, tm.encodeType,
tm.min, count)
- if err != nil {
- logger.Panicf("%s: cannot unmarshal timestamps: %v",
reader.Path(), err)
- }
- return dst
+ return mustDecodeTimestampsWithVersions(timestamps, versions, tm,
count, reader.Path(), bb.Buf)
}
-func mustSeqReadTimestampsFrom(dst []int64, tm *timestampsMetadata, count int,
reader *seqReader) []int64 {
+func mustSeqReadTimestampsFrom(timestamps, versions []int64, tm
*timestampsMetadata, count int, reader *seqReader) ([]int64, []int64) {
if tm.offset != reader.bytesRead {
logger.Panicf("offset %d must be equal to bytesRead %d",
tm.offset, reader.bytesRead)
}
@@ -369,12 +373,31 @@ func mustSeqReadTimestampsFrom(dst []int64, tm
*timestampsMetadata, count int, r
defer bigValuePool.Release(bb)
bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size))
reader.mustReadFull(bb.Buf)
+ return mustDecodeTimestampsWithVersions(timestamps, versions, tm,
count, reader.Path(), bb.Buf)
+}
+
+func mustDecodeTimestampsWithVersions(timestamps, versions []int64, tm
*timestampsMetadata, count int, path string, src []byte) ([]int64, []int64) {
var err error
- dst, err = encoding.BytesToInt64List(dst, bb.Buf, tm.encodeType,
tm.min, count)
+ if t := encoding.GetCommonType(tm.encodeType); t !=
encoding.EncodeTypeUnknown {
+ if tm.size < tm.versionOffset {
+ logger.Panicf("size %d must be greater than
versionOffset %d", tm.size, tm.versionOffset)
+ }
+ timestamps, err = encoding.BytesToInt64List(timestamps,
src[:tm.versionOffset], t, tm.min, count)
+ if err != nil {
+ logger.Panicf("%s: cannot unmarshal timestamps with
versions: %v", path, err)
+ }
+ versions, err = encoding.BytesToInt64List(versions,
src[tm.versionOffset:], tm.versionEncodeType, tm.versionFirst, count)
+ if err != nil {
+ logger.Panicf("%s: cannot unmarshal versions: %v",
path, err)
+ }
+ return timestamps, versions
+ }
+ timestamps, err = encoding.BytesToInt64List(timestamps, src,
tm.encodeType, tm.min, count)
if err != nil {
- logger.Panicf("%s: cannot unmarshal timestamps: %v",
reader.Path(), err)
+ logger.Panicf("%s: cannot unmarshal timestamps: %v", path, err)
}
- return dst
+ versions = encoding.ExtendInt64ListCapacity(versions, count)
+ return timestamps, versions
}
func generateBlock() *block {
@@ -396,6 +419,7 @@ type blockCursor struct {
p *part
fields columnFamily
timestamps []int64
+ versions []int64
tagFamilies []columnFamily
columnValuesDecoder encoding.BytesBlockDecoder
tagProjection []pbv1.TagProjection
@@ -416,6 +440,7 @@ func (bc *blockCursor) reset() {
bc.fieldProjection = bc.fieldProjection[:0]
bc.timestamps = bc.timestamps[:0]
+ bc.versions = bc.versions[:0]
tff := bc.tagFamilies
for i := range tff {
@@ -452,6 +477,7 @@ func (bc *blockCursor) copyAllTo(r *pbv1.MeasureResult,
entityValuesAll map[comm
size := offset - idx
r.SID = bc.bm.seriesID
r.Timestamps = append(r.Timestamps, bc.timestamps[idx:offset]...)
+ r.Versions = append(r.Versions, bc.versions[idx:offset]...)
var entityValues map[string]*modelv1.TagValue
if entityValuesAll != nil {
entityValues = entityValuesAll[r.SID]
@@ -532,6 +558,7 @@ func (bc *blockCursor) copyTo(r *pbv1.MeasureResult,
entityValuesAll map[common.
) {
r.SID = bc.bm.seriesID
r.Timestamps = append(r.Timestamps, bc.timestamps[bc.idx])
+ r.Versions = append(r.Versions, bc.versions[bc.idx])
var entityValues map[string]*modelv1.TagValue
if entityValuesAll != nil {
entityValues = entityValuesAll[r.SID]
@@ -629,6 +656,7 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
return false
}
bc.timestamps = append(bc.timestamps,
tmpBlock.timestamps[start:end+1]...)
+ bc.versions = append(bc.versions, tmpBlock.versions[start:end+1]...)
for _, cf := range tmpBlock.tagFamilies {
tf := columnFamily{
@@ -687,9 +715,8 @@ func releaseBlockCursor(bc *blockCursor) {
type blockPointer struct {
block
- bm blockMetadata
- idx int
- lastPartID uint64
+ bm blockMetadata
+ idx int
}
func (bi *blockPointer) updateMetadata() {
@@ -770,8 +797,8 @@ func (bi *blockPointer) append(b *blockPointer, offset int)
{
assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
-
- bi.lastPartID = b.lastPartID
+ assertIdxAndOffset("versions", len(b.versions), bi.idx, offset)
+ bi.versions = append(bi.versions, b.versions[b.idx:offset]...)
}
func assertIdxAndOffset(name string, length int, idx int, offset int) {
@@ -789,7 +816,6 @@ func (bi *blockPointer) isFull() bool {
func (bi *blockPointer) reset() {
bi.idx = 0
- bi.lastPartID = 0
bi.block.reset()
bi.bm = blockMetadata{}
}
diff --git a/banyand/measure/block_metadata.go
b/banyand/measure/block_metadata.go
index 6a066329..b58a0e4d 100644
--- a/banyand/measure/block_metadata.go
+++ b/banyand/measure/block_metadata.go
@@ -236,9 +236,12 @@ func releaseBlockMetadataArray(bma *blockMetadataArray) {
type timestampsMetadata struct {
dataBlock
- min int64
- max int64
- encodeType encoding.EncodeType
+ min int64
+ max int64
+ versionOffset uint64
+ versionFirst int64
+ encodeType encoding.EncodeType
+ versionEncodeType encoding.EncodeType
}
func (tm *timestampsMetadata) reset() {
@@ -246,6 +249,9 @@ func (tm *timestampsMetadata) reset() {
tm.min = 0
tm.max = 0
tm.encodeType = 0
+ tm.versionOffset = 0
+ tm.versionFirst = 0
+ tm.versionEncodeType = 0
}
func (tm *timestampsMetadata) copyFrom(src *timestampsMetadata) {
@@ -253,6 +259,9 @@ func (tm *timestampsMetadata) copyFrom(src
*timestampsMetadata) {
tm.min = src.min
tm.max = src.max
tm.encodeType = src.encodeType
+ tm.versionOffset = src.versionOffset
+ tm.versionFirst = src.versionFirst
+ tm.versionEncodeType = src.versionEncodeType
}
func (tm *timestampsMetadata) marshal(dst []byte) []byte {
@@ -260,19 +269,31 @@ func (tm *timestampsMetadata) marshal(dst []byte) []byte {
dst = encoding.Uint64ToBytes(dst, uint64(tm.min))
dst = encoding.Uint64ToBytes(dst, uint64(tm.max))
dst = append(dst, byte(tm.encodeType))
+ dst = encoding.VarUint64ToBytes(dst, tm.versionOffset)
+ dst = encoding.Uint64ToBytes(dst, uint64(tm.versionFirst))
+ dst = append(dst, byte(tm.versionEncodeType))
return dst
}
func (tm *timestampsMetadata) unmarshal(src []byte) ([]byte, error) {
src, err := tm.dataBlock.unmarshal(src)
if err != nil {
- return nil, fmt.Errorf("cannot unmarshal dataBlock: %w", err)
+ return nil, fmt.Errorf("cannot unmarshal ts dataBlock: %w", err)
}
tm.min = int64(encoding.BytesToUint64(src))
src = src[8:]
tm.max = int64(encoding.BytesToUint64(src))
src = src[8:]
tm.encodeType = encoding.EncodeType(src[0])
+ src = src[1:]
+ src, n, err := encoding.BytesToVarUint64(src)
+ if err != nil {
+ return nil, fmt.Errorf("cannot unmarshal ts offset: %w", err)
+ }
+ tm.versionOffset = n
+ tm.versionFirst = int64(encoding.BytesToUint64(src))
+ src = src[8:]
+ tm.versionEncodeType = encoding.EncodeType(src[0])
return src[1:], nil
}
diff --git a/banyand/measure/block_metadata_test.go
b/banyand/measure/block_metadata_test.go
index 59709b73..6bac5d09 100644
--- a/banyand/measure/block_metadata_test.go
+++ b/banyand/measure/block_metadata_test.go
@@ -80,9 +80,12 @@ func Test_timestampsMetadata_reset(t *testing.T) {
offset: 1,
size: 1,
},
- min: 1,
- max: 1,
- encodeType: encoding.EncodeTypeConst,
+ min: 1,
+ max: 1,
+ encodeType: encoding.EncodeTypeConst,
+ versionOffset: 1,
+ versionFirst: 1,
+ versionEncodeType: encoding.EncodeTypeDelta,
}
tm.reset()
@@ -92,6 +95,9 @@ func Test_timestampsMetadata_reset(t *testing.T) {
assert.Equal(t, int64(0), tm.min)
assert.Equal(t, int64(0), tm.max)
assert.Equal(t, encoding.EncodeTypeUnknown, tm.encodeType)
+ assert.Equal(t, uint64(0), tm.versionOffset)
+ assert.Equal(t, int64(0), tm.versionFirst)
+ assert.Equal(t, encoding.EncodeTypeUnknown, tm.versionEncodeType)
}
func Test_timestampsMetadata_copyFrom(t *testing.T) {
@@ -100,9 +106,12 @@ func Test_timestampsMetadata_copyFrom(t *testing.T) {
offset: 1,
size: 1,
},
- min: 1,
- max: 1,
- encodeType: encoding.EncodeTypeConst,
+ min: 1,
+ max: 1,
+ encodeType: encoding.EncodeTypeConst,
+ versionOffset: 1,
+ versionFirst: 1,
+ versionEncodeType: encoding.EncodeTypeDelta,
}
dest := ×tampsMetadata{
@@ -110,9 +119,12 @@ func Test_timestampsMetadata_copyFrom(t *testing.T) {
offset: 2,
size: 2,
},
- min: 2,
- max: 2,
- encodeType: encoding.EncodeTypeDelta,
+ min: 2,
+ max: 2,
+ encodeType: encoding.EncodeTypeDelta,
+ versionOffset: 2,
+ versionFirst: 2,
+ versionEncodeType: encoding.EncodeTypeDeltaOfDelta,
}
dest.copyFrom(src)
@@ -122,6 +134,9 @@ func Test_timestampsMetadata_copyFrom(t *testing.T) {
assert.Equal(t, src.min, dest.min)
assert.Equal(t, src.max, dest.max)
assert.Equal(t, src.encodeType, dest.encodeType)
+ assert.Equal(t, src.versionOffset, dest.versionOffset)
+ assert.Equal(t, src.versionFirst, dest.versionFirst)
+ assert.Equal(t, src.versionEncodeType, dest.versionEncodeType)
}
func Test_timestampsMetadata_marshal_unmarshal(t *testing.T) {
@@ -130,9 +145,12 @@ func Test_timestampsMetadata_marshal_unmarshal(t
*testing.T) {
offset: 1,
size: 1,
},
- min: 1,
- max: 1,
- encodeType: encoding.EncodeTypeConst,
+ min: 1,
+ max: 1,
+ encodeType: encoding.EncodeTypeConst,
+ versionOffset: 1,
+ versionFirst: 1,
+ versionEncodeType: encoding.EncodeTypeDelta,
}
marshaled := original.marshal(nil)
@@ -147,6 +165,9 @@ func Test_timestampsMetadata_marshal_unmarshal(t
*testing.T) {
assert.Equal(t, original.min, unmarshaled.min)
assert.Equal(t, original.max, unmarshaled.max)
assert.Equal(t, original.encodeType, unmarshaled.encodeType)
+ assert.Equal(t, original.versionOffset, unmarshaled.versionOffset)
+ assert.Equal(t, original.versionFirst, unmarshaled.versionFirst)
+ assert.Equal(t, original.versionEncodeType,
unmarshaled.versionEncodeType)
}
func Test_blockMetadata_marshal_unmarshal(t *testing.T) {
@@ -200,6 +221,44 @@ func Test_blockMetadata_marshal_unmarshal(t *testing.T) {
},
},
},
+ {
+ name: "Non-zero values with versions",
+ original: &blockMetadata{
+ seriesID: common.SeriesID(1),
+ uncompressedSizeBytes: 1,
+ count: 1,
+ timestamps: timestampsMetadata{
+ dataBlock: dataBlock{
+ offset: 1,
+ size: 1,
+ },
+ min: 1,
+ max: 1,
+ encodeType:
encoding.EncodeTypeConst,
+ versionOffset: 1,
+ versionFirst: 1,
+ versionEncodeType:
encoding.EncodeTypeDelta,
+ },
+ tagFamilies: map[string]*dataBlock{
+ "tag1": {
+ offset: 1,
+ size: 1,
+ },
+ },
+ field: columnFamilyMetadata{
+ columnMetadata: []columnMetadata{
+ {
+ dataBlock: dataBlock{
+ offset: 1,
+ size: 1,
+ },
+ name: "field1",
+ valueType:
pbv1.ValueTypeInt64,
+ },
+ },
+ },
+ },
+ },
{
name: "Multiple tagFamilies and columnMetadata",
original: &blockMetadata{
@@ -211,9 +270,12 @@ func Test_blockMetadata_marshal_unmarshal(t *testing.T) {
offset: 2,
size: 2,
},
- min: 2,
- max: 2,
- encodeType: encoding.EncodeTypeConst,
+ min: 2,
+ max: 2,
+ encodeType:
encoding.EncodeTypeConst,
+ versionOffset: 2,
+ versionFirst: 2,
+ versionEncodeType:
encoding.EncodeTypeDelta,
},
tagFamilies: map[string]*dataBlock{
"tag1": {
diff --git a/banyand/measure/block_test.go b/banyand/measure/block_test.go
index 53b72f8b..fdd2e350 100644
--- a/banyand/measure/block_test.go
+++ b/banyand/measure/block_test.go
@@ -18,8 +18,6 @@
package measure
import (
- "crypto/rand"
- "encoding/binary"
"reflect"
"testing"
@@ -36,6 +34,7 @@ import (
func Test_block_reset(t *testing.T) {
type fields struct {
timestamps []int64
+ versions []int64
tagFamilies []columnFamily
field columnFamily
}
@@ -48,11 +47,13 @@ func Test_block_reset(t *testing.T) {
name: "Test reset",
fields: fields{
timestamps: []int64{1, 2, 3},
+ versions: []int64{1, 2, 3},
tagFamilies: []columnFamily{{}, {}, {}},
field: columnFamily{columns: []column{{},
{}}},
},
want: block{
timestamps: []int64{},
+ versions: []int64{},
tagFamilies: []columnFamily{},
field: columnFamily{columns: []column{}},
},
@@ -62,6 +63,7 @@ func Test_block_reset(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
b := &block{
timestamps: tt.fields.timestamps,
+ versions: tt.fields.versions,
tagFamilies: tt.fields.tagFamilies,
field: tt.fields.field,
}
@@ -87,6 +89,7 @@ func toTagProjection(b block) map[string][]string {
var conventionalBlock = block{
timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -131,6 +134,7 @@ var conventionalBlock = block{
func Test_block_mustInitFromDataPoints(t *testing.T) {
type args struct {
timestamps []int64
+ versions []int64
tagFamilies [][]nameValues
fields []nameValues
}
@@ -143,6 +147,7 @@ func Test_block_mustInitFromDataPoints(t *testing.T) {
name: "Test mustInitFromDataPoints",
args: args{
timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
tagFamilies: [][]nameValues{
{
{
@@ -208,7 +213,7 @@ func Test_block_mustInitFromDataPoints(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := &block{}
- b.mustInitFromDataPoints(tt.args.timestamps,
tt.args.tagFamilies, tt.args.fields)
+ b.mustInitFromDataPoints(tt.args.timestamps,
tt.args.versions, tt.args.tagFamilies, tt.args.fields)
if !reflect.DeepEqual(*b, tt.want) {
t.Errorf("block.mustInitFromDataPoints() = %+v,
want %+v", *b, tt.want)
}
@@ -237,21 +242,31 @@ func marshalIntArr(arr [][]byte) []byte {
}
func Test_mustWriteAndReadTimestamps(t *testing.T) {
+ type args struct {
+ timestamp []int64
+ versions []int64
+ }
tests := []struct {
name string
- args []int64
+ args args
wantPanic bool
wantTM timestampsMetadata
}{
{
- name: "Test mustWriteAndReadTimestamps",
- args: []int64{1, 2, 3, 4, 5},
+ name: "Test mustWriteAndReadTimestamps",
+ args: args{
+ timestamp: []int64{1, 2, 3, 4, 5},
+ versions: []int64{1, 2, 3, 4, 5},
+ },
wantPanic: false,
},
{
- name: "Test mustWriteAndReadTimestamps with panic",
- args: getBitInt64Arr(),
- wantPanic: true,
+ name: "Test mustWriteAndReadTimestamps with the same
version",
+ args: args{
+ timestamp: []int64{1, 2, 3, 4, 5},
+ versions: []int64{0, 0, 0, 0, 0},
+ },
+ wantPanic: false,
},
}
for _, tt := range tests {
@@ -266,26 +281,18 @@ func Test_mustWriteAndReadTimestamps(t *testing.T) {
b := &bytes.Buffer{}
w := new(writer)
w.init(b)
- mustWriteTimestampsTo(tm, tt.args, w)
- timestamps := mustReadTimestampsFrom(nil, tm,
len(tt.args), b)
- if !reflect.DeepEqual(timestamps, tt.args) {
+ mustWriteTimestampsTo(tm, tt.args.timestamp,
tt.args.versions, w)
+ timestamps, versions := mustReadTimestampsFrom(nil,
nil, tm, len(tt.args.timestamp), b)
+ if !reflect.DeepEqual(timestamps, tt.args.timestamp) {
t.Errorf("mustReadTimestampsFrom() = %v, want
%v", timestamps, tt.args)
}
+ if !reflect.DeepEqual(versions, tt.args.versions) {
+ t.Errorf("mustReadTimestampsFrom() = %v, want
%v", versions, tt.args)
+ }
})
}
}
-func getBitInt64Arr() []int64 {
- size := maxTimestampsBlockSize + 1
- randSlice := make([]int64, size)
- for i := range randSlice {
- b := make([]byte, 8)
- _, _ = rand.Read(b)
- randSlice[i] = int64(binary.BigEndian.Uint64(b))
- }
- return randSlice
-}
-
func Test_marshalAndUnmarshalTagFamily(t *testing.T) {
metaBuffer, dataBuffer := &bytes.Buffer{}, &bytes.Buffer{}
ww := &writers{
@@ -420,6 +427,7 @@ func Test_marshalAndUnmarshalBlock(t *testing.T) {
func Test_blockPointer_append(t *testing.T) {
type fields struct {
timestamps []int64
+ versions []int64
tagFamilies []columnFamily
field columnFamily
partID uint64
@@ -439,6 +447,7 @@ func Test_blockPointer_append(t *testing.T) {
name: "Test append with empty block",
fields: fields{
timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -460,6 +469,7 @@ func Test_blockPointer_append(t *testing.T) {
b: &blockPointer{
block: block{
timestamps: []int64{},
+ versions: []int64{},
tagFamilies: []columnFamily{},
field: columnFamily{},
},
@@ -470,6 +480,7 @@ func Test_blockPointer_append(t *testing.T) {
want: &blockPointer{
block: block{
timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -494,6 +505,7 @@ func Test_blockPointer_append(t *testing.T) {
name: "Test append to a empty block",
fields: fields{
timestamps: nil,
+ versions: nil,
tagFamilies: nil,
field: columnFamily{
columns: nil,
@@ -502,9 +514,9 @@ func Test_blockPointer_append(t *testing.T) {
},
args: args{
b: &blockPointer{
- lastPartID: 2,
block: block{
timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -527,9 +539,9 @@ func Test_blockPointer_append(t *testing.T) {
offset: 2,
},
want: &blockPointer{
- lastPartID: 2,
block: block{
timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -554,6 +566,7 @@ func Test_blockPointer_append(t *testing.T) {
name: "Test append with offset equals to the data size.
All data",
fields: fields{
timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -574,9 +587,9 @@ func Test_blockPointer_append(t *testing.T) {
},
args: args{
b: &blockPointer{
- lastPartID: 2,
block: block{
timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -599,9 +612,9 @@ func Test_blockPointer_append(t *testing.T) {
offset: 2,
},
want: &blockPointer{
- lastPartID: 2,
block: block{
timestamps: []int64{1, 2, 4, 5},
+ versions: []int64{1, 1, 1, 1},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -630,6 +643,7 @@ func Test_blockPointer_append(t *testing.T) {
name: "Test append with non-empty block and offset less
than timestamps",
fields: fields{
timestamps: []int64{1, 2},
+ versions: []int64{1, 1},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -650,9 +664,9 @@ func Test_blockPointer_append(t *testing.T) {
},
args: args{
b: &blockPointer{
- lastPartID: 2,
block: block{
timestamps: []int64{4, 5},
+ versions: []int64{1, 1},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -675,9 +689,9 @@ func Test_blockPointer_append(t *testing.T) {
offset: 1,
},
want: &blockPointer{
- lastPartID: 2,
block: block{
timestamps: []int64{1, 2, 4},
+ versions: []int64{1, 1, 1},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -713,10 +727,10 @@ func Test_blockPointer_append(t *testing.T) {
bi := &blockPointer{
block: block{
timestamps: tt.fields.timestamps,
+ versions: tt.fields.versions,
tagFamilies: tt.fields.tagFamilies,
field: tt.fields.field,
},
- lastPartID: tt.fields.partID,
}
bi.append(tt.args.b, tt.args.offset)
if !reflect.DeepEqual(bi, tt.want) {
@@ -748,17 +762,15 @@ func Test_blockPointer_copyFrom(t *testing.T) {
},
args: args{
src: &blockPointer{
- bm: blockMetadata{count: 1},
- idx: 0,
- block: conventionalBlock,
- lastPartID: 2,
+ bm: blockMetadata{count: 1},
+ idx: 0,
+ block: conventionalBlock,
},
},
want: &blockPointer{
- bm: blockMetadata{count: 1},
- idx: 0,
- block: conventionalBlock,
- lastPartID: 2,
+ bm: blockMetadata{count: 1},
+ idx: 0,
+ block: conventionalBlock,
},
},
}
diff --git a/banyand/measure/block_writer.go b/banyand/measure/block_writer.go
index 760a63ad..fd065f1a 100644
--- a/banyand/measure/block_writer.go
+++ b/banyand/measure/block_writer.go
@@ -186,14 +186,14 @@ func (bw *blockWriter) mustInitForFilePart(fileSystem
fs.FileSystem, path string
bw.writers.fieldValuesWriter.init(fs.MustCreateFile(fileSystem,
filepath.Join(path, fieldValuesFilename), filePermission))
}
-func (bw *blockWriter) MustWriteDataPoints(sid common.SeriesID, timestamps
[]int64, tagFamilies [][]nameValues, fields []nameValues) {
+func (bw *blockWriter) MustWriteDataPoints(sid common.SeriesID, timestamps,
versions []int64, tagFamilies [][]nameValues, fields []nameValues) {
if len(timestamps) == 0 {
return
}
b := generateBlock()
defer releaseBlock(b)
- b.mustInitFromDataPoints(timestamps, tagFamilies, fields)
+ b.mustInitFromDataPoints(timestamps, versions, tagFamilies, fields)
bw.mustWriteBlock(sid, b)
}
diff --git a/banyand/measure/datapoints.go b/banyand/measure/datapoints.go
index 5cf2d9a9..3996e3a0 100644
--- a/banyand/measure/datapoints.go
+++ b/banyand/measure/datapoints.go
@@ -118,6 +118,7 @@ type nameValues struct {
type dataPoints struct {
seriesIDs []common.SeriesID
timestamps []int64
+ versions []int64
tagFamilies [][]nameValues
fields []nameValues
}
@@ -136,6 +137,7 @@ func (d *dataPoints) Less(i, j int) bool {
func (d *dataPoints) Swap(i, j int) {
d.seriesIDs[i], d.seriesIDs[j] = d.seriesIDs[j], d.seriesIDs[i]
d.timestamps[i], d.timestamps[j] = d.timestamps[j], d.timestamps[i]
+ d.versions[i], d.versions[j] = d.versions[j], d.versions[i]
d.tagFamilies[i], d.tagFamilies[j] = d.tagFamilies[j], d.tagFamilies[i]
d.fields[i], d.fields[j] = d.fields[j], d.fields[i]
}
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 43515964..c05c4c99 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -35,7 +35,6 @@ import (
const (
maxValuesBlockSize = 8 * 1024 * 1024
- maxTimestampsBlockSize = 8 * 1024 * 1024
maxTagFamiliesMetadataSize = 8 * 1024 * 1024
maxUncompressedBlockSize = 2 * 1024 * 1024
maxUncompressedPrimaryBlockSize = 128 * 1024
diff --git a/banyand/measure/merger.go b/banyand/measure/merger.go
index e957142c..85869691 100644
--- a/banyand/measure/merger.go
+++ b/banyand/measure/merger.go
@@ -370,7 +370,7 @@ func mergeTwoBlocks(target, left, right *blockPointer) {
i++
}
if left.timestamps[i-1] == ts2 {
- if left.lastPartID >= right.lastPartID {
+ if left.versions[i-1] >= right.versions[right.idx] {
target.append(left, i)
} else {
target.append(left, i-1) // skip left
diff --git a/banyand/measure/merger_test.go b/banyand/measure/merger_test.go
index 0ad12e27..c8de11bd 100644
--- a/banyand/measure/merger_test.go
+++ b/banyand/measure/merger_test.go
@@ -61,6 +61,7 @@ func Test_mergeTwoBlocks(t *testing.T) {
left: &blockPointer{
block: block{
timestamps: []int64{1, 2},
+ versions: []int64{1, 4},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -82,6 +83,7 @@ func Test_mergeTwoBlocks(t *testing.T) {
right: &blockPointer{
block: block{
timestamps: []int64{3, 4},
+ versions: []int64{5, 6},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -107,6 +109,7 @@ func Test_mergeTwoBlocks(t *testing.T) {
left: &blockPointer{
block: block{
timestamps: []int64{1, 3},
+ versions: []int64{1, 5},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -128,6 +131,7 @@ func Test_mergeTwoBlocks(t *testing.T) {
right: &blockPointer{
block: block{
timestamps: []int64{2, 4},
+ versions: []int64{4, 6},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -151,9 +155,9 @@ func Test_mergeTwoBlocks(t *testing.T) {
{
name: "Merge two non-empty blocks with duplicated
timestamps",
left: &blockPointer{
- lastPartID: 1, // the less partID will be
skipped
block: block{
timestamps: []int64{1, 2, 3},
+ versions: []int64{1, 2, 3},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -177,9 +181,9 @@ func Test_mergeTwoBlocks(t *testing.T) {
},
},
right: &blockPointer{
- lastPartID: 2, // the greater partID will be
appended
block: block{
timestamps: []int64{2, 3, 4},
+ versions: []int64{4, 5, 6},
tagFamilies: []columnFamily{
{
name: "arrTag",
@@ -201,7 +205,7 @@ func Test_mergeTwoBlocks(t *testing.T) {
},
},
},
- want: &blockPointer{block: mergedBlock, lastPartID: 2,
bm: blockMetadata{timestamps: timestampsMetadata{min: 1, max: 4}}},
+ want: &blockPointer{block: mergedBlock, bm:
blockMetadata{timestamps: timestampsMetadata{min: 1, max: 4}}},
},
}
@@ -218,6 +222,7 @@ func Test_mergeTwoBlocks(t *testing.T) {
var mergedBlock = block{
timestamps: []int64{1, 2, 3, 4},
+ versions: []int64{1, 4, 5, 6},
tagFamilies: []columnFamily{
{
name: "arrTag",
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index 20b6a5a7..c6383814 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -158,14 +158,14 @@ func (mp *memPart) mustInitFromDataPoints(dps
*dataPoints) {
if uncompressedBlockSizeBytes >= maxUncompressedBlockSize ||
(i-indexPrev) > maxBlockLength || sid != sidPrev {
- bsw.MustWriteDataPoints(sidPrev,
dps.timestamps[indexPrev:i], dps.tagFamilies[indexPrev:i],
dps.fields[indexPrev:i])
+ bsw.MustWriteDataPoints(sidPrev,
dps.timestamps[indexPrev:i], dps.versions[indexPrev:i],
dps.tagFamilies[indexPrev:i], dps.fields[indexPrev:i])
sidPrev = sid
indexPrev = i
uncompressedBlockSizeBytes = 0
}
uncompressedBlockSizeBytes += uncompressedDataPointSizeBytes(i,
dps)
}
- bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:],
dps.tagFamilies[indexPrev:], dps.fields[indexPrev:])
+ bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:],
dps.versions[indexPrev:], dps.tagFamilies[indexPrev:], dps.fields[indexPrev:])
bsw.Flush(&mp.partMetadata)
releaseBlockWriter(bsw)
}
diff --git a/banyand/measure/part_iter.go b/banyand/measure/part_iter.go
index aef39977..0c916334 100644
--- a/banyand/measure/part_iter.go
+++ b/banyand/measure/part_iter.go
@@ -315,8 +315,6 @@ func (pmi *partMergeIter) loadBlockMetadata() error {
pm := pmi.primaryBlockMetadata[pmi.primaryMetadataIdx-1]
return fmt.Errorf("can't read block metadata from primary at
%d: %w", pm.offset, err)
}
-
- pmi.block.lastPartID = pmi.partID
return nil
}
diff --git a/banyand/measure/part_test.go b/banyand/measure/part_test.go
index 1044a46c..e06b3e04 100644
--- a/banyand/measure/part_test.go
+++ b/banyand/measure/part_test.go
@@ -40,6 +40,7 @@ func TestMustInitFromDataPoints(t *testing.T) {
name: "Test with empty dataPoints",
dps: &dataPoints{
timestamps: []int64{},
+ versions: []int64{},
seriesIDs: []common.SeriesID{},
tagFamilies: make([][]nameValues, 0),
fields: make([]nameValues, 0),
@@ -50,6 +51,7 @@ func TestMustInitFromDataPoints(t *testing.T) {
name: "Test with one item in dataPoints",
dps: &dataPoints{
timestamps: []int64{1},
+ versions: []int64{1},
seriesIDs: []common.SeriesID{1},
tagFamilies: [][]nameValues{
{
@@ -126,6 +128,7 @@ func TestMustInitFromDataPoints(t *testing.T) {
var dps = &dataPoints{
seriesIDs: []common.SeriesID{1, 1, 2, 2, 3, 3},
timestamps: []int64{1, 2, 8, 10, 100, 220},
+ versions: []int64{1, 2, 3, 4, 5, 6},
tagFamilies: [][]nameValues{
{
{
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 6c9ae474..168a5663 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -438,8 +438,8 @@ func (qr queryResult) Len() int {
func (qr queryResult) Less(i, j int) bool {
leftTS := qr.data[i].timestamps[qr.data[i].idx]
rightTS := qr.data[j].timestamps[qr.data[j].idx]
- leftVersion := qr.data[i].p.partMetadata.ID
- rightVersion := qr.data[j].p.partMetadata.ID
+ leftVersion := qr.data[i].versions[qr.data[i].idx]
+ rightVersion := qr.data[j].versions[qr.data[j].idx]
if qr.orderByTS {
if leftTS == rightTS {
if qr.data[i].bm.seriesID == qr.data[j].bm.seriesID {
@@ -495,7 +495,7 @@ func (qr *queryResult) merge(entityValuesAll
map[common.SeriesID]map[string]*mod
step = -1
}
result := &pbv1.MeasureResult{}
- var lastPartVersion uint64
+ var lastVersion int64
var lastSid common.SeriesID
for qr.Len() > 0 {
@@ -507,12 +507,12 @@ func (qr *queryResult) merge(entityValuesAll
map[common.SeriesID]map[string]*mod
if len(result.Timestamps) > 0 &&
topBC.timestamps[topBC.idx] ==
result.Timestamps[len(result.Timestamps)-1] {
- if topBC.p.partMetadata.ID > lastPartVersion {
+ if topBC.versions[topBC.idx] > lastVersion {
logger.Panicf("following parts version should
be less or equal to the previous one")
}
} else {
topBC.copyTo(result, entityValuesAll, tagProjection)
- lastPartVersion = topBC.p.partMetadata.ID
+ lastVersion = topBC.versions[topBC.idx]
}
topBC.idx += step
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 56111323..e43dd5fc 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -59,6 +59,7 @@ func TestQueryResult(t *testing.T) {
want: []pbv1.MeasureResult{{
SID: 1,
Timestamps: []int64{1},
+ Versions: []int64{1},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
@@ -83,6 +84,7 @@ func TestQueryResult(t *testing.T) {
}, {
SID: 2,
Timestamps: []int64{1},
+ Versions: []int64{2},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
@@ -102,6 +104,7 @@ func TestQueryResult(t *testing.T) {
}, {
SID: 3,
Timestamps: []int64{1},
+ Versions: []int64{3},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
@@ -123,38 +126,40 @@ func TestQueryResult(t *testing.T) {
}},
},
{
- name: "Test with multiple parts with multiple
data orderBy TS desc",
- dpsList: []*dataPoints{dpsTS1, dpsTS2},
+ name: "Test with multiple parts with duplicated
data with different version order by TS 1",
+ dpsList: []*dataPoints{dpsTS1, dpsTS11},
sids: []common.SeriesID{1, 2, 3},
minTimestamp: 1,
- maxTimestamp: 2,
+ maxTimestamp: 1,
want: []pbv1.MeasureResult{{
SID: 1,
- Timestamps: []int64{2},
+ Timestamps: []int64{1},
+ Versions: []int64{1},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
- {Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value5", "value6"})}},
- {Name: "intArrTag", Values:
[]*modelv1.TagValue{int64ArrTagValue([]int64{35, 40})}},
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}},
}},
{Name: "binaryTag", Tags: []pbv1.Tag{
{Name: "binaryTag", Values:
[]*modelv1.TagValue{binaryDataTagValue(longText)}},
}},
{Name: "singleTag", Tags: []pbv1.Tag{
- {Name: "strTag", Values:
[]*modelv1.TagValue{strTagValue("value3")}},
- {Name: "intTag", Values:
[]*modelv1.TagValue{int64TagValue(30)}},
+ {Name: "strTag", Values:
[]*modelv1.TagValue{strTagValue("value1")}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{int64TagValue(10)}},
{Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
{Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
}},
},
Fields: []pbv1.Field{
- {Name: "strField", Values:
[]*modelv1.FieldValue{strFieldValue("field3")}},
- {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(3330)}},
- {Name: "floatField", Values:
[]*modelv1.FieldValue{float64FieldValue(3663699.029)}},
+ {Name: "strField", Values:
[]*modelv1.FieldValue{strFieldValue("field1")}},
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
+ {Name: "floatField", Values:
[]*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}},
{Name: "binaryField", Values:
[]*modelv1.FieldValue{binaryDataFieldValue(longText)}},
},
}, {
SID: 2,
- Timestamps: []int64{2},
+ Timestamps: []int64{1},
+ Versions: []int64{2},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
@@ -166,14 +171,15 @@ func TestQueryResult(t *testing.T) {
{Name: "singleTag", Tags: []pbv1.Tag{
{Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
{Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
- {Name: "strTag1", Values:
[]*modelv1.TagValue{strTagValue("tag3")}},
- {Name: "strTag2", Values:
[]*modelv1.TagValue{strTagValue("tag4")}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{strTagValue("tag1")}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{strTagValue("tag2")}},
}},
},
Fields: nil,
}, {
SID: 3,
- Timestamps: []int64{2},
+ Timestamps: []int64{1},
+ Versions: []int64{3},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
@@ -190,11 +196,20 @@ func TestQueryResult(t *testing.T) {
}},
},
Fields: []pbv1.Field{
- {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(4440)}},
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
},
- }, {
+ }},
+ },
+ {
+ name: "Test with multiple parts with duplicated
data with different version order by TS 2",
+ dpsList: []*dataPoints{dpsTS11, dpsTS1},
+ sids: []common.SeriesID{1, 2, 3},
+ minTimestamp: 1,
+ maxTimestamp: 1,
+ want: []pbv1.MeasureResult{{
SID: 1,
Timestamps: []int64{1},
+ Versions: []int64{1},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
@@ -213,12 +228,13 @@ func TestQueryResult(t *testing.T) {
Fields: []pbv1.Field{
{Name: "strField", Values:
[]*modelv1.FieldValue{strFieldValue("field1")}},
{Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
- {Name: "floatField", Values:
[]*modelv1.FieldValue{float64FieldValue(1221233.343)}},
+ {Name: "floatField", Values:
[]*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}},
{Name: "binaryField", Values:
[]*modelv1.FieldValue{binaryDataFieldValue(longText)}},
},
}, {
SID: 2,
Timestamps: []int64{1},
+ Versions: []int64{2},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
@@ -238,6 +254,7 @@ func TestQueryResult(t *testing.T) {
}, {
SID: 3,
Timestamps: []int64{1},
+ Versions: []int64{3},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
@@ -259,15 +276,82 @@ func TestQueryResult(t *testing.T) {
}},
},
{
- name: "Test with multiple parts with multiple
data orderBy TS asc",
+ name: "Test with multiple parts with multiple
data orderBy TS desc 1",
dpsList: []*dataPoints{dpsTS1, dpsTS2},
sids: []common.SeriesID{1, 2, 3},
- ascTS: true,
minTimestamp: 1,
maxTimestamp: 2,
want: []pbv1.MeasureResult{{
+ SID: 1,
+ Timestamps: []int64{2},
+ Versions: []int64{4},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value5", "value6"})}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{int64ArrTagValue([]int64{35, 40})}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{binaryDataTagValue(longText)}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{strTagValue("value3")}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{int64TagValue(30)}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "strField", Values:
[]*modelv1.FieldValue{strFieldValue("field3")}},
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(3330)}},
+ {Name: "floatField", Values:
[]*modelv1.FieldValue{float64FieldValue(3663699.029)}},
+ {Name: "binaryField", Values:
[]*modelv1.FieldValue{binaryDataFieldValue(longText)}},
+ },
+ }, {
+ SID: 2,
+ Timestamps: []int64{2},
+ Versions: []int64{5},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{strTagValue("tag3")}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{strTagValue("tag4")}},
+ }},
+ },
+ Fields: nil,
+ }, {
+ SID: 3,
+ Timestamps: []int64{2},
+ Versions: []int64{6},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(4440)}},
+ },
+ }, {
SID: 1,
Timestamps: []int64{1},
+ Versions: []int64{1},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
@@ -292,6 +376,7 @@ func TestQueryResult(t *testing.T) {
}, {
SID: 2,
Timestamps: []int64{1},
+ Versions: []int64{2},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
@@ -311,6 +396,7 @@ func TestQueryResult(t *testing.T) {
}, {
SID: 3,
Timestamps: []int64{1},
+ Versions: []int64{3},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
@@ -329,9 +415,18 @@ func TestQueryResult(t *testing.T) {
Fields: []pbv1.Field{
{Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
},
- }, {
+ }},
+ },
+ {
+ name: "Test with multiple parts with multiple
data orderBy TS desc 2",
+ dpsList: []*dataPoints{dpsTS2, dpsTS1},
+ sids: []common.SeriesID{1, 2, 3},
+ minTimestamp: 1,
+ maxTimestamp: 2,
+ want: []pbv1.MeasureResult{{
SID: 1,
Timestamps: []int64{2},
+ Versions: []int64{4},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value5", "value6"})}},
@@ -356,6 +451,7 @@ func TestQueryResult(t *testing.T) {
}, {
SID: 2,
Timestamps: []int64{2},
+ Versions: []int64{5},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
@@ -375,6 +471,7 @@ func TestQueryResult(t *testing.T) {
}, {
SID: 3,
Timestamps: []int64{2},
+ Versions: []int64{6},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
@@ -393,18 +490,86 @@ func TestQueryResult(t *testing.T) {
Fields: []pbv1.Field{
{Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(4440)}},
},
+ }, {
+ SID: 1,
+ Timestamps: []int64{1},
+ Versions: []int64{1},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{binaryDataTagValue(longText)}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{strTagValue("value1")}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{int64TagValue(10)}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "strField", Values:
[]*modelv1.FieldValue{strFieldValue("field1")}},
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
+ {Name: "floatField", Values:
[]*modelv1.FieldValue{float64FieldValue(1221233.343)}},
+ {Name: "binaryField", Values:
[]*modelv1.FieldValue{binaryDataFieldValue(longText)}},
+ },
+ }, {
+ SID: 2,
+ Timestamps: []int64{1},
+ Versions: []int64{2},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{strTagValue("tag1")}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{strTagValue("tag2")}},
+ }},
+ },
+ Fields: nil,
+ }, {
+ SID: 3,
+ Timestamps: []int64{1},
+ Versions: []int64{3},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
+ },
}},
},
{
- name: "Test with multiple parts with
duplicated data order by Series",
- dpsList: []*dataPoints{dpsTS1, dpsTS1},
- sids: []common.SeriesID{1, 2, 3},
- orderBySeries: true,
- minTimestamp: 1,
- maxTimestamp: 1,
+ name: "Test with multiple parts with multiple
data orderBy TS asc 1",
+ dpsList: []*dataPoints{dpsTS1, dpsTS2},
+ sids: []common.SeriesID{1, 2, 3},
+ ascTS: true,
+ minTimestamp: 1,
+ maxTimestamp: 2,
want: []pbv1.MeasureResult{{
SID: 1,
Timestamps: []int64{1},
+ Versions: []int64{1},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
@@ -423,12 +588,13 @@ func TestQueryResult(t *testing.T) {
Fields: []pbv1.Field{
{Name: "strField", Values:
[]*modelv1.FieldValue{strFieldValue("field1")}},
{Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
- {Name: "floatField", Values:
[]*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}},
+ {Name: "floatField", Values:
[]*modelv1.FieldValue{float64FieldValue(1221233.343)}},
{Name: "binaryField", Values:
[]*modelv1.FieldValue{binaryDataFieldValue(longText)}},
},
}, {
SID: 2,
Timestamps: []int64{1},
+ Versions: []int64{2},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
@@ -448,6 +614,7 @@ func TestQueryResult(t *testing.T) {
}, {
SID: 3,
Timestamps: []int64{1},
+ Versions: []int64{3},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
@@ -466,11 +633,525 @@ func TestQueryResult(t *testing.T) {
Fields: []pbv1.Field{
{Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
},
+ }, {
+ SID: 1,
+ Timestamps: []int64{2},
+ Versions: []int64{4},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value5", "value6"})}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{int64ArrTagValue([]int64{35, 40})}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{binaryDataTagValue(longText)}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{strTagValue("value3")}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{int64TagValue(30)}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "strField", Values:
[]*modelv1.FieldValue{strFieldValue("field3")}},
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(3330)}},
+ {Name: "floatField", Values:
[]*modelv1.FieldValue{float64FieldValue(3663699.029)}},
+ {Name: "binaryField", Values:
[]*modelv1.FieldValue{binaryDataFieldValue(longText)}},
+ },
+ }, {
+ SID: 2,
+ Timestamps: []int64{2},
+ Versions: []int64{5},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{strTagValue("tag3")}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{strTagValue("tag4")}},
+ }},
+ },
+ Fields: nil,
+ }, {
+ SID: 3,
+ Timestamps: []int64{2},
+ Versions: []int64{6},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(4440)}},
+ },
}},
},
{
- name: "Test with multiple parts with multiple
data order by Series",
- dpsList: []*dataPoints{dpsTS1, dpsTS2},
+ name: "Test with multiple parts with multiple
data orderBy TS asc 2",
+ dpsList: []*dataPoints{dpsTS2, dpsTS1},
+ sids: []common.SeriesID{1, 2, 3},
+ ascTS: true,
+ minTimestamp: 1,
+ maxTimestamp: 2,
+ want: []pbv1.MeasureResult{{
+ SID: 1,
+ Timestamps: []int64{1},
+ Versions: []int64{1},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{binaryDataTagValue(longText)}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{strTagValue("value1")}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{int64TagValue(10)}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "strField", Values:
[]*modelv1.FieldValue{strFieldValue("field1")}},
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
+ {Name: "floatField", Values:
[]*modelv1.FieldValue{float64FieldValue(1221233.343)}},
+ {Name: "binaryField", Values:
[]*modelv1.FieldValue{binaryDataFieldValue(longText)}},
+ },
+ }, {
+ SID: 2,
+ Timestamps: []int64{1},
+ Versions: []int64{2},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{strTagValue("tag1")}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{strTagValue("tag2")}},
+ }},
+ },
+ Fields: nil,
+ }, {
+ SID: 3,
+ Timestamps: []int64{1},
+ Versions: []int64{3},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
+ },
+ }, {
+ SID: 1,
+ Timestamps: []int64{2},
+ Versions: []int64{4},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value5", "value6"})}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{int64ArrTagValue([]int64{35, 40})}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{binaryDataTagValue(longText)}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{strTagValue("value3")}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{int64TagValue(30)}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "strField", Values:
[]*modelv1.FieldValue{strFieldValue("field3")}},
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(3330)}},
+ {Name: "floatField", Values:
[]*modelv1.FieldValue{float64FieldValue(3663699.029)}},
+ {Name: "binaryField", Values:
[]*modelv1.FieldValue{binaryDataFieldValue(longText)}},
+ },
+ }, {
+ SID: 2,
+ Timestamps: []int64{2},
+ Versions: []int64{5},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{strTagValue("tag3")}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{strTagValue("tag4")}},
+ }},
+ },
+ Fields: nil,
+ }, {
+ SID: 3,
+ Timestamps: []int64{2},
+ Versions: []int64{6},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(4440)}},
+ },
+ }},
+ },
+ {
+ name: "Test with multiple parts with
duplicated data order by Series",
+ dpsList: []*dataPoints{dpsTS1, dpsTS1},
+ sids: []common.SeriesID{1, 2, 3},
+ orderBySeries: true,
+ minTimestamp: 1,
+ maxTimestamp: 1,
+ want: []pbv1.MeasureResult{{
+ SID: 1,
+ Timestamps: []int64{1},
+ Versions: []int64{1},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{binaryDataTagValue(longText)}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{strTagValue("value1")}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{int64TagValue(10)}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "strField", Values:
[]*modelv1.FieldValue{strFieldValue("field1")}},
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
+ {Name: "floatField", Values:
[]*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}},
+ {Name: "binaryField", Values:
[]*modelv1.FieldValue{binaryDataFieldValue(longText)}},
+ },
+ }, {
+ SID: 2,
+ Timestamps: []int64{1},
+ Versions: []int64{2},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{strTagValue("tag1")}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{strTagValue("tag2")}},
+ }},
+ },
+ Fields: nil,
+ }, {
+ SID: 3,
+ Timestamps: []int64{1},
+ Versions: []int64{3},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
+ },
+ }},
+ },
+ {
+ name: "Test with multiple parts with
duplicated data with different versions order by Series 1",
+ dpsList: []*dataPoints{dpsTS11, dpsTS1},
+ sids: []common.SeriesID{1, 2, 3},
+ orderBySeries: true,
+ minTimestamp: 1,
+ maxTimestamp: 1,
+ want: []pbv1.MeasureResult{{
+ SID: 1,
+ Timestamps: []int64{1},
+ Versions: []int64{1},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{binaryDataTagValue(longText)}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{strTagValue("value1")}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{int64TagValue(10)}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "strField", Values:
[]*modelv1.FieldValue{strFieldValue("field1")}},
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
+ {Name: "floatField", Values:
[]*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}},
+ {Name: "binaryField", Values:
[]*modelv1.FieldValue{binaryDataFieldValue(longText)}},
+ },
+ }, {
+ SID: 2,
+ Timestamps: []int64{1},
+ Versions: []int64{2},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{strTagValue("tag1")}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{strTagValue("tag2")}},
+ }},
+ },
+ Fields: nil,
+ }, {
+ SID: 3,
+ Timestamps: []int64{1},
+ Versions: []int64{3},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
+ },
+ }},
+ },
+ {
+ name: "Test with multiple parts with
duplicated data with different versions order by Series 2",
+ dpsList: []*dataPoints{dpsTS1, dpsTS11},
+ sids: []common.SeriesID{1, 2, 3},
+ orderBySeries: true,
+ minTimestamp: 1,
+ maxTimestamp: 1,
+ want: []pbv1.MeasureResult{{
+ SID: 1,
+ Timestamps: []int64{1},
+ Versions: []int64{1},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{binaryDataTagValue(longText)}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{strTagValue("value1")}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{int64TagValue(10)}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "strField", Values:
[]*modelv1.FieldValue{strFieldValue("field1")}},
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
+ {Name: "floatField", Values:
[]*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}},
+ {Name: "binaryField", Values:
[]*modelv1.FieldValue{binaryDataFieldValue(longText)}},
+ },
+ }, {
+ SID: 2,
+ Timestamps: []int64{1},
+ Versions: []int64{2},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{strTagValue("tag1")}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{strTagValue("tag2")}},
+ }},
+ },
+ Fields: nil,
+ }, {
+ SID: 3,
+ Timestamps: []int64{1},
+ Versions: []int64{3},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110)}},
+ },
+ }},
+ },
+ {
+ name: "Test with multiple parts with multiple
data order by Series 1",
+ dpsList: []*dataPoints{dpsTS1, dpsTS2},
+ sids: []common.SeriesID{2, 1, 3},
+ orderBySeries: true,
+ minTimestamp: 1,
+ maxTimestamp: 2,
+ want: []pbv1.MeasureResult{{
+ SID: 2,
+ Timestamps: []int64{1, 2},
+ Versions: []int64{2, 5},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{strTagValue("tag1"), strTagValue("tag3")}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{strTagValue("tag2"), strTagValue("tag4")}},
+ }},
+ },
+ Fields: nil,
+ }, {
+ SID: 1,
+ Timestamps: []int64{1, 2},
+ Versions: []int64{1, 4},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"}),
strArrTagValue([]string{"value5", "value6"})}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{int64ArrTagValue([]int64{25, 30}),
int64ArrTagValue([]int64{35, 40})}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{binaryDataTagValue(longText),
binaryDataTagValue(longText)}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{strTagValue("value1"), strTagValue("value3")}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{int64TagValue(10), int64TagValue(30)}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "strField", Values:
[]*modelv1.FieldValue{strFieldValue("field1"), strFieldValue("field3")}},
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110), int64FieldValue(3330)}},
+ {Name: "floatField", Values:
[]*modelv1.FieldValue{float64FieldValue(1.221233343e+06),
float64FieldValue(3663699.029)}},
+ {Name: "binaryField", Values:
[]*modelv1.FieldValue{binaryDataFieldValue(longText),
binaryDataFieldValue(longText)}},
+ },
+ }, {
+ SID: 3,
+ Timestamps: []int64{1, 2},
+ Versions: []int64{3, 6},
+ TagFamilies: []pbv1.TagFamily{
+ {Name: "arrTag", Tags: []pbv1.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
+ {Name: "intArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
+ }},
+ {Name: "binaryTag", Tags: []pbv1.Tag{
+ {Name: "binaryTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
+ }},
+ {Name: "singleTag", Tags: []pbv1.Tag{
+ {Name: "strTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
+ {Name: "strTag1", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
+ {Name: "strTag2", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []pbv1.Field{
+ {Name: "intField", Values:
[]*modelv1.FieldValue{int64FieldValue(1110), int64FieldValue(4440)}},
+ },
+ }},
+ },
+ {
+ name: "Test with multiple parts with multiple
data order by Series 2",
+ dpsList: []*dataPoints{dpsTS2, dpsTS1},
sids: []common.SeriesID{2, 1, 3},
orderBySeries: true,
minTimestamp: 1,
@@ -478,6 +1159,7 @@ func TestQueryResult(t *testing.T) {
want: []pbv1.MeasureResult{{
SID: 2,
Timestamps: []int64{1, 2},
+ Versions: []int64{2, 5},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
@@ -497,6 +1179,7 @@ func TestQueryResult(t *testing.T) {
}, {
SID: 1,
Timestamps: []int64{1, 2},
+ Versions: []int64{1, 4},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"}),
strArrTagValue([]string{"value5", "value6"})}},
@@ -521,6 +1204,7 @@ func TestQueryResult(t *testing.T) {
}, {
SID: 3,
Timestamps: []int64{1, 2},
+ Versions: []int64{3, 6},
TagFamilies: []pbv1.TagFamily{
{Name: "arrTag", Tags: []pbv1.Tag{
{Name: "strArrTag", Values:
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go
index 4ea3b81d..e82acff0 100644
--- a/banyand/measure/tstable_test.go
+++ b/banyand/measure/tstable_test.go
@@ -49,6 +49,7 @@ func Test_tsTable_mustAddDataPoints(t *testing.T) {
dpsList: []*dataPoints{
{
timestamps: []int64{},
+ versions: []int64{},
seriesIDs: []common.SeriesID{},
tagFamilies: make([][]nameValues, 0),
fields: make([]nameValues, 0),
@@ -61,6 +62,7 @@ func Test_tsTable_mustAddDataPoints(t *testing.T) {
dpsList: []*dataPoints{
{
timestamps: []int64{1},
+ versions: []int64{1},
seriesIDs: []common.SeriesID{1},
tagFamilies: [][]nameValues{
{
@@ -274,7 +276,7 @@ func Test_tstIter(t *testing.T) {
},
{
name: "Test with multiple parts with
different ts, the block will be merged",
- dpsList: []*dataPoints{dpsTS1, dpsTS2,
dpsTS2},
+ dpsList: []*dataPoints{dpsTS1, dpsTS2},
sids: []common.SeriesID{1, 2, 3},
minTimestamp: 1,
maxTimestamp: 2,
@@ -334,7 +336,7 @@ func Test_tstIter(t *testing.T) {
time.Sleep(100
* time.Millisecond)
continue
}
- if len(snp.parts) == 1
|| len(snp.parts) < len(tt.dpsList) {
+ if len(snp.parts) == 1 {
snp.decRef()
break
}
@@ -412,6 +414,7 @@ var fieldProjections = map[int][]string{
var dpsTS1 = &dataPoints{
seriesIDs: []common.SeriesID{1, 2, 3},
timestamps: []int64{1, 1, 1},
+ versions: []int64{1, 2, 3},
tagFamilies: [][]nameValues{
{
{
@@ -460,9 +463,62 @@ var dpsTS1 = &dataPoints{
},
}
+var dpsTS11 = &dataPoints{
+ seriesIDs: []common.SeriesID{1, 2, 3},
+ timestamps: []int64{1, 1, 1},
+ versions: []int64{0, 1, 2},
+ tagFamilies: [][]nameValues{
+ {
+ {
+ name: "arrTag", values: []*nameValue{
+ {name: "strArrTag", valueType:
pbv1.ValueTypeStrArr, value: nil, valueArr: [][]byte{[]byte("value5"),
[]byte("value6")}},
+ {name: "intArrTag", valueType:
pbv1.ValueTypeInt64Arr, value: nil, valueArr:
[][]byte{convert.Int64ToBytes(35), convert.Int64ToBytes(40)}},
+ },
+ },
+ {
+ name: "binaryTag", values: []*nameValue{
+ {name: "binaryTag", valueType:
pbv1.ValueTypeBinaryData, value: longText, valueArr: nil},
+ },
+ },
+ {
+ name: "singleTag", values: []*nameValue{
+ {name: "strTag", valueType:
pbv1.ValueTypeStr, value: []byte("value3"), valueArr: nil},
+ {name: "intTag", valueType:
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(30), valueArr: nil},
+ },
+ },
+ },
+ {
+ {
+ name: "singleTag", values: []*nameValue{
+ {name: "strTag1", valueType:
pbv1.ValueTypeStr, value: []byte("tag3"), valueArr: nil},
+ {name: "strTag2", valueType:
pbv1.ValueTypeStr, value: []byte("tag4"), valueArr: nil},
+ },
+ },
+ },
+ {}, // empty tagFamilies for seriesID 6
+ },
+ fields: []nameValues{
+ {
+ name: "skipped", values: []*nameValue{
+ {name: "strField", valueType:
pbv1.ValueTypeStr, value: []byte("field3"), valueArr: nil},
+ {name: "intField", valueType:
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(3330), valueArr: nil},
+ {name: "floatField", valueType:
pbv1.ValueTypeFloat64, value: convert.Float64ToBytes(3663699.029), valueArr:
nil},
+ {name: "binaryField", valueType:
pbv1.ValueTypeBinaryData, value: longText, valueArr: nil},
+ },
+ },
+ {}, // empty fields for seriesID 5
+ {
+ name: "onlyFields", values: []*nameValue{
+ {name: "intField", valueType:
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(4440), valueArr: nil},
+ },
+ },
+ },
+}
+
var dpsTS2 = &dataPoints{
seriesIDs: []common.SeriesID{1, 2, 3},
timestamps: []int64{2, 2, 2},
+ versions: []int64{4, 5, 6},
tagFamilies: [][]nameValues{
{
{
@@ -515,12 +571,15 @@ func generateHugeDps(startTimestamp, endTimestamp,
timestamp int64) *dataPoints
hugeDps := &dataPoints{
seriesIDs: []common.SeriesID{},
timestamps: []int64{},
+ versions: []int64{},
tagFamilies: [][]nameValues{},
fields: []nameValues{},
}
+ now := time.Now().UnixNano()
for i := startTimestamp; i <= endTimestamp; i++ {
hugeDps.seriesIDs = append(hugeDps.seriesIDs, 1)
hugeDps.timestamps = append(hugeDps.timestamps, i)
+ hugeDps.versions = append(hugeDps.versions, now+i)
hugeDps.tagFamilies = append(hugeDps.tagFamilies, []nameValues{
{
name: "arrTag", values: []*nameValue{
@@ -551,6 +610,7 @@ func generateHugeDps(startTimestamp, endTimestamp,
timestamp int64) *dataPoints
}
hugeDps.seriesIDs = append(hugeDps.seriesIDs, []common.SeriesID{2,
3}...)
hugeDps.timestamps = append(hugeDps.timestamps, []int64{timestamp,
timestamp}...)
+ hugeDps.versions = append(hugeDps.versions, []int64{now + timestamp,
now + timestamp}...)
hugeDps.tagFamilies = append(hugeDps.tagFamilies, [][]nameValues{{
{
name: "singleTag", values: []*nameValue{
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index 32dd4a71..4c50c47f 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -93,6 +93,7 @@ func (w *writeCallback) handle(dst
map[string]*dataPointsInGroup, writeEvent *me
dpg.tables = append(dpg.tables, dpt)
}
dpt.dataPoints.timestamps = append(dpt.dataPoints.timestamps, ts)
+ dpt.dataPoints.versions = append(dpt.dataPoints.versions,
req.DataPoint.Version)
stm, ok := w.schemaRepo.loadMeasure(req.GetMetadata())
if !ok {
return nil, fmt.Errorf("cannot find measure definition: %s",
req.GetMetadata())
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index 84b73b01..252b3947 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -66,7 +66,7 @@ func (b *block) mustInitFromElements(timestamps []int64,
elementIDs []string, ta
func assertTimestampsSorted(timestamps []int64) {
for i := range timestamps {
if i > 0 && timestamps[i-1] > timestamps[i] {
- logger.Panicf("log entries must be sorted by timestamp;
got the previous entry with bigger timestamp %d than the current entry with
timestamp %d",
+ logger.Panicf("elements must be sorted by timestamp;
got the previous element with bigger timestamp %d than the current element with
timestamp %d",
timestamps[i-1], timestamps[i])
}
}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index cae326b6..5ac35290 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2363,6 +2363,7 @@ DataPoint is stored in Measures
| timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | |
timestamp is in the timeunit of milliseconds. |
| tag_families | [banyandb.model.v1.TagFamily](#banyandb-model-v1-TagFamily) |
repeated | tag_families contains tags selected in the projection |
| fields | [DataPoint.Field](#banyandb-measure-v1-DataPoint-Field) | repeated
| fields contains fields selected in the projection |
+| version | [int64](#int64) | | version is the version of the data point |
@@ -2635,6 +2636,7 @@ DataPointValue is the data point for writing. It only
contains values.
| timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | |
timestamp is in the timeunit of milliseconds. |
| tag_families |
[banyandb.model.v1.TagFamilyForWrite](#banyandb-model-v1-TagFamilyForWrite) |
repeated | the order of tag_families' items match the measure schema |
| fields | [banyandb.model.v1.FieldValue](#banyandb-model-v1-FieldValue) |
repeated | the order of fields match the measure schema |
+| version | [int64](#int64) | | the version of the data point |
diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go
index 0646cdbb..d5cc0304 100644
--- a/pkg/encoding/encoding.go
+++ b/pkg/encoding/encoding.go
@@ -89,5 +89,40 @@ const (
EncodeTypeDeltaConst
EncodeTypeDelta
EncodeTypeDeltaOfDelta
- EncodeTypeXOR
+ EncodeTypeConstWithVersion
+ EncodeTypeDeltaConstWithVersion
+ EncodeTypeDeltaWithVersion
+ EncodeTypeDeltaOfDeltaWithVersion
)
+
+// GetVersionType returns the version type of the given encoding type.
+func GetVersionType(et EncodeType) EncodeType {
+ switch et {
+ case EncodeTypeConst:
+ return EncodeTypeConstWithVersion
+ case EncodeTypeDeltaConst:
+ return EncodeTypeDeltaConstWithVersion
+ case EncodeTypeDelta:
+ return EncodeTypeDeltaWithVersion
+ case EncodeTypeDeltaOfDelta:
+ return EncodeTypeDeltaOfDeltaWithVersion
+ default:
+ return EncodeTypeUnknown
+ }
+}
+
+// GetCommonType returns the common type of the given encoding type.
+func GetCommonType(et EncodeType) EncodeType {
+ switch et {
+ case EncodeTypeConstWithVersion:
+ return EncodeTypeConst
+ case EncodeTypeDeltaConstWithVersion:
+ return EncodeTypeDeltaConst
+ case EncodeTypeDeltaWithVersion:
+ return EncodeTypeDelta
+ case EncodeTypeDeltaOfDeltaWithVersion:
+ return EncodeTypeDeltaOfDelta
+ default:
+ return EncodeTypeUnknown
+ }
+}
diff --git a/pkg/encoding/int_list.go b/pkg/encoding/int_list.go
index 7ea736d9..82b842c5 100644
--- a/pkg/encoding/int_list.go
+++ b/pkg/encoding/int_list.go
@@ -55,7 +55,7 @@ func Int64ListToBytes(dst []byte, a []int64) (result []byte,
mt EncodeType, firs
// BytesToInt64List decodes bytes into a list of int64.
func BytesToInt64List(dst []int64, src []byte, mt EncodeType, firstValue
int64, itemsCount int) ([]int64, error) {
- dst = extendInt64ListCapacity(dst, itemsCount)
+ dst = ExtendInt64ListCapacity(dst, itemsCount)
var err error
switch mt {
@@ -100,7 +100,8 @@ func BytesToInt64List(dst []int64, src []byte, mt
EncodeType, firstValue int64,
}
}
-func extendInt64ListCapacity(dst []int64, additionalItems int) []int64 {
+// ExtendInt64ListCapacity extends the capacity of the int64 list.
+func ExtendInt64ListCapacity(dst []int64, additionalItems int) []int64 {
dstLen := len(dst)
if n := dstLen + additionalItems - cap(dst); n > 0 {
dst = append(dst[:cap(dst)], make([]int64, n)...)
diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go
index 4ff5e411..3aaf08a5 100644
--- a/pkg/pb/v1/metadata.go
+++ b/pkg/pb/v1/metadata.go
@@ -104,6 +104,7 @@ type Field struct {
// MeasureResult is the result of a query.
type MeasureResult struct {
Timestamps []int64
+ Versions []int64
TagFamilies []TagFamily
Fields []Field
SID common.SeriesID
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go
b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index a037813e..4079f79a 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -223,6 +223,7 @@ func (ei *resultMIterator) Next() bool {
for i := range r.Timestamps {
dp := &measurev1.DataPoint{
Timestamp: timestamppb.New(time.Unix(0,
r.Timestamps[i])),
+ Version: r.Versions[i],
}
for _, tf := range r.TagFamilies {
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index 8405152f..347d6b99 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -75,9 +75,15 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext
helpers.SharedContext, args
innerGm.Expect(err).NotTo(gm.HaveOccurred())
want := &measurev1.QueryResponse{}
helpers.UnmarshalYAML(ww, want)
+ for i := range resp.DataPoints {
+ if resp.DataPoints[i].Timestamp != nil {
+
innerGm.Expect(resp.DataPoints[i].Version).Should(gm.BeNumerically(">", 0))
+ }
+ }
innerGm.Expect(cmp.Equal(resp, want,
protocmp.IgnoreUnknown(),
protocmp.IgnoreFields(&measurev1.DataPoint{}, "timestamp"),
+ protocmp.IgnoreFields(&measurev1.DataPoint{}, "version"),
protocmp.Transform())).
To(gm.BeTrue(), func() string {
j, err := protojson.Marshal(resp)
diff --git
a/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data1.json
b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data1.json
index dee1ddae..3f7a5f96 100644
---
a/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data1.json
+++
b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data1.json
@@ -5,12 +5,12 @@
"tags": [
{
"str": {
- "value": "7"
+ "value": "12"
}
},
{
"str": {
- "value": "entity_2"
+ "value": "entity_1"
}
},
{
@@ -27,12 +27,12 @@
"fields": [
{
"int": {
- "value": 102
+ "value": 300
}
},
{
"int": {
- "value": 10
+ "value": 7
}
}
]
@@ -43,7 +43,7 @@
"tags": [
{
"str": {
- "value": "8"
+ "value": "7"
}
},
{
@@ -53,13 +53,11 @@
},
{
"str": {
- "value": "svc_1"
+ "value": ""
}
},
{
- "str": {
- "value": "GET:/metrics"
- }
+ "null": 0
}
]
}
@@ -67,12 +65,12 @@
"fields": [
{
"int": {
- "value": 103
+ "value": 102
}
},
{
"int": {
- "value": 9
+ "value": 10
}
}
]
@@ -83,7 +81,7 @@
"tags": [
{
"str": {
- "value": "9"
+ "value": "8"
}
},
{
@@ -93,12 +91,12 @@
},
{
"str": {
- "value": "svc_4"
+ "value": "svc_1"
}
},
{
"str": {
- "value": "GET:/check"
+ "value": "GET:/metrics"
}
}
]
@@ -107,12 +105,12 @@
"fields": [
{
"int": {
- "value": 130
+ "value": 103
}
},
{
"int": {
- "value": 8
+ "value": 9
}
}
]
@@ -123,17 +121,22 @@
"tags": [
{
"str": {
- "value": "10"
+ "value": "9"
}
},
{
"str": {
- "value": "entity_3"
+ "value": "entity_2"
}
},
{
"str": {
- "value": "svc_1"
+ "value": "svc_4"
+ }
+ },
+ {
+ "str": {
+ "value": "GET:/check"
}
}
]
@@ -142,12 +145,12 @@
"fields": [
{
"int": {
- "value": 133
+ "value": 130
}
},
{
"int": {
- "value": 11
+ "value": 8
}
}
]
@@ -158,17 +161,17 @@
"tags": [
{
"str": {
- "value": "11"
+ "value": "10"
}
},
{
"str": {
- "value": "entity_1"
+ "value": "entity_3"
}
},
{
"str": {
- "value": "svc_2"
+ "value": "svc_1"
}
}
]
@@ -177,12 +180,12 @@
"fields": [
{
"int": {
- "value": 54
+ "value": 133
}
},
{
"int": {
- "value": 12
+ "value": 11
}
}
]
@@ -193,7 +196,7 @@
"tags": [
{
"str": {
- "value": "12"
+ "value": "11"
}
},
{
@@ -203,11 +206,8 @@
},
{
"str": {
- "value": ""
+ "value": "svc_2"
}
- },
- {
- "null": 0
}
]
}
@@ -215,12 +215,12 @@
"fields": [
{
"int": {
- "value": 300
+ "value": 54
}
},
{
"int": {
- "value": 7
+ "value": 12
}
}
]