This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 4efc6a9  Fix inaccurate timestamp in the StartTimeIndex store (#28)
4efc6a9 is described below

commit 4efc6a937cd718f6f4d36836712de21ee9955a70
Author: Jiajing LU <[email protected]>
AuthorDate: Thu Aug 5 13:01:40 2021 +0800

    Fix inaccurate timestamp in the StartTimeIndex store (#28)
    
    * first reproduce the issue
    
    Signed-off-by: Megrez Lu <[email protected]>
    
    * fix issue
    
    Signed-off-by: Megrez Lu <[email protected]>
---
 banyand/query/processor_test.go | 13 +++++++++++++
 banyand/series/trace/write.go   | 25 +++++++++++++------------
 2 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index d686b7b..54d2f65 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -308,6 +308,19 @@ func TestQueryProcessor(t *testing.T) {
                        wantLen: 0,
                },
                {
+                       name: "query given timeRange which slightly covers the first three segments",
+                       queryGenerator: func(baseTs time.Time) *v1.QueryRequest {
+                               return pb.NewQueryRequestBuilder().
+                                       Limit(10).
+                                       Offset(0).
+                                       Metadata("default", "sw").
+                                       TimeRange(baseTs.Add(-1*time.Nanosecond), baseTs.Add(2*interval).Add(1*time.Nanosecond)).
+                                       Projection("trace_id").
+                                       Build()
+                       },
+                       wantLen: 3,
+               },
+               {
                        name: "query TraceID given timeRange includes the time range of data",
                        queryGenerator: func(baseTs time.Time) *v1.QueryRequest {
                                return pb.NewQueryRequestBuilder().
diff --git a/banyand/series/trace/write.go b/banyand/series/trace/write.go
index 60ed91a..115ba15 100644
--- a/banyand/series/trace/write.go
+++ b/banyand/series/trace/write.go
@@ -46,18 +46,18 @@ func (t *traceSeries) Write(seriesID common.SeriesID, shardID uint, entity data.
                return 0, errGetState
        }
        stateBytes := []byte{byte(state)}
-       tts := uint64(entity.GetTimestamp().AsTime().UnixNano())
-       chunkID := t.idGen.Next(tts)
-       ts, errParseTS := t.idGen.ParseTS(chunkID)
+       entityTs := uint64(entity.GetTimestamp().AsTime().UnixNano())
+       chunkID := t.idGen.Next(entityTs)
+       wallTs, errParseTS := t.idGen.ParseTS(chunkID)
        if errParseTS != nil {
                return 0, errors.Wrap(errParseTS, "failed to parse timestamp from chunk id")
        }
-       tsBytes := convert.Uint64ToBytes(ts)
+       wallTsBytes := convert.Uint64ToBytes(wallTs)
        intSeriesID := uint64(seriesID)
        seriesIDBytes := convert.Uint64ToBytes(intSeriesID)
-       wp := t.writePoint(ts)
+       wp := t.writePoint(wallTs)
 
-       err = wp.TimeSeriesWriter(shardID, dataStoreName).Put(seriesIDBytes, entityVal.GetDataBinary(), ts)
+       err = wp.TimeSeriesWriter(shardID, dataStoreName).Put(seriesIDBytes, entityVal.GetDataBinary(), wallTs)
        if err != nil {
                return 0, errors.Wrap(err, "fail to write traceSeries data")
        }
@@ -66,28 +66,29 @@ func (t *traceSeries) Write(seriesID common.SeriesID, shardID uint, entity data.
        if err != nil {
                return 0, errors.Wrap(err, "fail to serialize EntityValue to []byte")
        }
-       err = wp.TimeSeriesWriter(shardID, fieldsStoreName).Put(seriesIDBytes, byteVal, ts)
+       err = wp.TimeSeriesWriter(shardID, fieldsStoreName).Put(seriesIDBytes, byteVal, wallTs)
        if err != nil {
                return 0, errors.Wrap(err, "failed to write traceSeries fields")
        }
 
        chunkIDBytes := convert.Uint64ToBytes(chunkID)
-       if err = wp.Writer(shardID, chunkIDMapping).Put(chunkIDBytes, bydb_bytes.Join(stateBytes, seriesIDBytes, tsBytes)); err != nil {
+       if err = wp.Writer(shardID, chunkIDMapping).Put(chunkIDBytes, bydb_bytes.Join(stateBytes, seriesIDBytes, wallTsBytes)); err != nil {
                return 0, errors.Wrap(err, "failed to write chunkID index")
        }
        traceIDShardID := partition.ShardID(traceID, t.shardNum)
        if err = wp.TimeSeriesWriter(traceIDShardID, traceIndex).
-               Put(traceID, bydb_bytes.Join(convert.Uint16ToBytes(uint16(shardID)), chunkIDBytes), tts); err != nil {
+               Put(traceID, bydb_bytes.Join(convert.Uint16ToBytes(uint16(shardID)), chunkIDBytes), entityTs); err != nil {
                return 0, errors.Wrap(err, "failed to Trace index")
        }
-       err = wp.Writer(shardID, startTimeIndex).Put(bydb_bytes.Join(stateBytes, tsBytes, chunkIDBytes), nil)
+       entityTsBytes := convert.Uint64ToBytes(entityTs)
+       err = wp.Writer(shardID, startTimeIndex).Put(bydb_bytes.Join(stateBytes, entityTsBytes, chunkIDBytes), nil)
        if err != nil {
                return 0, errors.Wrap(err, "failed to write start time index")
        }
        t.l.Debug().Uint64("chunk_id", chunkID).
                Uint64("series_id", intSeriesID).
-               Time("ts", time.Unix(0, int64(ts))).
-               Uint64("ts_int", ts).
+               Time("wallTs", time.Unix(0, int64(wallTs))).
+               Uint64("wallTs_int", wallTs).
                Int("data_size", len(entityVal.GetDataBinary())).
                Int("fields_num", len(entityVal.GetFields())).
                Hex("trace_id", traceID).

Reply via email to