This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d8b99a4 feat: remove entity info from ".tf"&".tfm" file (#384)
4d8b99a4 is described below

commit 4d8b99a47a932daf0867bf8d069eb88c8fe3c232
Author: A3bz <[email protected]>
AuthorDate: Fri Feb 2 07:07:06 2024 +0800

    feat: remove entity info from ".tf"&".tfm" file (#384)
    
    * feat: remove entity from ".tf"&".tfm" file, and find entity in series index
    
    * feat: update topN tag family and entity
    
    Signed-off-by: A3bz <[email protected]>
    
    ---------
    
    Signed-off-by: A3bz <[email protected]>
    Co-authored-by: Gao Hongtao <[email protected]>
---
 banyand/measure/metadata.go                      |  15 ++-
 banyand/measure/query.go                         | 130 +++++++++++++++++++++--
 banyand/measure/topn.go                          |  16 +++
 banyand/measure/write.go                         |  15 +++
 banyand/query/processor_topn.go                  |   6 +-
 pkg/query/logical/measure/topn_analyzer.go       |  27 ++++-
 pkg/query/logical/measure/topn_plan_localscan.go |  16 +--
 7 files changed, 193 insertions(+), 32 deletions(-)

diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index a3cbfe48..681e36c0 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -208,11 +208,24 @@ func createOrUpdateTopNMeasure(ctx context.Context, 
measureSchemaRegistry schema
                                                Name: "measure_id",
                                                Type: 
databasev1.TagType_TAG_TYPE_STRING,
                                        },
+                                       {
+                                               Name: "sortDirection",
+                                               Type: 
databasev1.TagType_TAG_TYPE_INT,
+                                       },
+                                       {
+                                               Name: "rankNumber",
+                                               Type: 
databasev1.TagType_TAG_TYPE_INT,
+                                       },
                                }, seriesSpecs...),
                        },
                },
                Fields: []*databasev1.FieldSpec{topNValueFieldSpec},
-               Entity: sourceMeasureSchema.GetEntity(),
+               Entity: &databasev1.Entity{
+                       TagNames: append([]string{
+                               "sortDirection",
+                               "rankNumber",
+                       }, topNSchema.GetGroupByTagNames()...),
+               },
        }
        if oldTopNSchema == nil {
                if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx, 
newTopNMeasure); innerErr != nil {
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 9664eef1..1569cbc8 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -18,6 +18,7 @@
 package measure
 
 import (
+       "bytes"
        "container/heap"
        "context"
        "fmt"
@@ -33,6 +34,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
@@ -78,11 +80,21 @@ func (s *measure) Query(ctx context.Context, mqo 
pbv1.MeasureQueryOptions) (pbv1
                        tabWrappers[i].DecRef()
                }
        }()
+
+       tagNameIndex := make(map[string]partition.TagLocator)
+       tagFamilySpecs := s.schema.GetTagFamilies()
+       for i, tagFamilySpec := range tagFamilySpecs {
+               for j, tagSpec := range tagFamilySpec.GetTags() {
+                       tagNameIndex[tagSpec.GetName()] = partition.TagLocator{
+                               FamilyOffset: i,
+                               TagOffset:    j,
+                       }
+               }
+       }
        sl, err := tsdb.IndexDB().Search(ctx, &pbv1.Series{Subject: mqo.Name, 
EntityValues: mqo.Entity}, mqo.Filter, mqo.Order)
        if err != nil {
                return nil, err
        }
-
        var result queryResult
        if len(sl) < 1 {
                return &result, nil
@@ -128,6 +140,15 @@ func (s *measure) Query(ctx context.Context, mqo 
pbv1.MeasureQueryOptions) (pbv1
        if tstIter.Error() != nil {
                return nil, fmt.Errorf("cannot iterate tstIter: %w", 
tstIter.Error())
        }
+
+       result.seriesList = sl
+       result.tagNameIndex = tagNameIndex
+       result.schema = s.schema
+
+       result.sidToIndex = make(map[common.SeriesID]int)
+       for i, si := range originalSids {
+               result.sidToIndex[si] = i
+       }
        if mqo.Order == nil {
                result.orderByTS = true
                result.ascTS = true
@@ -141,13 +162,59 @@ func (s *measure) Query(ctx context.Context, mqo 
pbv1.MeasureQueryOptions) (pbv1
                return &result, nil
        }
 
-       result.sidToIndex = make(map[common.SeriesID]int)
-       for i, si := range originalSids {
-               result.sidToIndex[si] = i
-       }
        return &result, nil
 }
 
+func mustEncodeTagValue(tagType databasev1.TagType, tagValue 
*modelv1.TagValue, num int) [][]byte {
+       values := make([][]byte, 0)
+       switch tagType {
+       case databasev1.TagType_TAG_TYPE_INT:
+               if tagValue.GetInt() != nil {
+                       for i := 0; i < num; i++ {
+                               values = append(values, 
convert.Int64ToBytes(tagValue.GetInt().GetValue()))
+                       }
+               }
+       case databasev1.TagType_TAG_TYPE_STRING:
+               if tagValue.GetStr() != nil {
+                       for i := 0; i < num; i++ {
+                               values = append(values, 
[]byte(tagValue.GetStr().GetValue()))
+                       }
+               }
+       case databasev1.TagType_TAG_TYPE_DATA_BINARY:
+               if tagValue.GetBinaryData() != nil {
+                       for i := 0; i < num; i++ {
+                               values = append(values, 
bytes.Clone(tagValue.GetBinaryData()))
+                       }
+               }
+       case databasev1.TagType_TAG_TYPE_INT_ARRAY:
+               if tagValue.GetIntArray() == nil {
+                       return values
+               }
+               for i := 0; i < num; i++ {
+                       value := make([]byte, 0)
+                       for j := range tagValue.GetIntArray().Value {
+                               value = append(value, byte(128))
+                               value = append(value, 
convert.Int64ToBytes(tagValue.GetIntArray().Value[j])...)
+                       }
+                       values = append(values, value)
+               }
+       case databasev1.TagType_TAG_TYPE_STRING_ARRAY:
+               if tagValue.GetStrArray() == nil {
+                       return values
+               }
+               for i := 0; i < num; i++ {
+                       var value string
+                       for j := range tagValue.GetStrArray().Value {
+                               value += tagValue.GetStrArray().Value[j] + "|"
+                       }
+                       values = append(values, []byte(value))
+               }
+       default:
+               logger.Panicf("unsupported tag value type: %T", 
tagValue.GetValue())
+       }
+       return values
+}
+
 func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) 
*modelv1.TagValue {
        if value == nil {
                switch valueType {
@@ -315,12 +382,15 @@ func binaryDataFieldValue(value []byte) 
*modelv1.FieldValue {
 }
 
 type queryResult struct {
-       sidToIndex map[common.SeriesID]int
-       data       []*blockCursor
-       snapshots  []*snapshot
-       loaded     bool
-       orderByTS  bool
-       ascTS      bool
+       sidToIndex   map[common.SeriesID]int
+       tagNameIndex map[string]partition.TagLocator
+       schema       *databasev1.Measure
+       data         []*blockCursor
+       snapshots    []*snapshot
+       seriesList   pbv1.SeriesList
+       loaded       bool
+       orderByTS    bool
+       ascTS        bool
 }
 
 func (qr *queryResult) Pull() *pbv1.MeasureResult {
@@ -336,6 +406,44 @@ func (qr *queryResult) Pull() *pbv1.MeasureResult {
                                qr.data = append(qr.data[:i], qr.data[i+1:]...)
                                i--
                        }
+                       if qr.schema.GetEntity() == nil || 
len(qr.schema.GetEntity().GetTagNames()) == 0 {
+                               continue
+                       }
+                       sidIndex := qr.sidToIndex[qr.data[i].bm.seriesID]
+                       series := qr.seriesList[sidIndex]
+                       entityMap := make(map[string]int)
+                       tagFamilyMap := make(map[string]int)
+                       for idx, entity := range 
qr.schema.GetEntity().GetTagNames() {
+                               entityMap[entity] = idx + 1
+                       }
+                       for idx, tagFamily := range qr.data[i].tagFamilies {
+                               tagFamilyMap[tagFamily.name] = idx + 1
+                       }
+                       for _, tagFamilyProj := range qr.data[i].tagProjection {
+                               for j, tagProj := range tagFamilyProj.Names {
+                                       entityPos := entityMap[tagProj]
+                                       tagFamilyPos := 
tagFamilyMap[tagFamilyProj.Family]
+                                       if entityPos == 0 {
+                                               continue
+                                       }
+                                       if tagFamilyPos == 0 {
+                                               
qr.data[i].tagFamilies[tagFamilyPos-1] = columnFamily{
+                                                       name:    
tagFamilyProj.Family,
+                                                       columns: make([]column, 
0),
+                                               }
+                                       }
+                                       offset := qr.tagNameIndex[tagProj]
+                                       tagFamilySpec := 
qr.schema.GetTagFamilies()[offset.FamilyOffset]
+                                       tagSpec := 
tagFamilySpec.GetTags()[offset.TagOffset]
+                                       valueType := 
pbv1.MustTagValueToValueType(series.EntityValues[entityPos-1])
+                                       
qr.data[i].tagFamilies[tagFamilyPos-1].columns = 
append(qr.data[i].tagFamilies[tagFamilyPos-1].columns[:j],
+                                               append([]column{{
+                                                       name:      tagProj,
+                                                       values:    
mustEncodeTagValue(tagSpec.GetType(), series.EntityValues[entityPos-1], 
len(qr.data[i].timestamps)),
+                                                       valueType: valueType,
+                                               }}, 
qr.data[i].tagFamilies[tagFamilyPos-1].columns[j:]...)...)
+                               }
+                       }
                        if qr.orderByTimestampDesc() {
                                qr.data[i].idx = len(qr.data[i].timestamps) - 1
                        }
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index 80d7145f..10774efc 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -201,6 +201,22 @@ func (t *topNStreamingProcessor) writeData(publisher 
queue.BatchPublisher, event
                                                                        },
                                                                },
                                                        },
+                                                       // SortDirection
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Int{
+                                                                       Int: 
&modelv1.Int{
+                                                                               
Value: int64(t.sortDirection),
+                                                                       },
+                                                               },
+                                                       },
+                                                       // RankNumber
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Int{
+                                                                       Int: 
&modelv1.Int{
+                                                                               
Value: int64(rankNum),
+                                                                       },
+                                                               },
+                                                       },
                                                }, 
data[0].([]*modelv1.TagValue)...),
                                        },
                                },
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index 4ba62d6b..e7f16f19 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -127,6 +127,11 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
        dpt.dataPoints.fields = append(dpt.dataPoints.fields, field)
        tagFamilies := make([]nameValues, len(stm.schema.TagFamilies))
        dpt.dataPoints.tagFamilies = append(dpt.dataPoints.tagFamilies, 
tagFamilies)
+       entityMap := make(map[string]bool)
+
+       for _, entity := range stm.GetSchema().GetEntity().GetTagNames() {
+               entityMap[entity] = true
+       }
        for i := range stm.GetSchema().GetTagFamilies() {
                var tagFamily *modelv1.TagFamilyForWrite
                if len(req.DataPoint.TagFamilies) <= i {
@@ -189,6 +194,16 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
                }
        }
 
+       for i := range tagFamilies {
+               values := make([]*nameValue, len(tagFamilies[i].values))
+               copy(values, tagFamilies[i].values)
+               tagFamilies[i].values = tagFamilies[i].values[:0]
+               for j, tagValue := range values {
+                       if !entityMap[tagValue.name] {
+                               tagFamilies[i].values = 
append(tagFamilies[i].values, values[j])
+                       }
+               }
+       }
        dpg.docs = append(dpg.docs, index.Document{
                DocID:        uint64(series.ID),
                EntityValues: series.Buffer,
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index 84f6ee6c..dc40753e 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -90,14 +90,15 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp 
bus.Message) {
                return
        }
 
+       sourceMeasureSchema := sourceMeasure.GetSchema()
        sourceMeasure.SetSchema(schema)
-       s, err := logical_measure.BuildTopNSchema(schema, 
topNSchema.GetGroupByTagNames())
+       s, err := logical_measure.BuildTopNSchema(schema)
        if err != nil {
                t.log.Error().Err(err).
                        Str("topN", topNMetadata.GetName()).
                        Msg("fail to build schema")
        }
-       plan, err := logical_measure.TopNAnalyze(context.TODO(), request, 
schema, topNSchema, s)
+       plan, err := logical_measure.TopNAnalyze(context.TODO(), request, 
schema, sourceMeasureSchema, topNSchema, s)
        if err != nil {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to analyze the query request for topn %s: %v", topNMetadata.GetName(), err))
                return
@@ -118,6 +119,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp 
bus.Message) {
                }
        }()
 
+       sourceMeasure.SetSchema(sourceMeasureSchema)
        result := make([]*measurev1.DataPoint, 0)
        for mIterator.Next() {
                current := mIterator.Current()
diff --git a/pkg/query/logical/measure/topn_analyzer.go 
b/pkg/query/logical/measure/topn_analyzer.go
index 2def6f73..4a6e05e6 100644
--- a/pkg/query/logical/measure/topn_analyzer.go
+++ b/pkg/query/logical/measure/topn_analyzer.go
@@ -24,17 +24,18 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
 // BuildTopNSchema returns Schema loaded from the metadata repository.
-func BuildTopNSchema(md *databasev1.Measure, entityList []string) 
(logical.Schema, error) {
+func BuildTopNSchema(md *databasev1.Measure) (logical.Schema, error) {
        md.GetEntity()
 
        ms := &schema{
                common: &logical.CommonSchema{
                        TagSpecMap: make(map[string]*logical.TagSpec),
-                       EntityList: entityList,
+                       EntityList: md.GetEntity().GetTagNames(),
                },
                measure:  md,
                fieldMap: make(map[string]*logical.FieldSpec),
@@ -51,9 +52,9 @@ func BuildTopNSchema(md *databasev1.Measure, entityList 
[]string) (logical.Schem
 
 // TopNAnalyze converts logical expressions to executable operation tree 
represented by Plan.
 func TopNAnalyze(_ context.Context, criteria *measurev1.TopNRequest, schema 
*databasev1.Measure,
-       topNAggregation *databasev1.TopNAggregation, s logical.Schema,
+       sourceMeasureSchema *databasev1.Measure, topNAggregation 
*databasev1.TopNAggregation, s logical.Schema,
 ) (logical.Plan, error) {
-       groupByProjectionTags := schema.GetEntity().GetTagNames()
+       groupByProjectionTags := sourceMeasureSchema.GetEntity().GetTagNames()
        groupByTags := make([][]*logical.Tag, len(schema.GetTagFamilies()))
        tagFamily := schema.GetTagFamilies()[0]
        groupByTags[0] = logical.NewTags(tagFamily.GetName(), 
groupByProjectionTags...)
@@ -90,5 +91,21 @@ func parse(criteria *measurev1.TopNRequest, metadata 
*commonv1.Metadata,
 ) logical.UnresolvedPlan {
        timeRange := criteria.GetTimeRange()
        return local(timeRange.GetBegin().AsTime(), timeRange.GetEnd().AsTime(),
-               metadata, projTags, projFields, criteria.GetConditions(), 
criteria.GetFieldValueSort())
+               metadata, projTags, projFields, buildConditions(criteria), 
criteria.GetFieldValueSort())
+}
+
+func buildConditions(criteria *measurev1.TopNRequest) []*modelv1.Condition {
+       return append([]*modelv1.Condition{
+               {
+                       Name: "sortDirection",
+                       Op:   modelv1.Condition_BINARY_OP_EQ,
+                       Value: &modelv1.TagValue{
+                               Value: &modelv1.TagValue_Int{
+                                       Int: &modelv1.Int{
+                                               Value: 
int64(criteria.GetFieldValueSort().Number()),
+                                       },
+                               },
+                       },
+               },
+       }, criteria.GetConditions()...)
 }
diff --git a/pkg/query/logical/measure/topn_plan_localscan.go 
b/pkg/query/logical/measure/topn_plan_localscan.go
index f1c48d1d..46a4b56d 100644
--- a/pkg/query/logical/measure/topn_plan_localscan.go
+++ b/pkg/query/logical/measure/topn_plan_localscan.go
@@ -96,21 +96,11 @@ func (uls *unresolvedLocalScan) Analyze(s logical.Schema) 
(logical.Plan, error)
 
 func (uls *unresolvedLocalScan) locateEntity(entityList []string) 
([]*modelv1.TagValue, error) {
        entityMap := make(map[string]int)
-       entity := make([]*modelv1.TagValue, 1+1+len(entityList))
-       // sortDirection
-       entity[0] = &modelv1.TagValue{
-               Value: &modelv1.TagValue_Int{
-                       Int: &modelv1.Int{
-                               Value: int64(uls.sort.Number()),
-                       },
-               },
-       }
-       // rankNumber
-       entity[1] = pbv1.AnyTagValue
+       entity := make([]*modelv1.TagValue, len(entityList))
        for idx, tagName := range entityList {
-               entityMap[tagName] = idx + 2
+               entityMap[tagName] = idx
                // allow to make fuzzy search with partial conditions
-               entity[idx+2] = pbv1.AnyTagValue
+               entity[idx] = pbv1.AnyTagValue
        }
        for _, pairQuery := range uls.conditions {
                if pairQuery.GetOp() != modelv1.Condition_BINARY_OP_EQ {

Reply via email to