This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 4d8b99a4 feat: remove entity info from ".tf"&".tfm" file (#384)
4d8b99a4 is described below
commit 4d8b99a47a932daf0867bf8d069eb88c8fe3c232
Author: A3bz <[email protected]>
AuthorDate: Fri Feb 2 07:07:06 2024 +0800
feat: remove entity info from ".tf"&".tfm" file (#384)
* feat: remove entity from ".tf"&".tfm" file, and find entity in series
index
* feat: update topN tag family and entity
Signed-off-by: A3bz <[email protected]>
---------
Signed-off-by: A3bz <[email protected]>
Co-authored-by: Gao Hongtao <[email protected]>
---
banyand/measure/metadata.go | 15 ++-
banyand/measure/query.go | 130 +++++++++++++++++++++--
banyand/measure/topn.go | 16 +++
banyand/measure/write.go | 15 +++
banyand/query/processor_topn.go | 6 +-
pkg/query/logical/measure/topn_analyzer.go | 27 ++++-
pkg/query/logical/measure/topn_plan_localscan.go | 16 +--
7 files changed, 193 insertions(+), 32 deletions(-)
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index a3cbfe48..681e36c0 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -208,11 +208,24 @@ func createOrUpdateTopNMeasure(ctx context.Context,
measureSchemaRegistry schema
Name: "measure_id",
Type:
databasev1.TagType_TAG_TYPE_STRING,
},
+ {
+ Name: "sortDirection",
+ Type:
databasev1.TagType_TAG_TYPE_INT,
+ },
+ {
+ Name: "rankNumber",
+ Type:
databasev1.TagType_TAG_TYPE_INT,
+ },
}, seriesSpecs...),
},
},
Fields: []*databasev1.FieldSpec{topNValueFieldSpec},
- Entity: sourceMeasureSchema.GetEntity(),
+ Entity: &databasev1.Entity{
+ TagNames: append([]string{
+ "sortDirection",
+ "rankNumber",
+ }, topNSchema.GetGroupByTagNames()...),
+ },
}
if oldTopNSchema == nil {
if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx,
newTopNMeasure); innerErr != nil {
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 9664eef1..1569cbc8 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -18,6 +18,7 @@
package measure
import (
+ "bytes"
"container/heap"
"context"
"fmt"
@@ -33,6 +34,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
)
@@ -78,11 +80,21 @@ func (s *measure) Query(ctx context.Context, mqo
pbv1.MeasureQueryOptions) (pbv1
tabWrappers[i].DecRef()
}
}()
+
+ tagNameIndex := make(map[string]partition.TagLocator)
+ tagFamilySpecs := s.schema.GetTagFamilies()
+ for i, tagFamilySpec := range tagFamilySpecs {
+ for j, tagSpec := range tagFamilySpec.GetTags() {
+ tagNameIndex[tagSpec.GetName()] = partition.TagLocator{
+ FamilyOffset: i,
+ TagOffset: j,
+ }
+ }
+ }
sl, err := tsdb.IndexDB().Search(ctx, &pbv1.Series{Subject: mqo.Name,
EntityValues: mqo.Entity}, mqo.Filter, mqo.Order)
if err != nil {
return nil, err
}
-
var result queryResult
if len(sl) < 1 {
return &result, nil
@@ -128,6 +140,15 @@ func (s *measure) Query(ctx context.Context, mqo
pbv1.MeasureQueryOptions) (pbv1
if tstIter.Error() != nil {
return nil, fmt.Errorf("cannot iterate tstIter: %w",
tstIter.Error())
}
+
+ result.seriesList = sl
+ result.tagNameIndex = tagNameIndex
+ result.schema = s.schema
+
+ result.sidToIndex = make(map[common.SeriesID]int)
+ for i, si := range originalSids {
+ result.sidToIndex[si] = i
+ }
if mqo.Order == nil {
result.orderByTS = true
result.ascTS = true
@@ -141,13 +162,59 @@ func (s *measure) Query(ctx context.Context, mqo
pbv1.MeasureQueryOptions) (pbv1
return &result, nil
}
- result.sidToIndex = make(map[common.SeriesID]int)
- for i, si := range originalSids {
- result.sidToIndex[si] = i
- }
return &result, nil
}
+// mustEncodeTagValue encodes tagValue according to the schema tag type and
+// returns num independent copies of that encoding, one per row of a block.
+// Pull uses it to re-materialize entity tag columns from the series index,
+// since write no longer stores entity tags alongside the data points.
+//
+// If tagValue does not carry the variant matching tagType (its getter returns
+// nil), an empty, non-nil slice is returned instead of num entries; callers
+// that need one value per row must check the length. Tag types that cannot be
+// encoded cause a panic via logger.Panicf.
+//
+// NOTE(review): the array encodings (byte(128) before each int64, '|' after
+// each string) must agree with mustDecodeTagValue — confirm.
+func mustEncodeTagValue(tagType databasev1.TagType, tagValue *modelv1.TagValue, num int) [][]byte {
+	// The value is identical for every row, so encode it once and only copy
+	// it per row below.
+	var encoded []byte
+	switch tagType {
+	case databasev1.TagType_TAG_TYPE_INT:
+		if tagValue.GetInt() == nil {
+			return [][]byte{}
+		}
+		encoded = convert.Int64ToBytes(tagValue.GetInt().GetValue())
+	case databasev1.TagType_TAG_TYPE_STRING:
+		if tagValue.GetStr() == nil {
+			return [][]byte{}
+		}
+		encoded = []byte(tagValue.GetStr().GetValue())
+	case databasev1.TagType_TAG_TYPE_DATA_BINARY:
+		if tagValue.GetBinaryData() == nil {
+			return [][]byte{}
+		}
+		encoded = tagValue.GetBinaryData()
+	case databasev1.TagType_TAG_TYPE_INT_ARRAY:
+		if tagValue.GetIntArray() == nil {
+			return [][]byte{}
+		}
+		encoded = []byte{}
+		for _, v := range tagValue.GetIntArray().GetValue() {
+			encoded = append(encoded, byte(128))
+			encoded = append(encoded, convert.Int64ToBytes(v)...)
+		}
+	case databasev1.TagType_TAG_TYPE_STRING_ARRAY:
+		if tagValue.GetStrArray() == nil {
+			return [][]byte{}
+		}
+		// Append into one buffer instead of repeated string concatenation,
+		// which is quadratic in the number of elements.
+		encoded = []byte{}
+		for _, s := range tagValue.GetStrArray().GetValue() {
+			encoded = append(encoded, s...)
+			encoded = append(encoded, '|')
+		}
+	default:
+		// Report the schema type that drove the switch, not the Go type of
+		// whatever value happened to be supplied.
+		logger.Panicf("unsupported tag type: %v", tagType)
+	}
+	if num <= 0 {
+		return [][]byte{}
+	}
+	values := make([][]byte, num)
+	for i := range values {
+		// Clone so that rows never share a backing array.
+		values[i] = bytes.Clone(encoded)
+	}
+	return values
+}
+
func mustDecodeTagValue(valueType pbv1.ValueType, value []byte)
*modelv1.TagValue {
if value == nil {
switch valueType {
@@ -315,12 +382,15 @@ func binaryDataFieldValue(value []byte)
*modelv1.FieldValue {
}
+// queryResult is the value (*measure).Query returns; Pull hands out its block
+// cursors as *pbv1.MeasureResult values.
type queryResult struct {
- sidToIndex map[common.SeriesID]int
- data []*blockCursor
- snapshots []*snapshot
- loaded bool
- orderByTS bool
- ascTS bool
+ // sidToIndex maps a series ID to its position in seriesList. Query fills it
+ // from originalSids; NOTE(review): this assumes originalSids has the same
+ // order as the series-index search result — confirm.
+ sidToIndex map[common.SeriesID]int
+ // tagNameIndex locates every tag name of schema by its tag-family offset and
+ // its tag offset within that family.
+ tagNameIndex map[string]partition.TagLocator
+ // schema is the measure schema the query ran against; Pull reads its entity
+ // tag names and tag specs to rebuild entity tag columns.
+ schema *databasev1.Measure
+ // data holds the block cursors Pull iterates over; Pull removes entries
+ // from it.
+ data []*blockCursor
+ // snapshots presumably are the snapshots the cursors read from — TODO
+ // confirm against the release logic outside this view.
+ snapshots []*snapshot
+ // seriesList is the result of the series-index search; Pull takes entity
+ // tag values from it because write drops entity tags from stored data
+ // points.
+ seriesList pbv1.SeriesList
+ // loaded is set outside this view — presumably whether data has been
+ // loaded; confirm.
+ loaded bool
+ // orderByTS and ascTS are both set to true by Query when mqo.Order is nil.
+ orderByTS bool
+ ascTS bool
}
func (qr *queryResult) Pull() *pbv1.MeasureResult {
@@ -336,6 +406,44 @@ func (qr *queryResult) Pull() *pbv1.MeasureResult {
qr.data = append(qr.data[:i], qr.data[i+1:]...)
i--
}
+ if qr.schema.GetEntity() == nil ||
len(qr.schema.GetEntity().GetTagNames()) == 0 {
+ continue
+ }
+ sidIndex := qr.sidToIndex[qr.data[i].bm.seriesID]
+ series := qr.seriesList[sidIndex]
+ entityMap := make(map[string]int)
+ tagFamilyMap := make(map[string]int)
+ for idx, entity := range
qr.schema.GetEntity().GetTagNames() {
+ entityMap[entity] = idx + 1
+ }
+ for idx, tagFamily := range qr.data[i].tagFamilies {
+ tagFamilyMap[tagFamily.name] = idx + 1
+ }
+ for _, tagFamilyProj := range qr.data[i].tagProjection {
+ for j, tagProj := range tagFamilyProj.Names {
+ entityPos := entityMap[tagProj]
+ tagFamilyPos :=
tagFamilyMap[tagFamilyProj.Family]
+ if entityPos == 0 {
+ continue
+ }
+ if tagFamilyPos == 0 {
+
qr.data[i].tagFamilies[tagFamilyPos-1] = columnFamily{
+ name:
tagFamilyProj.Family,
+ columns: make([]column,
0),
+ }
+ }
+ offset := qr.tagNameIndex[tagProj]
+ tagFamilySpec :=
qr.schema.GetTagFamilies()[offset.FamilyOffset]
+ tagSpec :=
tagFamilySpec.GetTags()[offset.TagOffset]
+ valueType :=
pbv1.MustTagValueToValueType(series.EntityValues[entityPos-1])
+
qr.data[i].tagFamilies[tagFamilyPos-1].columns =
append(qr.data[i].tagFamilies[tagFamilyPos-1].columns[:j],
+ append([]column{{
+ name: tagProj,
+ values:
mustEncodeTagValue(tagSpec.GetType(), series.EntityValues[entityPos-1],
len(qr.data[i].timestamps)),
+ valueType: valueType,
+ }},
qr.data[i].tagFamilies[tagFamilyPos-1].columns[j:]...)...)
+ }
+ }
if qr.orderByTimestampDesc() {
qr.data[i].idx = len(qr.data[i].timestamps) - 1
}
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index 80d7145f..10774efc 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -201,6 +201,22 @@ func (t *topNStreamingProcessor) writeData(publisher
queue.BatchPublisher, event
},
},
},
+ // SortDirection
+ {
+ Value:
&modelv1.TagValue_Int{
+ Int:
&modelv1.Int{
+
Value: int64(t.sortDirection),
+ },
+ },
+ },
+ // RankNumber
+ {
+ Value:
&modelv1.TagValue_Int{
+ Int:
&modelv1.Int{
+
Value: int64(rankNum),
+ },
+ },
+ },
},
data[0].([]*modelv1.TagValue)...),
},
},
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index 4ba62d6b..e7f16f19 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -127,6 +127,11 @@ func (w *writeCallback) handle(dst
map[string]*dataPointsInGroup, writeEvent *me
dpt.dataPoints.fields = append(dpt.dataPoints.fields, field)
tagFamilies := make([]nameValues, len(stm.schema.TagFamilies))
dpt.dataPoints.tagFamilies = append(dpt.dataPoints.tagFamilies,
tagFamilies)
+ entityMap := make(map[string]bool)
+
+ for _, entity := range stm.GetSchema().GetEntity().GetTagNames() {
+ entityMap[entity] = true
+ }
for i := range stm.GetSchema().GetTagFamilies() {
var tagFamily *modelv1.TagFamilyForWrite
if len(req.DataPoint.TagFamilies) <= i {
@@ -189,6 +194,16 @@ func (w *writeCallback) handle(dst
map[string]*dataPointsInGroup, writeEvent *me
}
}
+ for i := range tagFamilies {
+ values := make([]*nameValue, len(tagFamilies[i].values))
+ copy(values, tagFamilies[i].values)
+ tagFamilies[i].values = tagFamilies[i].values[:0]
+ for j, tagValue := range values {
+ if !entityMap[tagValue.name] {
+ tagFamilies[i].values =
append(tagFamilies[i].values, values[j])
+ }
+ }
+ }
dpg.docs = append(dpg.docs, index.Document{
DocID: uint64(series.ID),
EntityValues: series.Buffer,
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index 84f6ee6c..dc40753e 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -90,14 +90,15 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp
bus.Message) {
return
}
+ sourceMeasureSchema := sourceMeasure.GetSchema()
sourceMeasure.SetSchema(schema)
- s, err := logical_measure.BuildTopNSchema(schema,
topNSchema.GetGroupByTagNames())
+ s, err := logical_measure.BuildTopNSchema(schema)
if err != nil {
t.log.Error().Err(err).
Str("topN", topNMetadata.GetName()).
Msg("fail to build schema")
}
- plan, err := logical_measure.TopNAnalyze(context.TODO(), request,
schema, topNSchema, s)
+ plan, err := logical_measure.TopNAnalyze(context.TODO(), request,
schema, sourceMeasureSchema, topNSchema, s)
if err != nil {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to analyze the query request for topn %s: %v", topNMetadata.GetName(), err))
return
@@ -118,6 +119,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp
bus.Message) {
}
}()
+ sourceMeasure.SetSchema(sourceMeasureSchema)
result := make([]*measurev1.DataPoint, 0)
for mIterator.Next() {
current := mIterator.Current()
diff --git a/pkg/query/logical/measure/topn_analyzer.go
b/pkg/query/logical/measure/topn_analyzer.go
index 2def6f73..4a6e05e6 100644
--- a/pkg/query/logical/measure/topn_analyzer.go
+++ b/pkg/query/logical/measure/topn_analyzer.go
@@ -24,17 +24,18 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
// BuildTopNSchema returns Schema loaded from the metadata repository.
-func BuildTopNSchema(md *databasev1.Measure, entityList []string)
(logical.Schema, error) {
+func BuildTopNSchema(md *databasev1.Measure) (logical.Schema, error) {
md.GetEntity()
ms := &schema{
common: &logical.CommonSchema{
TagSpecMap: make(map[string]*logical.TagSpec),
- EntityList: entityList,
+ EntityList: md.GetEntity().GetTagNames(),
},
measure: md,
fieldMap: make(map[string]*logical.FieldSpec),
@@ -51,9 +52,9 @@ func BuildTopNSchema(md *databasev1.Measure, entityList
[]string) (logical.Schem
// TopNAnalyze converts logical expressions to executable operation tree
represented by Plan.
func TopNAnalyze(_ context.Context, criteria *measurev1.TopNRequest, schema
*databasev1.Measure,
- topNAggregation *databasev1.TopNAggregation, s logical.Schema,
+ sourceMeasureSchema *databasev1.Measure, topNAggregation
*databasev1.TopNAggregation, s logical.Schema,
) (logical.Plan, error) {
- groupByProjectionTags := schema.GetEntity().GetTagNames()
+ groupByProjectionTags := sourceMeasureSchema.GetEntity().GetTagNames()
groupByTags := make([][]*logical.Tag, len(schema.GetTagFamilies()))
tagFamily := schema.GetTagFamilies()[0]
groupByTags[0] = logical.NewTags(tagFamily.GetName(),
groupByProjectionTags...)
@@ -90,5 +91,21 @@ func parse(criteria *measurev1.TopNRequest, metadata
*commonv1.Metadata,
) logical.UnresolvedPlan {
timeRange := criteria.GetTimeRange()
return local(timeRange.GetBegin().AsTime(), timeRange.GetEnd().AsTime(),
- metadata, projTags, projFields, criteria.GetConditions(),
criteria.GetFieldValueSort())
+ metadata, projTags, projFields, buildConditions(criteria),
criteria.GetFieldValueSort())
+}
+
+// buildConditions returns the conditions used to scan a TopN measure. The
+// first is an equality condition on the "sortDirection" tag, valued with the
+// number of the request's field-value sort. Every condition carried by the
+// request follows it, in its original order.
+//
+// "sortDirection" is an entity tag of the TopN measure, so expressing it as an
+// equality condition lets it be resolved like the other entity conditions.
+func buildConditions(criteria *measurev1.TopNRequest) []*modelv1.Condition {
+	requested := criteria.GetConditions()
+	sortDirectionCond := &modelv1.Condition{
+		Name: "sortDirection",
+		Op:   modelv1.Condition_BINARY_OP_EQ,
+		Value: &modelv1.TagValue{
+			Value: &modelv1.TagValue_Int{
+				Int: &modelv1.Int{Value: int64(criteria.GetFieldValueSort().Number())},
+			},
+		},
+	}
+	conditions := make([]*modelv1.Condition, 0, len(requested)+1)
+	conditions = append(conditions, sortDirectionCond)
+	return append(conditions, requested...)
}
diff --git a/pkg/query/logical/measure/topn_plan_localscan.go
b/pkg/query/logical/measure/topn_plan_localscan.go
index f1c48d1d..46a4b56d 100644
--- a/pkg/query/logical/measure/topn_plan_localscan.go
+++ b/pkg/query/logical/measure/topn_plan_localscan.go
@@ -96,21 +96,11 @@ func (uls *unresolvedLocalScan) Analyze(s logical.Schema)
(logical.Plan, error)
func (uls *unresolvedLocalScan) locateEntity(entityList []string)
([]*modelv1.TagValue, error) {
entityMap := make(map[string]int)
- entity := make([]*modelv1.TagValue, 1+1+len(entityList))
- // sortDirection
- entity[0] = &modelv1.TagValue{
- Value: &modelv1.TagValue_Int{
- Int: &modelv1.Int{
- Value: int64(uls.sort.Number()),
- },
- },
- }
- // rankNumber
- entity[1] = pbv1.AnyTagValue
+ entity := make([]*modelv1.TagValue, len(entityList))
for idx, tagName := range entityList {
- entityMap[tagName] = idx + 2
+ entityMap[tagName] = idx
// allow to make fuzzy search with partial conditions
- entity[idx+2] = pbv1.AnyTagValue
+ entity[idx] = pbv1.AnyTagValue
}
for _, pairQuery := range uls.conditions {
if pairQuery.GetOp() != modelv1.Condition_BINARY_OP_EQ {