This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch measure-version in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 3ae2456febdbcdda301f78bd36ad2afb2ddfb491 Author: Gao Hongtao <[email protected]> AuthorDate: Wed Jun 12 16:36:50 2024 +0800 Add version to timestamp file Signed-off-by: Gao Hongtao <[email protected]> --- api/proto/banyandb/measure/v1/query.proto | 2 + api/proto/banyandb/measure/v1/write.proto | 2 + banyand/liaison/grpc/measure.go | 6 + banyand/measure/block.go | 78 ++- banyand/measure/block_metadata.go | 29 +- banyand/measure/block_metadata_test.go | 92 ++- banyand/measure/block_test.go | 88 +-- banyand/measure/block_writer.go | 4 +- banyand/measure/datapoints.go | 2 + banyand/measure/measure.go | 1 - banyand/measure/merger.go | 2 +- banyand/measure/merger_test.go | 11 +- banyand/measure/part.go | 4 +- banyand/measure/part_iter.go | 2 - banyand/measure/part_test.go | 3 + banyand/measure/query.go | 10 +- banyand/measure/query_test.go | 744 ++++++++++++++++++++- banyand/measure/tstable_test.go | 64 +- banyand/measure/write.go | 1 + banyand/stream/block.go | 2 +- docs/api-reference.md | 2 + pkg/encoding/encoding.go | 37 +- pkg/encoding/int_list.go | 5 +- pkg/pb/v1/metadata.go | 1 + .../measure/measure_plan_indexscan_local.go | 1 + test/cases/measure/data/data.go | 6 + ...service_instance_endpoint_cpm_minute_data1.json | 66 +- 27 files changed, 1097 insertions(+), 168 deletions(-) diff --git a/api/proto/banyandb/measure/v1/query.proto b/api/proto/banyandb/measure/v1/query.proto index 6cb04417..49ef6a62 100644 --- a/api/proto/banyandb/measure/v1/query.proto +++ b/api/proto/banyandb/measure/v1/query.proto @@ -40,6 +40,8 @@ message DataPoint { } // fields contains fields selected in the projection repeated Field fields = 3; + // version is the version of the data point + int64 version = 4; } // QueryResponse is the response for a query to the Query module. 
diff --git a/api/proto/banyandb/measure/v1/write.proto b/api/proto/banyandb/measure/v1/write.proto index 3def9493..ea43dc59 100644 --- a/api/proto/banyandb/measure/v1/write.proto +++ b/api/proto/banyandb/measure/v1/write.proto @@ -36,6 +36,8 @@ message DataPointValue { repeated model.v1.TagFamilyForWrite tag_families = 2 [(validate.rules).repeated.min_items = 1]; // the order of fields match the measure schema repeated model.v1.FieldValue fields = 3; + // the version of the data point + int64 version = 4; } // WriteRequest is the request contract for write diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index 13a8399b..6b480738 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -108,6 +108,12 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, ms.sampled) continue } + if writeRequest.DataPoint.Version == 0 { + if writeRequest.MessageId == 0 { + writeRequest.MessageId = uint64(time.Now().UnixNano()) + } + writeRequest.DataPoint.Version = int64(writeRequest.MessageId) + } if ms.ingestionAccessLog != nil { if errAccessLog := ms.ingestionAccessLog.Write(writeRequest); errAccessLog != nil { ms.sampled.Error().Err(errAccessLog).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to write access log") diff --git a/banyand/measure/block.go b/banyand/measure/block.go index b4792d85..ec536df6 100644 --- a/banyand/measure/block.go +++ b/banyand/measure/block.go @@ -34,6 +34,8 @@ import ( type block struct { timestamps []int64 + versions []int64 + tagFamilies []columnFamily field columnFamily @@ -41,6 +43,7 @@ type block struct { func (b *block) reset() { b.timestamps = b.timestamps[:0] + b.versions = b.versions[:0] tff := b.tagFamilies for i := range tff { @@ -50,7 +53,7 @@ func (b *block) reset() { b.field.reset() } -func (b *block) 
mustInitFromDataPoints(timestamps []int64, tagFamilies [][]nameValues, fields []nameValues) { +func (b *block) mustInitFromDataPoints(timestamps []int64, versions []int64, tagFamilies [][]nameValues, fields []nameValues) { b.reset() size := len(timestamps) if size == 0 { @@ -65,13 +68,14 @@ func (b *block) mustInitFromDataPoints(timestamps []int64, tagFamilies [][]nameV assertTimestampsSorted(timestamps) b.timestamps = append(b.timestamps, timestamps...) + b.versions = append(b.versions, versions...) b.mustInitFromTagsAndFields(tagFamilies, fields) } func assertTimestampsSorted(timestamps []int64) { for i := range timestamps { if i > 0 && timestamps[i-1] > timestamps[i] { - logger.Panicf("log entries must be sorted by timestamp; got the previous entry with bigger timestamp %d than the current entry with timestamp %d", + logger.Panicf("data points must be sorted by timestamp; got the previous data point with bigger timestamp %d than the current data point with timestamp %d", timestamps[i-1], timestamps[i]) } } @@ -136,7 +140,7 @@ func (b *block) mustWriteTo(sid common.SeriesID, bm *blockMetadata, ww *writers) bm.uncompressedSizeBytes = b.uncompressedSizeBytes() bm.count = uint64(b.Len()) - mustWriteTimestampsTo(&bm.timestamps, b.timestamps, &ww.timestampsWriter) + mustWriteTimestampsTo(&bm.timestamps, b.timestamps, b.versions, &ww.timestampsWriter) for ti := range b.tagFamilies { b.marshalTagFamily(b.tagFamilies[ti], bm, ww) @@ -284,7 +288,7 @@ func (b *block) uncompressedSizeBytes() uint64 { func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm blockMetadata) { b.reset() - b.timestamps = mustReadTimestampsFrom(b.timestamps, &bm.timestamps, int(bm.count), p.timestamps) + b.timestamps, b.versions = mustReadTimestampsFrom(b.timestamps, b.versions, &bm.timestamps, int(bm.count), p.timestamps) cc := b.field.resizeColumns(len(bm.field.columnMetadata)) for i := range cc { @@ -307,7 +311,7 @@ func (b *block) mustReadFrom(decoder 
*encoding.BytesBlockDecoder, p *part, bm bl func (b *block) mustSeqReadFrom(decoder *encoding.BytesBlockDecoder, seqReaders *seqReaders, bm blockMetadata) { b.reset() - b.timestamps = mustSeqReadTimestampsFrom(b.timestamps, &bm.timestamps, int(bm.count), &seqReaders.timestamps) + b.timestamps, b.versions = mustSeqReadTimestampsFrom(b.timestamps, b.versions, &bm.timestamps, int(bm.count), &seqReaders.timestamps) cc := b.field.resizeColumns(len(bm.field.columnMetadata)) for i := range cc { @@ -333,35 +337,35 @@ func (b *block) sortTagFamilies() { }) } -func mustWriteTimestampsTo(tm *timestampsMetadata, timestamps []int64, timestampsWriter *writer) { +func mustWriteTimestampsTo(tm *timestampsMetadata, timestamps, versions []int64, timestampsWriter *writer) { tm.reset() bb := bigValuePool.Generate() defer bigValuePool.Release(bb) bb.Buf, tm.encodeType, tm.min = encoding.Int64ListToBytes(bb.Buf[:0], timestamps) - if len(bb.Buf) > maxTimestampsBlockSize { - logger.Panicf("too big block with timestamps: %d bytes; the maximum supported size is %d bytes", len(bb.Buf), maxTimestampsBlockSize) + tm.encodeType = encoding.GetVersionType(tm.encodeType) + if tm.encodeType == encoding.EncodeTypeUnknown { + logger.Panicf("unexpected encodeType %d", tm.encodeType) + return } tm.max = timestamps[len(timestamps)-1] tm.offset = timestampsWriter.bytesWritten - tm.size = uint64(len(bb.Buf)) + tm.versionOffset = uint64(len(bb.Buf)) + timestampsWriter.MustWrite(bb.Buf) + bb.Buf, tm.versionEncodeType, tm.versionFirst = encoding.Int64ListToBytes(bb.Buf[:0], versions) + tm.size = tm.versionOffset + uint64(len(bb.Buf)) timestampsWriter.MustWrite(bb.Buf) } -func mustReadTimestampsFrom(dst []int64, tm *timestampsMetadata, count int, reader fs.Reader) []int64 { +func mustReadTimestampsFrom(timestamps, versions []int64, tm *timestampsMetadata, count int, reader fs.Reader) ([]int64, []int64) { bb := bigValuePool.Generate() defer bigValuePool.Release(bb) bb.Buf = bytes.ResizeExact(bb.Buf, 
int(tm.size)) fs.MustReadData(reader, int64(tm.offset), bb.Buf) - var err error - dst, err = encoding.BytesToInt64List(dst, bb.Buf, tm.encodeType, tm.min, count) - if err != nil { - logger.Panicf("%s: cannot unmarshal timestamps: %v", reader.Path(), err) - } - return dst + return mustDecodeTimestampsWithVersions(timestamps, versions, tm, count, reader.Path(), bb.Buf) } -func mustSeqReadTimestampsFrom(dst []int64, tm *timestampsMetadata, count int, reader *seqReader) []int64 { +func mustSeqReadTimestampsFrom(timestamps, versions []int64, tm *timestampsMetadata, count int, reader *seqReader) ([]int64, []int64) { if tm.offset != reader.bytesRead { logger.Panicf("offset %d must be equal to bytesRead %d", tm.offset, reader.bytesRead) } @@ -369,12 +373,31 @@ func mustSeqReadTimestampsFrom(dst []int64, tm *timestampsMetadata, count int, r defer bigValuePool.Release(bb) bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size)) reader.mustReadFull(bb.Buf) + return mustDecodeTimestampsWithVersions(timestamps, versions, tm, count, reader.Path(), bb.Buf) +} + +func mustDecodeTimestampsWithVersions(timestamps, versions []int64, tm *timestampsMetadata, count int, path string, src []byte) ([]int64, []int64) { var err error - dst, err = encoding.BytesToInt64List(dst, bb.Buf, tm.encodeType, tm.min, count) + if t := encoding.GetCommonType(tm.encodeType); t != encoding.EncodeTypeUnknown { + if tm.size < tm.versionOffset { + logger.Panicf("size %d must be greater than versionOffset %d", tm.size, tm.versionOffset) + } + timestamps, err = encoding.BytesToInt64List(timestamps, src[:tm.versionOffset], t, tm.min, count) + if err != nil { + logger.Panicf("%s: cannot unmarshal timestamps with versions: %v", path, err) + } + versions, err = encoding.BytesToInt64List(versions, src[tm.versionOffset:], tm.versionEncodeType, tm.versionFirst, count) + if err != nil { + logger.Panicf("%s: cannot unmarshal versions: %v", path, err) + } + return timestamps, versions + } + timestamps, err = 
encoding.BytesToInt64List(timestamps, src, tm.encodeType, tm.min, count) if err != nil { - logger.Panicf("%s: cannot unmarshal timestamps: %v", reader.Path(), err) + logger.Panicf("%s: cannot unmarshal timestamps: %v", path, err) } - return dst + versions = encoding.ExtendInt64ListCapacity(versions, count) + return timestamps, versions } func generateBlock() *block { @@ -396,6 +419,7 @@ type blockCursor struct { p *part fields columnFamily timestamps []int64 + versions []int64 tagFamilies []columnFamily columnValuesDecoder encoding.BytesBlockDecoder tagProjection []pbv1.TagProjection @@ -416,6 +440,7 @@ func (bc *blockCursor) reset() { bc.fieldProjection = bc.fieldProjection[:0] bc.timestamps = bc.timestamps[:0] + bc.versions = bc.versions[:0] tff := bc.tagFamilies for i := range tff { @@ -452,6 +477,7 @@ func (bc *blockCursor) copyAllTo(r *pbv1.MeasureResult, entityValuesAll map[comm size := offset - idx r.SID = bc.bm.seriesID r.Timestamps = append(r.Timestamps, bc.timestamps[idx:offset]...) + r.Versions = append(r.Versions, bc.versions[idx:offset]...) var entityValues map[string]*modelv1.TagValue if entityValuesAll != nil { entityValues = entityValuesAll[r.SID] @@ -532,6 +558,7 @@ func (bc *blockCursor) copyTo(r *pbv1.MeasureResult, entityValuesAll map[common. ) { r.SID = bc.bm.seriesID r.Timestamps = append(r.Timestamps, bc.timestamps[bc.idx]) + r.Versions = append(r.Versions, bc.versions[bc.idx]) var entityValues map[string]*modelv1.TagValue if entityValuesAll != nil { entityValues = entityValuesAll[r.SID] @@ -629,6 +656,7 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool { return false } bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[start:end+1]...) + bc.versions = append(bc.versions, tmpBlock.versions[start:end+1]...) 
for _, cf := range tmpBlock.tagFamilies { tf := columnFamily{ @@ -687,9 +715,8 @@ func releaseBlockCursor(bc *blockCursor) { type blockPointer struct { block - bm blockMetadata - idx int - lastPartID uint64 + bm blockMetadata + idx int } func (bi *blockPointer) updateMetadata() { @@ -770,8 +797,8 @@ func (bi *blockPointer) append(b *blockPointer, offset int) { assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset) bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...) - - bi.lastPartID = b.lastPartID + assertIdxAndOffset("versions", len(b.versions), bi.idx, offset) + bi.versions = append(bi.versions, b.versions[b.idx:offset]...) } func assertIdxAndOffset(name string, length int, idx int, offset int) { @@ -789,7 +816,6 @@ func (bi *blockPointer) isFull() bool { func (bi *blockPointer) reset() { bi.idx = 0 - bi.lastPartID = 0 bi.block.reset() bi.bm = blockMetadata{} } diff --git a/banyand/measure/block_metadata.go b/banyand/measure/block_metadata.go index 6a066329..b58a0e4d 100644 --- a/banyand/measure/block_metadata.go +++ b/banyand/measure/block_metadata.go @@ -236,9 +236,12 @@ func releaseBlockMetadataArray(bma *blockMetadataArray) { type timestampsMetadata struct { dataBlock - min int64 - max int64 - encodeType encoding.EncodeType + min int64 + max int64 + versionOffset uint64 + versionFirst int64 + encodeType encoding.EncodeType + versionEncodeType encoding.EncodeType } func (tm *timestampsMetadata) reset() { @@ -246,6 +249,9 @@ func (tm *timestampsMetadata) reset() { tm.min = 0 tm.max = 0 tm.encodeType = 0 + tm.versionOffset = 0 + tm.versionFirst = 0 + tm.versionEncodeType = 0 } func (tm *timestampsMetadata) copyFrom(src *timestampsMetadata) { @@ -253,6 +259,9 @@ func (tm *timestampsMetadata) copyFrom(src *timestampsMetadata) { tm.min = src.min tm.max = src.max tm.encodeType = src.encodeType + tm.versionOffset = src.versionOffset + tm.versionFirst = src.versionFirst + tm.versionEncodeType = src.versionEncodeType } func (tm 
*timestampsMetadata) marshal(dst []byte) []byte { @@ -260,19 +269,31 @@ func (tm *timestampsMetadata) marshal(dst []byte) []byte { dst = encoding.Uint64ToBytes(dst, uint64(tm.min)) dst = encoding.Uint64ToBytes(dst, uint64(tm.max)) dst = append(dst, byte(tm.encodeType)) + dst = encoding.VarUint64ToBytes(dst, tm.versionOffset) + dst = encoding.Uint64ToBytes(dst, uint64(tm.versionFirst)) + dst = append(dst, byte(tm.versionEncodeType)) return dst } func (tm *timestampsMetadata) unmarshal(src []byte) ([]byte, error) { src, err := tm.dataBlock.unmarshal(src) if err != nil { - return nil, fmt.Errorf("cannot unmarshal dataBlock: %w", err) + return nil, fmt.Errorf("cannot unmarshal ts dataBlock: %w", err) } tm.min = int64(encoding.BytesToUint64(src)) src = src[8:] tm.max = int64(encoding.BytesToUint64(src)) src = src[8:] tm.encodeType = encoding.EncodeType(src[0]) + src = src[1:] + src, n, err := encoding.BytesToVarUint64(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal ts offset: %w", err) + } + tm.versionOffset = n + tm.versionFirst = int64(encoding.BytesToUint64(src)) + src = src[8:] + tm.versionEncodeType = encoding.EncodeType(src[0]) return src[1:], nil } diff --git a/banyand/measure/block_metadata_test.go b/banyand/measure/block_metadata_test.go index 59709b73..6bac5d09 100644 --- a/banyand/measure/block_metadata_test.go +++ b/banyand/measure/block_metadata_test.go @@ -80,9 +80,12 @@ func Test_timestampsMetadata_reset(t *testing.T) { offset: 1, size: 1, }, - min: 1, - max: 1, - encodeType: encoding.EncodeTypeConst, + min: 1, + max: 1, + encodeType: encoding.EncodeTypeConst, + versionOffset: 1, + versionFirst: 1, + versionEncodeType: encoding.EncodeTypeDelta, } tm.reset() @@ -92,6 +95,9 @@ func Test_timestampsMetadata_reset(t *testing.T) { assert.Equal(t, int64(0), tm.min) assert.Equal(t, int64(0), tm.max) assert.Equal(t, encoding.EncodeTypeUnknown, tm.encodeType) + assert.Equal(t, uint64(0), tm.versionOffset) + assert.Equal(t, int64(0), 
tm.versionFirst) + assert.Equal(t, encoding.EncodeTypeUnknown, tm.versionEncodeType) } func Test_timestampsMetadata_copyFrom(t *testing.T) { @@ -100,9 +106,12 @@ func Test_timestampsMetadata_copyFrom(t *testing.T) { offset: 1, size: 1, }, - min: 1, - max: 1, - encodeType: encoding.EncodeTypeConst, + min: 1, + max: 1, + encodeType: encoding.EncodeTypeConst, + versionOffset: 1, + versionFirst: 1, + versionEncodeType: encoding.EncodeTypeDelta, } dest := &timestampsMetadata{ @@ -110,9 +119,12 @@ func Test_timestampsMetadata_copyFrom(t *testing.T) { offset: 2, size: 2, }, - min: 2, - max: 2, - encodeType: encoding.EncodeTypeDelta, + min: 2, + max: 2, + encodeType: encoding.EncodeTypeDelta, + versionOffset: 2, + versionFirst: 2, + versionEncodeType: encoding.EncodeTypeDeltaOfDelta, } dest.copyFrom(src) @@ -122,6 +134,9 @@ func Test_timestampsMetadata_copyFrom(t *testing.T) { assert.Equal(t, src.min, dest.min) assert.Equal(t, src.max, dest.max) assert.Equal(t, src.encodeType, dest.encodeType) + assert.Equal(t, src.versionOffset, dest.versionOffset) + assert.Equal(t, src.versionFirst, dest.versionFirst) + assert.Equal(t, src.versionEncodeType, dest.versionEncodeType) } func Test_timestampsMetadata_marshal_unmarshal(t *testing.T) { @@ -130,9 +145,12 @@ func Test_timestampsMetadata_marshal_unmarshal(t *testing.T) { offset: 1, size: 1, }, - min: 1, - max: 1, - encodeType: encoding.EncodeTypeConst, + min: 1, + max: 1, + encodeType: encoding.EncodeTypeConst, + versionOffset: 1, + versionFirst: 1, + versionEncodeType: encoding.EncodeTypeDelta, } marshaled := original.marshal(nil) @@ -147,6 +165,9 @@ func Test_timestampsMetadata_marshal_unmarshal(t *testing.T) { assert.Equal(t, original.min, unmarshaled.min) assert.Equal(t, original.max, unmarshaled.max) assert.Equal(t, original.encodeType, unmarshaled.encodeType) + assert.Equal(t, original.versionOffset, unmarshaled.versionOffset) + assert.Equal(t, original.versionFirst, unmarshaled.versionFirst) + assert.Equal(t, 
original.versionEncodeType, unmarshaled.versionEncodeType) } func Test_blockMetadata_marshal_unmarshal(t *testing.T) { @@ -200,6 +221,44 @@ func Test_blockMetadata_marshal_unmarshal(t *testing.T) { }, }, }, + { + name: "Non-zero values with versions", + original: &blockMetadata{ + seriesID: common.SeriesID(1), + uncompressedSizeBytes: 1, + count: 1, + timestamps: timestampsMetadata{ + dataBlock: dataBlock{ + offset: 1, + size: 1, + }, + min: 1, + max: 1, + encodeType: encoding.EncodeTypeConst, + versionOffset: 1, + versionFirst: 1, + versionEncodeType: encoding.EncodeTypeDelta, + }, + tagFamilies: map[string]*dataBlock{ + "tag1": { + offset: 1, + size: 1, + }, + }, + field: columnFamilyMetadata{ + columnMetadata: []columnMetadata{ + { + dataBlock: dataBlock{ + offset: 1, + size: 1, + }, + name: "field1", + valueType: pbv1.ValueTypeInt64, + }, + }, + }, + }, + }, { name: "Multiple tagFamilies and columnMetadata", original: &blockMetadata{ @@ -211,9 +270,12 @@ func Test_blockMetadata_marshal_unmarshal(t *testing.T) { offset: 2, size: 2, }, - min: 2, - max: 2, - encodeType: encoding.EncodeTypeConst, + min: 2, + max: 2, + encodeType: encoding.EncodeTypeConst, + versionOffset: 2, + versionFirst: 2, + versionEncodeType: encoding.EncodeTypeDelta, }, tagFamilies: map[string]*dataBlock{ "tag1": { diff --git a/banyand/measure/block_test.go b/banyand/measure/block_test.go index 53b72f8b..fdd2e350 100644 --- a/banyand/measure/block_test.go +++ b/banyand/measure/block_test.go @@ -18,8 +18,6 @@ package measure import ( - "crypto/rand" - "encoding/binary" "reflect" "testing" @@ -36,6 +34,7 @@ import ( func Test_block_reset(t *testing.T) { type fields struct { timestamps []int64 + versions []int64 tagFamilies []columnFamily field columnFamily } @@ -48,11 +47,13 @@ func Test_block_reset(t *testing.T) { name: "Test reset", fields: fields{ timestamps: []int64{1, 2, 3}, + versions: []int64{1, 2, 3}, tagFamilies: []columnFamily{{}, {}, {}}, field: columnFamily{columns: []column{{}, 
{}}}, }, want: block{ timestamps: []int64{}, + versions: []int64{}, tagFamilies: []columnFamily{}, field: columnFamily{columns: []column{}}, }, @@ -62,6 +63,7 @@ func Test_block_reset(t *testing.T) { t.Run(tt.name, func(t *testing.T) { b := &block{ timestamps: tt.fields.timestamps, + versions: tt.fields.versions, tagFamilies: tt.fields.tagFamilies, field: tt.fields.field, } @@ -87,6 +89,7 @@ func toTagProjection(b block) map[string][]string { var conventionalBlock = block{ timestamps: []int64{1, 2}, + versions: []int64{1, 1}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -131,6 +134,7 @@ var conventionalBlock = block{ func Test_block_mustInitFromDataPoints(t *testing.T) { type args struct { timestamps []int64 + versions []int64 tagFamilies [][]nameValues fields []nameValues } @@ -143,6 +147,7 @@ func Test_block_mustInitFromDataPoints(t *testing.T) { name: "Test mustInitFromDataPoints", args: args{ timestamps: []int64{1, 2}, + versions: []int64{1, 1}, tagFamilies: [][]nameValues{ { { @@ -208,7 +213,7 @@ func Test_block_mustInitFromDataPoints(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { b := &block{} - b.mustInitFromDataPoints(tt.args.timestamps, tt.args.tagFamilies, tt.args.fields) + b.mustInitFromDataPoints(tt.args.timestamps, tt.args.versions, tt.args.tagFamilies, tt.args.fields) if !reflect.DeepEqual(*b, tt.want) { t.Errorf("block.mustInitFromDataPoints() = %+v, want %+v", *b, tt.want) } @@ -237,21 +242,31 @@ func marshalIntArr(arr [][]byte) []byte { } func Test_mustWriteAndReadTimestamps(t *testing.T) { + type args struct { + timestamp []int64 + versions []int64 + } tests := []struct { name string - args []int64 + args args wantPanic bool wantTM timestampsMetadata }{ { - name: "Test mustWriteAndReadTimestamps", - args: []int64{1, 2, 3, 4, 5}, + name: "Test mustWriteAndReadTimestamps", + args: args{ + timestamp: []int64{1, 2, 3, 4, 5}, + versions: []int64{1, 2, 3, 4, 5}, + }, wantPanic: false, }, { - name: "Test 
mustWriteAndReadTimestamps with panic", - args: getBitInt64Arr(), - wantPanic: true, + name: "Test mustWriteAndReadTimestamps with the same version", + args: args{ + timestamp: []int64{1, 2, 3, 4, 5}, + versions: []int64{0, 0, 0, 0, 0}, + }, + wantPanic: false, }, } for _, tt := range tests { @@ -266,26 +281,18 @@ func Test_mustWriteAndReadTimestamps(t *testing.T) { b := &bytes.Buffer{} w := new(writer) w.init(b) - mustWriteTimestampsTo(tm, tt.args, w) - timestamps := mustReadTimestampsFrom(nil, tm, len(tt.args), b) - if !reflect.DeepEqual(timestamps, tt.args) { + mustWriteTimestampsTo(tm, tt.args.timestamp, tt.args.versions, w) + timestamps, versions := mustReadTimestampsFrom(nil, nil, tm, len(tt.args.timestamp), b) + if !reflect.DeepEqual(timestamps, tt.args.timestamp) { t.Errorf("mustReadTimestampsFrom() = %v, want %v", timestamps, tt.args) } + if !reflect.DeepEqual(versions, tt.args.versions) { + t.Errorf("mustReadTimestampsFrom() = %v, want %v", versions, tt.args) + } }) } } -func getBitInt64Arr() []int64 { - size := maxTimestampsBlockSize + 1 - randSlice := make([]int64, size) - for i := range randSlice { - b := make([]byte, 8) - _, _ = rand.Read(b) - randSlice[i] = int64(binary.BigEndian.Uint64(b)) - } - return randSlice -} - func Test_marshalAndUnmarshalTagFamily(t *testing.T) { metaBuffer, dataBuffer := &bytes.Buffer{}, &bytes.Buffer{} ww := &writers{ @@ -420,6 +427,7 @@ func Test_marshalAndUnmarshalBlock(t *testing.T) { func Test_blockPointer_append(t *testing.T) { type fields struct { timestamps []int64 + versions []int64 tagFamilies []columnFamily field columnFamily partID uint64 @@ -439,6 +447,7 @@ func Test_blockPointer_append(t *testing.T) { name: "Test append with empty block", fields: fields{ timestamps: []int64{1, 2}, + versions: []int64{1, 1}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -460,6 +469,7 @@ func Test_blockPointer_append(t *testing.T) { b: &blockPointer{ block: block{ timestamps: []int64{}, + versions: []int64{}, tagFamilies: 
[]columnFamily{}, field: columnFamily{}, }, @@ -470,6 +480,7 @@ func Test_blockPointer_append(t *testing.T) { want: &blockPointer{ block: block{ timestamps: []int64{1, 2}, + versions: []int64{1, 1}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -494,6 +505,7 @@ func Test_blockPointer_append(t *testing.T) { name: "Test append to a empty block", fields: fields{ timestamps: nil, + versions: nil, tagFamilies: nil, field: columnFamily{ columns: nil, @@ -502,9 +514,9 @@ func Test_blockPointer_append(t *testing.T) { }, args: args{ b: &blockPointer{ - lastPartID: 2, block: block{ timestamps: []int64{4, 5}, + versions: []int64{1, 1}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -527,9 +539,9 @@ func Test_blockPointer_append(t *testing.T) { offset: 2, }, want: &blockPointer{ - lastPartID: 2, block: block{ timestamps: []int64{4, 5}, + versions: []int64{1, 1}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -554,6 +566,7 @@ func Test_blockPointer_append(t *testing.T) { name: "Test append with offset equals to the data size. 
All data", fields: fields{ timestamps: []int64{1, 2}, + versions: []int64{1, 1}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -574,9 +587,9 @@ func Test_blockPointer_append(t *testing.T) { }, args: args{ b: &blockPointer{ - lastPartID: 2, block: block{ timestamps: []int64{4, 5}, + versions: []int64{1, 1}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -599,9 +612,9 @@ func Test_blockPointer_append(t *testing.T) { offset: 2, }, want: &blockPointer{ - lastPartID: 2, block: block{ timestamps: []int64{1, 2, 4, 5}, + versions: []int64{1, 1, 1, 1}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -630,6 +643,7 @@ func Test_blockPointer_append(t *testing.T) { name: "Test append with non-empty block and offset less than timestamps", fields: fields{ timestamps: []int64{1, 2}, + versions: []int64{1, 1}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -650,9 +664,9 @@ func Test_blockPointer_append(t *testing.T) { }, args: args{ b: &blockPointer{ - lastPartID: 2, block: block{ timestamps: []int64{4, 5}, + versions: []int64{1, 1}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -675,9 +689,9 @@ func Test_blockPointer_append(t *testing.T) { offset: 1, }, want: &blockPointer{ - lastPartID: 2, block: block{ timestamps: []int64{1, 2, 4}, + versions: []int64{1, 1, 1}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -713,10 +727,10 @@ func Test_blockPointer_append(t *testing.T) { bi := &blockPointer{ block: block{ timestamps: tt.fields.timestamps, + versions: tt.fields.versions, tagFamilies: tt.fields.tagFamilies, field: tt.fields.field, }, - lastPartID: tt.fields.partID, } bi.append(tt.args.b, tt.args.offset) if !reflect.DeepEqual(bi, tt.want) { @@ -748,17 +762,15 @@ func Test_blockPointer_copyFrom(t *testing.T) { }, args: args{ src: &blockPointer{ - bm: blockMetadata{count: 1}, - idx: 0, - block: conventionalBlock, - lastPartID: 2, + bm: blockMetadata{count: 1}, + idx: 0, + block: conventionalBlock, }, }, want: &blockPointer{ - bm: blockMetadata{count: 1}, - idx: 0, 
- block: conventionalBlock, - lastPartID: 2, + bm: blockMetadata{count: 1}, + idx: 0, + block: conventionalBlock, }, }, } diff --git a/banyand/measure/block_writer.go b/banyand/measure/block_writer.go index 760a63ad..fd065f1a 100644 --- a/banyand/measure/block_writer.go +++ b/banyand/measure/block_writer.go @@ -186,14 +186,14 @@ func (bw *blockWriter) mustInitForFilePart(fileSystem fs.FileSystem, path string bw.writers.fieldValuesWriter.init(fs.MustCreateFile(fileSystem, filepath.Join(path, fieldValuesFilename), filePermission)) } -func (bw *blockWriter) MustWriteDataPoints(sid common.SeriesID, timestamps []int64, tagFamilies [][]nameValues, fields []nameValues) { +func (bw *blockWriter) MustWriteDataPoints(sid common.SeriesID, timestamps, versions []int64, tagFamilies [][]nameValues, fields []nameValues) { if len(timestamps) == 0 { return } b := generateBlock() defer releaseBlock(b) - b.mustInitFromDataPoints(timestamps, tagFamilies, fields) + b.mustInitFromDataPoints(timestamps, versions, tagFamilies, fields) bw.mustWriteBlock(sid, b) } diff --git a/banyand/measure/datapoints.go b/banyand/measure/datapoints.go index 5cf2d9a9..3996e3a0 100644 --- a/banyand/measure/datapoints.go +++ b/banyand/measure/datapoints.go @@ -118,6 +118,7 @@ type nameValues struct { type dataPoints struct { seriesIDs []common.SeriesID timestamps []int64 + versions []int64 tagFamilies [][]nameValues fields []nameValues } @@ -136,6 +137,7 @@ func (d *dataPoints) Less(i, j int) bool { func (d *dataPoints) Swap(i, j int) { d.seriesIDs[i], d.seriesIDs[j] = d.seriesIDs[j], d.seriesIDs[i] d.timestamps[i], d.timestamps[j] = d.timestamps[j], d.timestamps[i] + d.versions[i], d.versions[j] = d.versions[j], d.versions[i] d.tagFamilies[i], d.tagFamilies[j] = d.tagFamilies[j], d.tagFamilies[i] d.fields[i], d.fields[j] = d.fields[j], d.fields[i] } diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go index 43515964..c05c4c99 100644 --- a/banyand/measure/measure.go +++ 
b/banyand/measure/measure.go @@ -35,7 +35,6 @@ import ( const ( maxValuesBlockSize = 8 * 1024 * 1024 - maxTimestampsBlockSize = 8 * 1024 * 1024 maxTagFamiliesMetadataSize = 8 * 1024 * 1024 maxUncompressedBlockSize = 2 * 1024 * 1024 maxUncompressedPrimaryBlockSize = 128 * 1024 diff --git a/banyand/measure/merger.go b/banyand/measure/merger.go index e957142c..85869691 100644 --- a/banyand/measure/merger.go +++ b/banyand/measure/merger.go @@ -370,7 +370,7 @@ func mergeTwoBlocks(target, left, right *blockPointer) { i++ } if left.timestamps[i-1] == ts2 { - if left.lastPartID >= right.lastPartID { + if left.versions[i-1] >= right.versions[right.idx] { target.append(left, i) } else { target.append(left, i-1) // skip left diff --git a/banyand/measure/merger_test.go b/banyand/measure/merger_test.go index 0ad12e27..c8de11bd 100644 --- a/banyand/measure/merger_test.go +++ b/banyand/measure/merger_test.go @@ -61,6 +61,7 @@ func Test_mergeTwoBlocks(t *testing.T) { left: &blockPointer{ block: block{ timestamps: []int64{1, 2}, + versions: []int64{1, 4}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -82,6 +83,7 @@ func Test_mergeTwoBlocks(t *testing.T) { right: &blockPointer{ block: block{ timestamps: []int64{3, 4}, + versions: []int64{5, 6}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -107,6 +109,7 @@ func Test_mergeTwoBlocks(t *testing.T) { left: &blockPointer{ block: block{ timestamps: []int64{1, 3}, + versions: []int64{1, 5}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -128,6 +131,7 @@ func Test_mergeTwoBlocks(t *testing.T) { right: &blockPointer{ block: block{ timestamps: []int64{2, 4}, + versions: []int64{4, 6}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -151,9 +155,9 @@ func Test_mergeTwoBlocks(t *testing.T) { { name: "Merge two non-empty blocks with duplicated timestamps", left: &blockPointer{ - lastPartID: 1, // the less partID will be skipped block: block{ timestamps: []int64{1, 2, 3}, + versions: []int64{1, 2, 3}, tagFamilies: []columnFamily{ { 
name: "arrTag", @@ -177,9 +181,9 @@ func Test_mergeTwoBlocks(t *testing.T) { }, }, right: &blockPointer{ - lastPartID: 2, // the greater partID will be appended block: block{ timestamps: []int64{2, 3, 4}, + versions: []int64{4, 5, 6}, tagFamilies: []columnFamily{ { name: "arrTag", @@ -201,7 +205,7 @@ func Test_mergeTwoBlocks(t *testing.T) { }, }, }, - want: &blockPointer{block: mergedBlock, lastPartID: 2, bm: blockMetadata{timestamps: timestampsMetadata{min: 1, max: 4}}}, + want: &blockPointer{block: mergedBlock, bm: blockMetadata{timestamps: timestampsMetadata{min: 1, max: 4}}}, }, } @@ -218,6 +222,7 @@ func Test_mergeTwoBlocks(t *testing.T) { var mergedBlock = block{ timestamps: []int64{1, 2, 3, 4}, + versions: []int64{1, 4, 5, 6}, tagFamilies: []columnFamily{ { name: "arrTag", diff --git a/banyand/measure/part.go b/banyand/measure/part.go index 20b6a5a7..c6383814 100644 --- a/banyand/measure/part.go +++ b/banyand/measure/part.go @@ -158,14 +158,14 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) { if uncompressedBlockSizeBytes >= maxUncompressedBlockSize || (i-indexPrev) > maxBlockLength || sid != sidPrev { - bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:i], dps.tagFamilies[indexPrev:i], dps.fields[indexPrev:i]) + bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:i], dps.versions[indexPrev:i], dps.tagFamilies[indexPrev:i], dps.fields[indexPrev:i]) sidPrev = sid indexPrev = i uncompressedBlockSizeBytes = 0 } uncompressedBlockSizeBytes += uncompressedDataPointSizeBytes(i, dps) } - bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:], dps.tagFamilies[indexPrev:], dps.fields[indexPrev:]) + bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:], dps.versions[indexPrev:], dps.tagFamilies[indexPrev:], dps.fields[indexPrev:]) bsw.Flush(&mp.partMetadata) releaseBlockWriter(bsw) } diff --git a/banyand/measure/part_iter.go b/banyand/measure/part_iter.go index aef39977..0c916334 100644 --- a/banyand/measure/part_iter.go +++ 
b/banyand/measure/part_iter.go @@ -315,8 +315,6 @@ func (pmi *partMergeIter) loadBlockMetadata() error { pm := pmi.primaryBlockMetadata[pmi.primaryMetadataIdx-1] return fmt.Errorf("can't read block metadata from primary at %d: %w", pm.offset, err) } - - pmi.block.lastPartID = pmi.partID return nil } diff --git a/banyand/measure/part_test.go b/banyand/measure/part_test.go index 1044a46c..e06b3e04 100644 --- a/banyand/measure/part_test.go +++ b/banyand/measure/part_test.go @@ -40,6 +40,7 @@ func TestMustInitFromDataPoints(t *testing.T) { name: "Test with empty dataPoints", dps: &dataPoints{ timestamps: []int64{}, + versions: []int64{}, seriesIDs: []common.SeriesID{}, tagFamilies: make([][]nameValues, 0), fields: make([]nameValues, 0), @@ -50,6 +51,7 @@ func TestMustInitFromDataPoints(t *testing.T) { name: "Test with one item in dataPoints", dps: &dataPoints{ timestamps: []int64{1}, + versions: []int64{1}, seriesIDs: []common.SeriesID{1}, tagFamilies: [][]nameValues{ { @@ -126,6 +128,7 @@ func TestMustInitFromDataPoints(t *testing.T) { var dps = &dataPoints{ seriesIDs: []common.SeriesID{1, 1, 2, 2, 3, 3}, timestamps: []int64{1, 2, 8, 10, 100, 220}, + versions: []int64{1, 2, 3, 4, 5, 6}, tagFamilies: [][]nameValues{ { { diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 6c9ae474..168a5663 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -438,8 +438,8 @@ func (qr queryResult) Len() int { func (qr queryResult) Less(i, j int) bool { leftTS := qr.data[i].timestamps[qr.data[i].idx] rightTS := qr.data[j].timestamps[qr.data[j].idx] - leftVersion := qr.data[i].p.partMetadata.ID - rightVersion := qr.data[j].p.partMetadata.ID + leftVersion := qr.data[i].versions[qr.data[i].idx] + rightVersion := qr.data[j].versions[qr.data[j].idx] if qr.orderByTS { if leftTS == rightTS { if qr.data[i].bm.seriesID == qr.data[j].bm.seriesID { @@ -495,7 +495,7 @@ func (qr *queryResult) merge(entityValuesAll map[common.SeriesID]map[string]*mod step = -1 } 
result := &pbv1.MeasureResult{} - var lastPartVersion uint64 + var lastVersion int64 var lastSid common.SeriesID for qr.Len() > 0 { @@ -507,12 +507,12 @@ func (qr *queryResult) merge(entityValuesAll map[common.SeriesID]map[string]*mod if len(result.Timestamps) > 0 && topBC.timestamps[topBC.idx] == result.Timestamps[len(result.Timestamps)-1] { - if topBC.p.partMetadata.ID > lastPartVersion { + if topBC.versions[topBC.idx] > lastVersion { logger.Panicf("following parts version should be less or equal to the previous one") } } else { topBC.copyTo(result, entityValuesAll, tagProjection) - lastPartVersion = topBC.p.partMetadata.ID + lastVersion = topBC.versions[topBC.idx] } topBC.idx += step diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go index 56111323..e43dd5fc 100644 --- a/banyand/measure/query_test.go +++ b/banyand/measure/query_test.go @@ -59,6 +59,7 @@ func TestQueryResult(t *testing.T) { want: []pbv1.MeasureResult{{ SID: 1, Timestamps: []int64{1}, + Versions: []int64{1}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, @@ -83,6 +84,7 @@ func TestQueryResult(t *testing.T) { }, { SID: 2, Timestamps: []int64{1}, + Versions: []int64{2}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, @@ -102,6 +104,7 @@ func TestQueryResult(t *testing.T) { }, { SID: 3, Timestamps: []int64{1}, + Versions: []int64{3}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, @@ -123,38 +126,40 @@ func TestQueryResult(t *testing.T) { }}, }, { - name: "Test with multiple parts with multiple data orderBy TS desc", - dpsList: []*dataPoints{dpsTS1, dpsTS2}, + name: "Test with multiple parts with duplicated data with different version order by TS 1", + dpsList: []*dataPoints{dpsTS1, dpsTS11}, 
sids: []common.SeriesID{1, 2, 3}, minTimestamp: 1, - maxTimestamp: 2, + maxTimestamp: 1, want: []pbv1.MeasureResult{{ SID: 1, - Timestamps: []int64{2}, + Timestamps: []int64{1}, + Versions: []int64{1}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ - {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value5", "value6"})}}, - {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{35, 40})}}, + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, }}, {Name: "binaryTag", Tags: []pbv1.Tag{ {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, }}, {Name: "singleTag", Tags: []pbv1.Tag{ - {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value3")}}, - {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(30)}}, + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, }}, }, Fields: []pbv1.Field{ - {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field3")}}, - {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(3330)}}, - {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(3663699.029)}}, + {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field1")}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}}, {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText)}}, }, }, { SID: 2, - Timestamps: []int64{2}, + Timestamps: []int64{1}, + Versions: []int64{2}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", 
Values: []*modelv1.TagValue{pbv1.NullTagValue}}, @@ -166,14 +171,15 @@ func TestQueryResult(t *testing.T) { {Name: "singleTag", Tags: []pbv1.Tag{ {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag3")}}, - {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag4")}}, + {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1")}}, + {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2")}}, }}, }, Fields: nil, }, { SID: 3, - Timestamps: []int64{2}, + Timestamps: []int64{1}, + Versions: []int64{3}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, @@ -190,11 +196,20 @@ func TestQueryResult(t *testing.T) { }}, }, Fields: []pbv1.Field{ - {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(4440)}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, }, - }, { + }}, + }, + { + name: "Test with multiple parts with duplicated data with different version order by TS 2", + dpsList: []*dataPoints{dpsTS11, dpsTS1}, + sids: []common.SeriesID{1, 2, 3}, + minTimestamp: 1, + maxTimestamp: 1, + want: []pbv1.MeasureResult{{ SID: 1, Timestamps: []int64{1}, + Versions: []int64{1}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, @@ -213,12 +228,13 @@ func TestQueryResult(t *testing.T) { Fields: []pbv1.Field{ {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field1")}}, {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, - {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1221233.343)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}}, {Name: "binaryField", Values: 
[]*modelv1.FieldValue{binaryDataFieldValue(longText)}}, }, }, { SID: 2, Timestamps: []int64{1}, + Versions: []int64{2}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, @@ -238,6 +254,7 @@ func TestQueryResult(t *testing.T) { }, { SID: 3, Timestamps: []int64{1}, + Versions: []int64{3}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, @@ -259,15 +276,82 @@ func TestQueryResult(t *testing.T) { }}, }, { - name: "Test with multiple parts with multiple data orderBy TS asc", + name: "Test with multiple parts with multiple data orderBy TS desc 1", dpsList: []*dataPoints{dpsTS1, dpsTS2}, sids: []common.SeriesID{1, 2, 3}, - ascTS: true, minTimestamp: 1, maxTimestamp: 2, want: []pbv1.MeasureResult{{ + SID: 1, + Timestamps: []int64{2}, + Versions: []int64{4}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value5", "value6"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{35, 40})}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value3")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(30)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field3")}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(3330)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(3663699.029)}}, + {Name: "binaryField", Values: 
[]*modelv1.FieldValue{binaryDataFieldValue(longText)}}, + }, + }, { + SID: 2, + Timestamps: []int64{2}, + Versions: []int64{5}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag3")}}, + {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag4")}}, + }}, + }, + Fields: nil, + }, { + SID: 3, + Timestamps: []int64{2}, + Versions: []int64{6}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(4440)}}, + }, + }, { SID: 1, Timestamps: []int64{1}, + Versions: []int64{1}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, @@ -292,6 +376,7 @@ func TestQueryResult(t *testing.T) { }, { SID: 2, Timestamps: []int64{1}, + Versions: []int64{2}, TagFamilies: 
[]pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, @@ -311,6 +396,7 @@ func TestQueryResult(t *testing.T) { }, { SID: 3, Timestamps: []int64{1}, + Versions: []int64{3}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, @@ -329,9 +415,18 @@ func TestQueryResult(t *testing.T) { Fields: []pbv1.Field{ {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, }, - }, { + }}, + }, + { + name: "Test with multiple parts with multiple data orderBy TS desc 2", + dpsList: []*dataPoints{dpsTS2, dpsTS1}, + sids: []common.SeriesID{1, 2, 3}, + minTimestamp: 1, + maxTimestamp: 2, + want: []pbv1.MeasureResult{{ SID: 1, Timestamps: []int64{2}, + Versions: []int64{4}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value5", "value6"})}}, @@ -356,6 +451,7 @@ func TestQueryResult(t *testing.T) { }, { SID: 2, Timestamps: []int64{2}, + Versions: []int64{5}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, @@ -375,6 +471,7 @@ func TestQueryResult(t *testing.T) { }, { SID: 3, Timestamps: []int64{2}, + Versions: []int64{6}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, @@ -393,18 +490,86 @@ func TestQueryResult(t *testing.T) { Fields: []pbv1.Field{ {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(4440)}}, }, + }, { + SID: 1, + Timestamps: []int64{1}, + Versions: []int64{1}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, + }}, + {Name: 
"binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field1")}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1221233.343)}}, + {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText)}}, + }, + }, { + SID: 2, + Timestamps: []int64{1}, + Versions: []int64{2}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1")}}, + {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2")}}, + }}, + }, + Fields: nil, + }, { + SID: 3, + Timestamps: []int64{1}, + Versions: []int64{3}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + 
{Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, + }, }}, }, { - name: "Test with multiple parts with duplicated data order by Series", - dpsList: []*dataPoints{dpsTS1, dpsTS1}, - sids: []common.SeriesID{1, 2, 3}, - orderBySeries: true, - minTimestamp: 1, - maxTimestamp: 1, + name: "Test with multiple parts with multiple data orderBy TS asc 1", + dpsList: []*dataPoints{dpsTS1, dpsTS2}, + sids: []common.SeriesID{1, 2, 3}, + ascTS: true, + minTimestamp: 1, + maxTimestamp: 2, want: []pbv1.MeasureResult{{ SID: 1, Timestamps: []int64{1}, + Versions: []int64{1}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, @@ -423,12 +588,13 @@ func TestQueryResult(t *testing.T) { Fields: []pbv1.Field{ {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field1")}}, {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, - {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1221233.343)}}, {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText)}}, }, }, { SID: 2, Timestamps: []int64{1}, + Versions: []int64{2}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, @@ -448,6 +614,7 @@ func TestQueryResult(t *testing.T) { }, { SID: 3, Timestamps: []int64{1}, + Versions: []int64{3}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}}, @@ -466,11 +633,525 @@ func TestQueryResult(t *testing.T) { Fields: []pbv1.Field{ {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, }, + }, { + SID: 1, + Timestamps: []int64{2}, + Versions: []int64{4}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value5", "value6"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{35, 40})}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value3")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(30)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field3")}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(3330)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(3663699.029)}}, + {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText)}}, + }, + }, { + SID: 2, + Timestamps: []int64{2}, + Versions: []int64{5}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag3")}}, + 
{Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag4")}}, + }}, + }, + Fields: nil, + }, { + SID: 3, + Timestamps: []int64{2}, + Versions: []int64{6}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(4440)}}, + }, }}, }, { - name: "Test with multiple parts with multiple data order by Series", - dpsList: []*dataPoints{dpsTS1, dpsTS2}, + name: "Test with multiple parts with multiple data orderBy TS asc 2", + dpsList: []*dataPoints{dpsTS2, dpsTS1}, + sids: []common.SeriesID{1, 2, 3}, + ascTS: true, + minTimestamp: 1, + maxTimestamp: 2, + want: []pbv1.MeasureResult{{ + SID: 1, + Timestamps: []int64{1}, + Versions: []int64{1}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: 
"strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field1")}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1221233.343)}}, + {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText)}}, + }, + }, { + SID: 2, + Timestamps: []int64{1}, + Versions: []int64{2}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1")}}, + {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2")}}, + }}, + }, + Fields: nil, + }, { + SID: 3, + Timestamps: []int64{1}, + Versions: []int64{3}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "intField", Values: 
[]*modelv1.FieldValue{int64FieldValue(1110)}}, + }, + }, { + SID: 1, + Timestamps: []int64{2}, + Versions: []int64{4}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value5", "value6"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{35, 40})}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value3")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(30)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field3")}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(3330)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(3663699.029)}}, + {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText)}}, + }, + }, { + SID: 2, + Timestamps: []int64{2}, + Versions: []int64{5}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag3")}}, + {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag4")}}, + }}, + }, + Fields: nil, + }, { + SID: 3, + Timestamps: []int64{2}, + Versions: 
[]int64{6}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(4440)}}, + }, + }}, + }, + { + name: "Test with multiple parts with duplicated data order by Series", + dpsList: []*dataPoints{dpsTS1, dpsTS1}, + sids: []common.SeriesID{1, 2, 3}, + orderBySeries: true, + minTimestamp: 1, + maxTimestamp: 1, + want: []pbv1.MeasureResult{{ + SID: 1, + Timestamps: []int64{1}, + Versions: []int64{1}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field1")}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, + {Name: 
"floatField", Values: []*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}}, + {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText)}}, + }, + }, { + SID: 2, + Timestamps: []int64{1}, + Versions: []int64{2}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1")}}, + {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2")}}, + }}, + }, + Fields: nil, + }, { + SID: 3, + Timestamps: []int64{1}, + Versions: []int64{3}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, + }, + }}, + }, + { + name: "Test with multiple parts with duplicated data with different versions order by Series 1", + dpsList: []*dataPoints{dpsTS11, dpsTS1}, + sids: []common.SeriesID{1, 2, 3}, + orderBySeries: true, + minTimestamp: 1, + 
maxTimestamp: 1, + want: []pbv1.MeasureResult{{ + SID: 1, + Timestamps: []int64{1}, + Versions: []int64{1}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field1")}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}}, + {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText)}}, + }, + }, { + SID: 2, + Timestamps: []int64{1}, + Versions: []int64{2}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1")}}, + {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2")}}, + }}, + }, + Fields: nil, + }, { + SID: 3, + Timestamps: []int64{1}, + Versions: 
[]int64{3}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, + }, + }}, + }, + { + name: "Test with multiple parts with duplicated data with different versions order by Series 2", + dpsList: []*dataPoints{dpsTS1, dpsTS11}, + sids: []common.SeriesID{1, 2, 3}, + orderBySeries: true, + minTimestamp: 1, + maxTimestamp: 1, + want: []pbv1.MeasureResult{{ + SID: 1, + Timestamps: []int64{1}, + Versions: []int64{1}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field1")}}, + {Name: "intField", Values: 
[]*modelv1.FieldValue{int64FieldValue(1110)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}}, + {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText)}}, + }, + }, { + SID: 2, + Timestamps: []int64{1}, + Versions: []int64{2}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1")}}, + {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2")}}, + }}, + }, + Fields: nil, + }, { + SID: 3, + Timestamps: []int64{1}, + Versions: []int64{3}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, + }, + }}, + }, + { + name: "Test with multiple parts with multiple data order by Series 1", + dpsList: []*dataPoints{dpsTS1, dpsTS2}, + sids: []common.SeriesID{2, 1, 3}, + orderBySeries: true, + 
minTimestamp: 1, + maxTimestamp: 2, + want: []pbv1.MeasureResult{{ + SID: 2, + Timestamps: []int64{1, 2}, + Versions: []int64{2, 5}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1"), strTagValue("tag3")}}, + {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2"), strTagValue("tag4")}}, + }}, + }, + Fields: nil, + }, { + SID: 1, + Timestamps: []int64{1, 2}, + Versions: []int64{1, 4}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"}), strArrTagValue([]string{"value5", "value6"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30}), int64ArrTagValue([]int64{35, 40})}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText), binaryDataTagValue(longText)}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1"), strTagValue("value3")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10), int64TagValue(30)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "strField", Values: 
[]*modelv1.FieldValue{strFieldValue("field1"), strFieldValue("field3")}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110), int64FieldValue(3330)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1.221233343e+06), float64FieldValue(3663699.029)}}, + {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText), binaryDataFieldValue(longText)}}, + }, + }, { + SID: 3, + Timestamps: []int64{1, 2}, + Versions: []int64{3, 6}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + }}, + }, + Fields: []pbv1.Field{ + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110), int64FieldValue(4440)}}, + }, + }}, + }, + { + name: "Test with multiple parts with multiple data order by Series 2", + dpsList: []*dataPoints{dpsTS2, dpsTS1}, sids: []common.SeriesID{2, 1, 3}, orderBySeries: true, minTimestamp: 1, @@ -478,6 +1159,7 @@ func TestQueryResult(t *testing.T) { want: []pbv1.MeasureResult{{ SID: 2, Timestamps: []int64{1, 2}, + Versions: []int64{2, 5}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, @@ -497,6 +1179,7 @@ func TestQueryResult(t *testing.T) { }, { SID: 1, 
Timestamps: []int64{1, 2}, + Versions: []int64{1, 4}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"}), strArrTagValue([]string{"value5", "value6"})}}, @@ -521,6 +1204,7 @@ func TestQueryResult(t *testing.T) { }, { SID: 3, Timestamps: []int64{1, 2}, + Versions: []int64{3, 6}, TagFamilies: []pbv1.TagFamily{ {Name: "arrTag", Tags: []pbv1.Tag{ {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go index 4ea3b81d..e82acff0 100644 --- a/banyand/measure/tstable_test.go +++ b/banyand/measure/tstable_test.go @@ -49,6 +49,7 @@ func Test_tsTable_mustAddDataPoints(t *testing.T) { dpsList: []*dataPoints{ { timestamps: []int64{}, + versions: []int64{}, seriesIDs: []common.SeriesID{}, tagFamilies: make([][]nameValues, 0), fields: make([]nameValues, 0), @@ -61,6 +62,7 @@ func Test_tsTable_mustAddDataPoints(t *testing.T) { dpsList: []*dataPoints{ { timestamps: []int64{1}, + versions: []int64{1}, seriesIDs: []common.SeriesID{1}, tagFamilies: [][]nameValues{ { @@ -274,7 +276,7 @@ func Test_tstIter(t *testing.T) { }, { name: "Test with multiple parts with different ts, the block will be merged", - dpsList: []*dataPoints{dpsTS1, dpsTS2, dpsTS2}, + dpsList: []*dataPoints{dpsTS1, dpsTS2}, sids: []common.SeriesID{1, 2, 3}, minTimestamp: 1, maxTimestamp: 2, @@ -334,7 +336,7 @@ func Test_tstIter(t *testing.T) { time.Sleep(100 * time.Millisecond) continue } - if len(snp.parts) == 1 || len(snp.parts) < len(tt.dpsList) { + if len(snp.parts) == 1 { snp.decRef() break } @@ -412,6 +414,7 @@ var fieldProjections = map[int][]string{ var dpsTS1 = &dataPoints{ seriesIDs: []common.SeriesID{1, 2, 3}, timestamps: []int64{1, 1, 1}, + versions: []int64{1, 2, 3}, tagFamilies: [][]nameValues{ { { @@ -460,9 +463,62 @@ var dpsTS1 = &dataPoints{ }, } +var dpsTS11 = &dataPoints{ + seriesIDs: 
[]common.SeriesID{1, 2, 3}, + timestamps: []int64{1, 1, 1}, + versions: []int64{0, 1, 2}, + tagFamilies: [][]nameValues{ + { + { + name: "arrTag", values: []*nameValue{ + {name: "strArrTag", valueType: pbv1.ValueTypeStrArr, value: nil, valueArr: [][]byte{[]byte("value5"), []byte("value6")}}, + {name: "intArrTag", valueType: pbv1.ValueTypeInt64Arr, value: nil, valueArr: [][]byte{convert.Int64ToBytes(35), convert.Int64ToBytes(40)}}, + }, + }, + { + name: "binaryTag", values: []*nameValue{ + {name: "binaryTag", valueType: pbv1.ValueTypeBinaryData, value: longText, valueArr: nil}, + }, + }, + { + name: "singleTag", values: []*nameValue{ + {name: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value3"), valueArr: nil}, + {name: "intTag", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(30), valueArr: nil}, + }, + }, + }, + { + { + name: "singleTag", values: []*nameValue{ + {name: "strTag1", valueType: pbv1.ValueTypeStr, value: []byte("tag3"), valueArr: nil}, + {name: "strTag2", valueType: pbv1.ValueTypeStr, value: []byte("tag4"), valueArr: nil}, + }, + }, + }, + {}, // empty tagFamilies for seriesID 6 + }, + fields: []nameValues{ + { + name: "skipped", values: []*nameValue{ + {name: "strField", valueType: pbv1.ValueTypeStr, value: []byte("field3"), valueArr: nil}, + {name: "intField", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(3330), valueArr: nil}, + {name: "floatField", valueType: pbv1.ValueTypeFloat64, value: convert.Float64ToBytes(3663699.029), valueArr: nil}, + {name: "binaryField", valueType: pbv1.ValueTypeBinaryData, value: longText, valueArr: nil}, + }, + }, + {}, // empty fields for seriesID 5 + { + name: "onlyFields", values: []*nameValue{ + {name: "intField", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(4440), valueArr: nil}, + }, + }, + }, +} + var dpsTS2 = &dataPoints{ seriesIDs: []common.SeriesID{1, 2, 3}, timestamps: []int64{2, 2, 2}, + versions: []int64{4, 5, 6}, tagFamilies: [][]nameValues{ { { @@ 
-515,12 +571,15 @@ func generateHugeDps(startTimestamp, endTimestamp, timestamp int64) *dataPoints hugeDps := &dataPoints{ seriesIDs: []common.SeriesID{}, timestamps: []int64{}, + versions: []int64{}, tagFamilies: [][]nameValues{}, fields: []nameValues{}, } + now := time.Now().UnixNano() for i := startTimestamp; i <= endTimestamp; i++ { hugeDps.seriesIDs = append(hugeDps.seriesIDs, 1) hugeDps.timestamps = append(hugeDps.timestamps, i) + hugeDps.versions = append(hugeDps.versions, now+i) hugeDps.tagFamilies = append(hugeDps.tagFamilies, []nameValues{ { name: "arrTag", values: []*nameValue{ @@ -551,6 +610,7 @@ func generateHugeDps(startTimestamp, endTimestamp, timestamp int64) *dataPoints } hugeDps.seriesIDs = append(hugeDps.seriesIDs, []common.SeriesID{2, 3}...) hugeDps.timestamps = append(hugeDps.timestamps, []int64{timestamp, timestamp}...) + hugeDps.versions = append(hugeDps.versions, []int64{now + timestamp, now + timestamp}...) hugeDps.tagFamilies = append(hugeDps.tagFamilies, [][]nameValues{{ { name: "singleTag", values: []*nameValue{ diff --git a/banyand/measure/write.go b/banyand/measure/write.go index 32dd4a71..4c50c47f 100644 --- a/banyand/measure/write.go +++ b/banyand/measure/write.go @@ -93,6 +93,7 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me dpg.tables = append(dpg.tables, dpt) } dpt.dataPoints.timestamps = append(dpt.dataPoints.timestamps, ts) + dpt.dataPoints.versions = append(dpt.dataPoints.versions, req.DataPoint.Version) stm, ok := w.schemaRepo.loadMeasure(req.GetMetadata()) if !ok { return nil, fmt.Errorf("cannot find measure definition: %s", req.GetMetadata()) diff --git a/banyand/stream/block.go b/banyand/stream/block.go index 84b73b01..252b3947 100644 --- a/banyand/stream/block.go +++ b/banyand/stream/block.go @@ -66,7 +66,7 @@ func (b *block) mustInitFromElements(timestamps []int64, elementIDs []string, ta func assertTimestampsSorted(timestamps []int64) { for i := range timestamps { if i > 0 && 
timestamps[i-1] > timestamps[i] { - logger.Panicf("log entries must be sorted by timestamp; got the previous entry with bigger timestamp %d than the current entry with timestamp %d", + logger.Panicf("elements must be sorted by timestamp; got the previous element with bigger timestamp %d than the current element with timestamp %d", timestamps[i-1], timestamps[i]) } } diff --git a/docs/api-reference.md b/docs/api-reference.md index cae326b6..5ac35290 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -2363,6 +2363,7 @@ DataPoint is stored in Measures | timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | timestamp is in the timeunit of milliseconds. | | tag_families | [banyandb.model.v1.TagFamily](#banyandb-model-v1-TagFamily) | repeated | tag_families contains tags selected in the projection | | fields | [DataPoint.Field](#banyandb-measure-v1-DataPoint-Field) | repeated | fields contains fields selected in the projection | +| version | [int64](#int64) | | version is the version of the data point | @@ -2635,6 +2636,7 @@ DataPointValue is the data point for writing. It only contains values. | timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | timestamp is in the timeunit of milliseconds. 
| | tag_families | [banyandb.model.v1.TagFamilyForWrite](#banyandb-model-v1-TagFamilyForWrite) | repeated | the order of tag_families' items match the measure schema | | fields | [banyandb.model.v1.FieldValue](#banyandb-model-v1-FieldValue) | repeated | the order of fields match the measure schema | +| version | [int64](#int64) | | the version of the data point | diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go index 0646cdbb..d5cc0304 100644 --- a/pkg/encoding/encoding.go +++ b/pkg/encoding/encoding.go @@ -89,5 +89,40 @@ const ( EncodeTypeDeltaConst EncodeTypeDelta EncodeTypeDeltaOfDelta - EncodeTypeXOR + EncodeTypeConstWithVersion + EncodeTypeDeltaConstWithVersion + EncodeTypeDeltaWithVersion + EncodeTypeDeltaOfDeltaWithVersion ) + +// GetVersionType returns the version type of the given encoding type. +func GetVersionType(et EncodeType) EncodeType { + switch et { + case EncodeTypeConst: + return EncodeTypeConstWithVersion + case EncodeTypeDeltaConst: + return EncodeTypeDeltaConstWithVersion + case EncodeTypeDelta: + return EncodeTypeDeltaWithVersion + case EncodeTypeDeltaOfDelta: + return EncodeTypeDeltaOfDeltaWithVersion + default: + return EncodeTypeUnknown + } +} + +// GetCommonType returns the common type of the given encoding type. +func GetCommonType(et EncodeType) EncodeType { + switch et { + case EncodeTypeConstWithVersion: + return EncodeTypeConst + case EncodeTypeDeltaConstWithVersion: + return EncodeTypeDeltaConst + case EncodeTypeDeltaWithVersion: + return EncodeTypeDelta + case EncodeTypeDeltaOfDeltaWithVersion: + return EncodeTypeDeltaOfDelta + default: + return EncodeTypeUnknown + } +} diff --git a/pkg/encoding/int_list.go b/pkg/encoding/int_list.go index 7ea736d9..82b842c5 100644 --- a/pkg/encoding/int_list.go +++ b/pkg/encoding/int_list.go @@ -55,7 +55,7 @@ func Int64ListToBytes(dst []byte, a []int64) (result []byte, mt EncodeType, firs // BytesToInt64List decodes bytes into a list of int64. 
func BytesToInt64List(dst []int64, src []byte, mt EncodeType, firstValue int64, itemsCount int) ([]int64, error) { - dst = extendInt64ListCapacity(dst, itemsCount) + dst = ExtendInt64ListCapacity(dst, itemsCount) var err error switch mt { @@ -100,7 +100,8 @@ func BytesToInt64List(dst []int64, src []byte, mt EncodeType, firstValue int64, } } -func extendInt64ListCapacity(dst []int64, additionalItems int) []int64 { +// ExtendInt64ListCapacity extends the capacity of the int64 list. +func ExtendInt64ListCapacity(dst []int64, additionalItems int) []int64 { dstLen := len(dst) if n := dstLen + additionalItems - cap(dst); n > 0 { dst = append(dst[:cap(dst)], make([]int64, n)...) diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go index 4ff5e411..3aaf08a5 100644 --- a/pkg/pb/v1/metadata.go +++ b/pkg/pb/v1/metadata.go @@ -104,6 +104,7 @@ type Field struct { // MeasureResult is the result of a query. type MeasureResult struct { Timestamps []int64 + Versions []int64 TagFamilies []TagFamily Fields []Field SID common.SeriesID diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go index a037813e..4079f79a 100644 --- a/pkg/query/logical/measure/measure_plan_indexscan_local.go +++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go @@ -223,6 +223,7 @@ func (ei *resultMIterator) Next() bool { for i := range r.Timestamps { dp := &measurev1.DataPoint{ Timestamp: timestamppb.New(time.Unix(0, r.Timestamps[i])), + Version: r.Versions[i], } for _, tf := range r.TagFamilies { diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go index 8405152f..347d6b99 100644 --- a/test/cases/measure/data/data.go +++ b/test/cases/measure/data/data.go @@ -75,9 +75,15 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args innerGm.Expect(err).NotTo(gm.HaveOccurred()) want := &measurev1.QueryResponse{} helpers.UnmarshalYAML(ww, want) + for i := range resp.DataPoints 
{ + if resp.DataPoints[i].Timestamp != nil { + innerGm.Expect(resp.DataPoints[i].Version).Should(gm.BeNumerically(">", 0)) + } + } innerGm.Expect(cmp.Equal(resp, want, protocmp.IgnoreUnknown(), protocmp.IgnoreFields(&measurev1.DataPoint{}, "timestamp"), + protocmp.IgnoreFields(&measurev1.DataPoint{}, "version"), protocmp.Transform())). To(gm.BeTrue(), func() string { j, err := protojson.Marshal(resp) diff --git a/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data1.json b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data1.json index dee1ddae..3f7a5f96 100644 --- a/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data1.json +++ b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data1.json @@ -5,12 +5,12 @@ "tags": [ { "str": { - "value": "7" + "value": "12" } }, { "str": { - "value": "entity_2" + "value": "entity_1" } }, { @@ -27,12 +27,12 @@ "fields": [ { "int": { - "value": 102 + "value": 300 } }, { "int": { - "value": 10 + "value": 7 } } ] @@ -43,7 +43,7 @@ "tags": [ { "str": { - "value": "8" + "value": "7" } }, { @@ -53,13 +53,11 @@ }, { "str": { - "value": "svc_1" + "value": "" } }, { - "str": { - "value": "GET:/metrics" - } + "null": 0 } ] } @@ -67,12 +65,12 @@ "fields": [ { "int": { - "value": 103 + "value": 102 } }, { "int": { - "value": 9 + "value": 10 } } ] @@ -83,7 +81,7 @@ "tags": [ { "str": { - "value": "9" + "value": "8" } }, { @@ -93,12 +91,12 @@ }, { "str": { - "value": "svc_4" + "value": "svc_1" } }, { "str": { - "value": "GET:/check" + "value": "GET:/metrics" } } ] @@ -107,12 +105,12 @@ "fields": [ { "int": { - "value": 130 + "value": 103 } }, { "int": { - "value": 8 + "value": 9 } } ] @@ -123,17 +121,22 @@ "tags": [ { "str": { - "value": "10" + "value": "9" } }, { "str": { - "value": "entity_3" + "value": "entity_2" } }, { "str": { - "value": "svc_1" + "value": "svc_4" + } + }, + { + "str": { + "value": "GET:/check" } } ] @@ -142,12 +145,12 @@ "fields": [ { 
"int": { - "value": 133 + "value": 130 } }, { "int": { - "value": 11 + "value": 8 } } ] @@ -158,17 +161,17 @@ "tags": [ { "str": { - "value": "11" + "value": "10" } }, { "str": { - "value": "entity_1" + "value": "entity_3" } }, { "str": { - "value": "svc_2" + "value": "svc_1" } } ] @@ -177,12 +180,12 @@ "fields": [ { "int": { - "value": 54 + "value": 133 } }, { "int": { - "value": 12 + "value": 11 } } ] @@ -193,7 +196,7 @@ "tags": [ { "str": { - "value": "12" + "value": "11" } }, { @@ -203,11 +206,8 @@ }, { "str": { - "value": "" + "value": "svc_2" } - }, - { - "null": 0 } ] } @@ -215,12 +215,12 @@ "fields": [ { "int": { - "value": 300 + "value": 54 } }, { "int": { - "value": 7 + "value": 12 } } ]
