This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 4efc6a9 Fix inaccurate timestamp in the StartTimeIndex store (#28)
4efc6a9 is described below
commit 4efc6a937cd718f6f4d36836712de21ee9955a70
Author: Jiajing LU <[email protected]>
AuthorDate: Thu Aug 5 13:01:40 2021 +0800
Fix inaccurate timestamp in the StartTimeIndex store (#28)
* first reproduce the issue
Signed-off-by: Megrez Lu <[email protected]>
* fix issue
Signed-off-by: Megrez Lu <[email protected]>
---
banyand/query/processor_test.go | 13 +++++++++++++
banyand/series/trace/write.go | 25 +++++++++++++------------
2 files changed, 26 insertions(+), 12 deletions(-)
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index d686b7b..54d2f65 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -308,6 +308,19 @@ func TestQueryProcessor(t *testing.T) {
wantLen: 0,
},
{
+ name: "query given timeRange which slightly covers the first three segments",
+ queryGenerator: func(baseTs time.Time) *v1.QueryRequest {
+ return pb.NewQueryRequestBuilder().
+ Limit(10).
+ Offset(0).
+ Metadata("default", "sw").
+ TimeRange(baseTs.Add(-1*time.Nanosecond), baseTs.Add(2*interval).Add(1*time.Nanosecond)).
+ Projection("trace_id").
+ Build()
+ },
+ wantLen: 3,
+ },
+ {
name: "query TraceID given timeRange includes the time range of data",
queryGenerator: func(baseTs time.Time) *v1.QueryRequest {
return pb.NewQueryRequestBuilder().
diff --git a/banyand/series/trace/write.go b/banyand/series/trace/write.go
index 60ed91a..115ba15 100644
--- a/banyand/series/trace/write.go
+++ b/banyand/series/trace/write.go
@@ -46,18 +46,18 @@ func (t *traceSeries) Write(seriesID common.SeriesID, shardID uint, entity data.
return 0, errGetState
}
stateBytes := []byte{byte(state)}
- tts := uint64(entity.GetTimestamp().AsTime().UnixNano())
- chunkID := t.idGen.Next(tts)
- ts, errParseTS := t.idGen.ParseTS(chunkID)
+ entityTs := uint64(entity.GetTimestamp().AsTime().UnixNano())
+ chunkID := t.idGen.Next(entityTs)
+ wallTs, errParseTS := t.idGen.ParseTS(chunkID)
if errParseTS != nil {
return 0, errors.Wrap(errParseTS, "failed to parse timestamp from chunk id")
}
- tsBytes := convert.Uint64ToBytes(ts)
+ wallTsBytes := convert.Uint64ToBytes(wallTs)
intSeriesID := uint64(seriesID)
seriesIDBytes := convert.Uint64ToBytes(intSeriesID)
- wp := t.writePoint(ts)
+ wp := t.writePoint(wallTs)
- err = wp.TimeSeriesWriter(shardID, dataStoreName).Put(seriesIDBytes, entityVal.GetDataBinary(), ts)
+ err = wp.TimeSeriesWriter(shardID, dataStoreName).Put(seriesIDBytes, entityVal.GetDataBinary(), wallTs)
if err != nil {
return 0, errors.Wrap(err, "fail to write traceSeries data")
}
@@ -66,28 +66,29 @@ func (t *traceSeries) Write(seriesID common.SeriesID, shardID uint, entity data.
if err != nil {
return 0, errors.Wrap(err, "fail to serialize EntityValue to []byte")
}
- err = wp.TimeSeriesWriter(shardID, fieldsStoreName).Put(seriesIDBytes, byteVal, ts)
+ err = wp.TimeSeriesWriter(shardID, fieldsStoreName).Put(seriesIDBytes, byteVal, wallTs)
if err != nil {
return 0, errors.Wrap(err, "failed to write traceSeries fields")
}
chunkIDBytes := convert.Uint64ToBytes(chunkID)
- if err = wp.Writer(shardID, chunkIDMapping).Put(chunkIDBytes, bydb_bytes.Join(stateBytes, seriesIDBytes, tsBytes)); err != nil {
+ if err = wp.Writer(shardID, chunkIDMapping).Put(chunkIDBytes, bydb_bytes.Join(stateBytes, seriesIDBytes, wallTsBytes)); err != nil {
return 0, errors.Wrap(err, "failed to write chunkID index")
}
traceIDShardID := partition.ShardID(traceID, t.shardNum)
if err = wp.TimeSeriesWriter(traceIDShardID, traceIndex).
- Put(traceID, bydb_bytes.Join(convert.Uint16ToBytes(uint16(shardID)), chunkIDBytes), tts); err != nil {
+ Put(traceID, bydb_bytes.Join(convert.Uint16ToBytes(uint16(shardID)), chunkIDBytes), entityTs); err != nil {
return 0, errors.Wrap(err, "failed to Trace index")
}
- err = wp.Writer(shardID, startTimeIndex).Put(bydb_bytes.Join(stateBytes, tsBytes, chunkIDBytes), nil)
+ entityTsBytes := convert.Uint64ToBytes(entityTs)
+ err = wp.Writer(shardID, startTimeIndex).Put(bydb_bytes.Join(stateBytes, entityTsBytes, chunkIDBytes), nil)
if err != nil {
return 0, errors.Wrap(err, "failed to write start time index")
}
t.l.Debug().Uint64("chunk_id", chunkID).
Uint64("series_id", intSeriesID).
- Time("ts", time.Unix(0, int64(ts))).
- Uint64("ts_int", ts).
+ Time("wallTs", time.Unix(0, int64(wallTs))).
+ Uint64("wallTs_int", wallTs).
Int("data_size", len(entityVal.GetDataBinary())).
Int("fields_num", len(entityVal.GetFields())).
Hex("trace_id", traceID).