This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new a69048c4 Fix TopN processor not handling data points with specifications correctly. (#931)
a69048c4 is described below
commit a69048c4ff85f437002edacf83918c8f8ba5951a
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Jan 9 09:25:15 2026 +0800
Fix TopN processor not handling data points with specifications correctly. (#931)
---
banyand/measure/svc_liaison.go | 10 -
banyand/measure/topn.go | 227 +++++++----
banyand/measure/topn_test.go | 726 ++++++++++++++++++++++++++++++++++++
banyand/measure/write_liaison.go | 2 +-
banyand/measure/write_standalone.go | 2 +-
5 files changed, 884 insertions(+), 83 deletions(-)
diff --git a/banyand/measure/svc_liaison.go b/banyand/measure/svc_liaison.go
index e9587831..453e087e 100644
--- a/banyand/measure/svc_liaison.go
+++ b/banyand/measure/svc_liaison.go
@@ -29,8 +29,6 @@ import (
"github.com/apache/skywalking-banyandb/api/data"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
- modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -166,14 +164,6 @@ func (s *liaison) GracefulStop() {
s.schemaRepo.Close()
}
-func (s *liaison) InFlow(stm *databasev1.Measure, seriesID uint64, shardID
uint32, entityValues []*modelv1.TagValue, dp *measurev1.DataPointValue) {
- if s.schemaRepo == nil {
- s.l.Error().Msg("schema repository is not initialized")
- return
- }
- s.schemaRepo.inFlow(stm, seriesID, shardID, entityValues, dp)
-}
-
// NewLiaison creates a new measure liaison service with the given dependencies.
func NewLiaison(metadata metadata.Repo, pipeline queue.Server, omr
observability.MetricsRegistry, pm protector.Memory,
dataNodeSelector node.Selector, tire2Client queue.Client,
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index f4a62abf..d595b105 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -47,7 +47,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/flow/streaming"
"github.com/apache/skywalking-banyandb/pkg/flow/streaming/sources"
"github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
@@ -66,13 +65,21 @@ var (
_ flow.Sink = (*topNStreamingProcessor)(nil)
)
-func (sr *schemaRepo) inFlow(stm *databasev1.Measure, seriesID uint64, shardID
uint32, entityValues []*modelv1.TagValue, dp *measurev1.DataPointValue) {
+func (sr *schemaRepo) inFlow(
+ stm *databasev1.Measure,
+ seriesID uint64,
+ shardID uint32,
+ entityValues []*modelv1.TagValue,
+ dp *measurev1.DataPointValue,
+ spec *measurev1.DataPointSpec,
+) {
if p, _ := sr.topNProcessorMap.Load(getKey(stm.GetMetadata())); p !=
nil {
p.(*topNProcessorManager).onMeasureWrite(seriesID, shardID,
&measurev1.InternalWriteRequest{
Request: &measurev1.WriteRequest{
- Metadata: stm.GetMetadata(),
- DataPoint: dp,
- MessageId: uint64(time.Now().UnixNano()),
+ Metadata: stm.GetMetadata(),
+ DataPoint: dp,
+ MessageId: uint64(time.Now().UnixNano()),
+ DataPointSpec: spec,
},
EntityValues: entityValues,
}, stm)
@@ -136,12 +143,129 @@ func (sr *schemaRepo) stopSteamingManager(sourceMeasure
*commonv1.Metadata) {
}
type dataPointWithEntityValues struct {
+ tagSpec logical.TagSpecRegistry
*measurev1.DataPointValue
+ fieldIndex map[string]int
entityValues []*modelv1.TagValue
seriesID uint64
shardID uint32
}
+func newDataPointWithEntityValues(
+ dp *measurev1.DataPointValue,
+ entityValues []*modelv1.TagValue,
+ seriesID uint64,
+ shardID uint32,
+ spec *measurev1.DataPointSpec,
+ m *databasev1.Measure,
+) *dataPointWithEntityValues {
+ fieldIndex := buildFieldIndex(spec, m)
+ tagSpec := buildTagSpecRegistryFromSpec(spec, m)
+ return &dataPointWithEntityValues{
+ DataPointValue: dp,
+ entityValues: entityValues,
+ seriesID: seriesID,
+ shardID: shardID,
+ tagSpec: tagSpec,
+ fieldIndex: fieldIndex,
+ }
+}
+
+func buildFieldIndex(spec *measurev1.DataPointSpec, m *databasev1.Measure)
map[string]int {
+ if spec != nil {
+ fieldIndex := make(map[string]int, len(spec.GetFieldNames()))
+ for i, fieldName := range spec.GetFieldNames() {
+ fieldIndex[fieldName] = i
+ }
+ return fieldIndex
+ }
+ if m != nil {
+ fieldIndex := make(map[string]int, len(m.GetFields()))
+ for i, fieldSpec := range m.GetFields() {
+ fieldIndex[fieldSpec.GetName()] = i
+ }
+ return fieldIndex
+ }
+ return make(map[string]int)
+}
+
+func buildTagSpecRegistryFromSpec(spec *measurev1.DataPointSpec, m
*databasev1.Measure) logical.TagSpecRegistry {
+ tagSpecMap := logical.TagSpecMap{}
+ if spec != nil {
+ for specFamilyIdx, specFamily := range spec.GetTagFamilySpec() {
+ for specTagIdx, tagName := range
specFamily.GetTagNames() {
+ tagSpec := &databasev1.TagSpec{
+ Name: tagName,
+ }
+ tagSpecMap.RegisterTag(specFamilyIdx,
specTagIdx, tagSpec)
+ }
+ }
+ return tagSpecMap
+ }
+ if m != nil {
+ for specFamilyIdx, tagFamily := range m.GetTagFamilies() {
+ for specTagIdx, tagSpec := range tagFamily.GetTags() {
+ tagSpecMap.RegisterTag(specFamilyIdx,
specTagIdx, tagSpec)
+ }
+ }
+ return tagSpecMap
+ }
+ return tagSpecMap
+}
+
+func (dp *dataPointWithEntityValues) tagValue(tagName string)
*modelv1.TagValue {
+ if familyIdx, tagIdx, ok := dp.locateSpecTag(tagName); ok {
+ if familyIdx < len(dp.GetTagFamilies()) {
+ tagFamily := dp.GetTagFamilies()[familyIdx]
+ if tagIdx < len(tagFamily.GetTags()) {
+ return tagFamily.GetTags()[tagIdx]
+ }
+ }
+ return pbv1.NullTagValue
+ }
+ return pbv1.NullTagValue
+}
+
+func (dp *dataPointWithEntityValues) locateSpecTag(tagName string) (int, int,
bool) {
+ if dp.tagSpec == nil {
+ return 0, 0, false
+ }
+ tagSpecFound := dp.tagSpec.FindTagSpecByName(tagName)
+ if tagSpecFound == nil {
+ return 0, 0, false
+ }
+ familyIdx := tagSpecFound.TagFamilyIdx
+ tagIdx := tagSpecFound.TagIdx
+ if familyIdx < 0 || tagIdx < 0 {
+ return 0, 0, false
+ }
+ return familyIdx, tagIdx, true
+}
+
+func (dp *dataPointWithEntityValues) intFieldValue(fieldName string, l
*logger.Logger) int64 {
+ if dp.fieldIndex == nil {
+ return 0
+ }
+ fieldIdx, ok := dp.fieldIndex[fieldName]
+ if !ok {
+ return 0
+ }
+ if fieldIdx >= len(dp.GetFields()) {
+ if l != nil {
+ l.Warn().Str("fieldName", fieldName).
+ Int("len", len(dp.GetFields())).
+ Int("fieldIdx", fieldIdx).
+ Msg("field index out of range")
+ }
+ return 0
+ }
+ field := dp.GetFields()[fieldIdx]
+ if field.GetInt() == nil {
+ return 0
+ }
+ return field.GetInt().GetValue()
+}
+
type topNStreamingProcessor struct {
pipeline queue.Client
streamingFlow flow.Flow
@@ -350,7 +474,6 @@ func (t *topNStreamingProcessor) handleError() {
// topNProcessorManager manages multiple topNStreamingProcessor(s) belonging to a single measure.
type topNProcessorManager struct {
pipeline queue.Client
- s logical.TagSpecRegistry
l *logger.Logger
m *databasev1.Measure
nodeID string
@@ -370,9 +493,6 @@ func (manager *topNProcessorManager) init(m
*databasev1.Measure) {
return
}
manager.m = m
- tagMapSpec := logical.TagSpecMap{}
- tagMapSpec.RegisterTagFamilies(m.GetTagFamilies())
- manager.s = tagMapSpec
for i := range manager.registeredTasks {
if err := manager.start(manager.registeredTasks[i]); err != nil
{
manager.l.Err(err).Msg("fail to start processor")
@@ -393,7 +513,6 @@ func (manager *topNProcessorManager) Close() error {
}
manager.processorList = nil
manager.registeredTasks = nil
- manager.s = nil
manager.m = nil
return err
}
@@ -410,13 +529,18 @@ func (manager *topNProcessorManager)
onMeasureWrite(seriesID uint64, shardID uin
manager.init(measure)
manager.RLock()
}
+ dp := request.GetRequest().GetDataPoint()
+ spec := request.GetRequest().GetDataPointSpec()
for _, processor := range manager.processorList {
- processor.src <-
flow.NewStreamRecordWithTimestampPb(&dataPointWithEntityValues{
- request.GetRequest().GetDataPoint(),
+ dpWithEntity := newDataPointWithEntityValues(
+ dp,
request.GetEntityValues(),
seriesID,
shardID,
- }, request.GetRequest().GetDataPoint().GetTimestamp())
+ spec,
+ manager.m,
+ )
+ processor.src <-
flow.NewStreamRecordWithTimestampPb(dpWithEntity, dp.GetTimestamp())
}
}()
}
@@ -537,7 +661,8 @@ func (manager *topNProcessorManager) buildFilter(criteria
*modelv1.Criteria) (fl
return func(_ context.Context, request any) bool {
tffws := request.(*dataPointWithEntityValues).GetTagFamilies()
- ok, matchErr := f.Match(logical.TagFamiliesForWrite(tffws),
manager.s)
+ tagSpec := request.(*dataPointWithEntityValues).tagSpec
+ ok, matchErr := f.Match(logical.TagFamiliesForWrite(tffws),
tagSpec)
if matchErr != nil {
manager.l.Err(matchErr).Msg("fail to match criteria")
return false
@@ -558,30 +683,24 @@ func (manager *topNProcessorManager)
buildMapper(fieldName string, groupByNames
if len(groupByNames) == 0 {
return func(_ context.Context, request any) any {
dpWithEvs := request.(*dataPointWithEntityValues)
- if len(dpWithEvs.GetFields()) <= fieldIdx {
- manager.l.Warn().Interface("point",
dpWithEvs.DataPointValue).
- Str("fieldName", fieldName).
- Int("len", len(dpWithEvs.GetFields())).
- Int("fieldIdx", fieldIdx).
- Msg("out of range")
- }
return flow.Data{
- // EntityValues as identity
dpWithEvs.entityValues,
- // save string representation of group values
as the key, i.e. v1
"",
- // field value as v2
-
dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
- // shardID values as v3
+ dpWithEvs.intFieldValue(fieldName, manager.l),
dpWithEvs.shardID,
- // seriesID values as v4
dpWithEvs.seriesID,
}
}, nil
}
- groupLocator, removedTags := newGroupLocator(manager.m, groupByNames)
+ var removedTags []string
+ for i := range groupByNames {
+ _, _, tagSpec := pbv1.FindTagByName(manager.m.GetTagFamilies(),
groupByNames[i])
+ if tagSpec == nil {
+ removedTags = append(removedTags, groupByNames[i])
+ }
+ }
if len(removedTags) > 0 {
- if len(groupLocator) == 0 {
+ if len(removedTags) == len(groupByNames) {
manager.l.Warn().Strs("removedTags",
removedTags).Str("measure", manager.m.Metadata.GetName()).
Msg("TopNAggregation references removed tags
which no longer exist in schema, all groupBy tags were removed")
return nil, fmt.Errorf("all groupBy tags [%s] no longer
exist in %s schema, TopNAggregation is invalid",
@@ -592,55 +711,21 @@ func (manager *topNProcessorManager)
buildMapper(fieldName string, groupByNames
}
return func(_ context.Context, request any) any {
dpWithEvs := request.(*dataPointWithEntityValues)
+ groupValues := make([]string, 0, len(groupByNames))
+ for i := range groupByNames {
+ tagValue := dpWithEvs.tagValue(groupByNames[i])
+ groupValues = append(groupValues, Stringify(tagValue))
+ }
return flow.Data{
- // EntityValues as identity
dpWithEvs.entityValues,
- // save string representation of group values as the
key, i.e. v1
- GroupName(transform(groupLocator, func(locator
partition.TagLocator) string {
- return
Stringify(extractTagValue(dpWithEvs.DataPointValue, locator))
- })),
- // field value as v2
- dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
- // shardID values as v3
+ GroupName(groupValues),
+ dpWithEvs.intFieldValue(fieldName, manager.l),
dpWithEvs.shardID,
- // seriesID values as v4
dpWithEvs.seriesID,
}
}, nil
}
-// groupTagsLocator can be used to locate tags within families.
-type groupTagsLocator []partition.TagLocator
-
-// newGroupLocator generates a groupTagsLocator which strictly preserve the
order of groupByNames.
-func newGroupLocator(m *databasev1.Measure, groupByNames []string)
(groupTagsLocator, []string) {
- groupTags := make([]partition.TagLocator, 0, len(groupByNames))
- var removedTags []string
- for _, groupByName := range groupByNames {
- fIdx, tIdx, spec := pbv1.FindTagByName(m.GetTagFamilies(),
groupByName)
- if spec == nil {
- removedTags = append(removedTags, groupByName)
- continue
- }
- groupTags = append(groupTags, partition.TagLocator{
- FamilyOffset: fIdx,
- TagOffset: tIdx,
- })
- }
- return groupTags, removedTags
-}
-
-func extractTagValue(dpv *measurev1.DataPointValue, locator
partition.TagLocator) *modelv1.TagValue {
- if locator.FamilyOffset >= len(dpv.GetTagFamilies()) {
- return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
- }
- tagFamily := dpv.GetTagFamilies()[locator.FamilyOffset]
- if locator.TagOffset >= len(tagFamily.GetTags()) {
- return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
- }
- return tagFamily.GetTags()[locator.TagOffset]
-}
-
// Stringify converts a TagValue to a string.
func Stringify(tagValue *modelv1.TagValue) string {
switch v := tagValue.GetValue().(type) {
@@ -741,7 +826,7 @@ func (t *TopNValue) resizeEntityValues(size int) [][]byte {
return t.entityValues
}
-func (t *TopNValue) resizeEntities(size int, entitySize int)
[][]*modelv1.TagValue {
+func (t *TopNValue) resizeEntities(size, entitySize int) [][]*modelv1.TagValue
{
entities := t.entities
if n := size - cap(t.entities); n > 0 {
entities = append(entities[:cap(entities)],
make([][]*modelv1.TagValue, n)...)
diff --git a/banyand/measure/topn_test.go b/banyand/measure/topn_test.go
index 385d39c5..95c0deaf 100644
--- a/banyand/measure/topn_test.go
+++ b/banyand/measure/topn_test.go
@@ -18,14 +18,20 @@
package measure
import (
+ "encoding/base64"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"
+ "google.golang.org/protobuf/types/known/timestamppb"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/logical"
)
func TestTopNValue_MarshalUnmarshal(t *testing.T) {
@@ -112,3 +118,723 @@ func TestTopNValue_MarshalUnmarshal(t *testing.T) {
})
}
}
+
+func TestTopNValue_Marshal_EmptyValues(t *testing.T) {
+ topNVal := &TopNValue{
+ valueName: "testValue",
+ entityTagNames: []string{"tag1"},
+ values: []int64{},
+ entities: [][]*modelv1.TagValue{},
+ }
+ _, err := topNVal.marshal(nil)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "values is empty")
+}
+
+func TestTopNValue_Unmarshal_InvalidData(t *testing.T) {
+ decoder := generateColumnValuesDecoder()
+ defer releaseColumnValuesDecoder(decoder)
+
+ tests := []struct {
+ name string
+ wantErr string
+ src []byte
+ }{
+ {
+ name: "empty src",
+ src: []byte{},
+ wantErr: "cannot unmarshal topNValue",
+ },
+ {
+ name: "truncated after name",
+ src: []byte{0x01, 0x01, 'a'},
+ wantErr: "cannot unmarshal topNValue",
+ },
+ {
+ name: "truncated after encodeType",
+ src: []byte{0x01, 0x01, 'a', 0x00, 0x00},
+ wantErr: "cannot unmarshal topNValue",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ topNVal := &TopNValue{}
+ err := topNVal.Unmarshal(tt.src, decoder)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), tt.wantErr)
+ })
+ }
+}
+
+func TestTopNValue_SetMetadata(t *testing.T) {
+ topNVal := &TopNValue{}
+ topNVal.setMetadata("testValue", []string{"tag1", "tag2"})
+ require.Equal(t, "testValue", topNVal.valueName)
+ require.Equal(t, []string{"tag1", "tag2"}, topNVal.entityTagNames)
+}
+
+func TestTopNValue_AddValue(t *testing.T) {
+ topNVal := &TopNValue{}
+ entityValues := []*modelv1.TagValue{
+ {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"svc1"}}},
+ {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity1"}}},
+ }
+ topNVal.addValue(100, entityValues)
+ require.Equal(t, []int64{100}, topNVal.values)
+ require.Len(t, topNVal.entities, 1)
+ require.Len(t, topNVal.entities[0], 2)
+ require.Equal(t, entityValues, topNVal.entities[0], "entityValues
should be copied and equal")
+ require.NotSame(t, &entityValues[0], &topNVal.entities[0][0],
"entityValues should be a copy, not the same slice")
+}
+
+func TestTopNValue_Values(t *testing.T) {
+ topNVal := &TopNValue{
+ valueName: "testValue",
+ entityTagNames: []string{"tag1", "tag2"},
+ values: []int64{1, 2, 3},
+ entities: [][]*modelv1.TagValue{
+ {
+ {Value: &modelv1.TagValue_Str{Str:
&modelv1.Str{Value: "svc1"}}},
+ },
+ },
+ }
+ valueName, entityTagNames, values, entities := topNVal.Values()
+ require.Equal(t, "testValue", valueName)
+ require.Equal(t, []string{"tag1", "tag2"}, entityTagNames)
+ require.Equal(t, []int64{1, 2, 3}, values)
+ require.Equal(t, topNVal.entities, entities)
+}
+
+func TestTopNValue_Reset(t *testing.T) {
+ topNVal := &TopNValue{
+ valueName: "testValue",
+ entityTagNames: []string{"tag1", "tag2"},
+ values: []int64{1, 2, 3},
+ entities: [][]*modelv1.TagValue{
+ {
+ {Value: &modelv1.TagValue_Str{Str:
&modelv1.Str{Value: "svc1"}}},
+ },
+ },
+ buf: []byte{1, 2, 3},
+ entityValues: [][]byte{{1, 2}},
+ entityValuesBuf: [][]byte{{3, 4}},
+ }
+ topNVal.Reset()
+ require.Empty(t, topNVal.valueName)
+ require.Empty(t, topNVal.entityTagNames)
+ require.Empty(t, topNVal.values)
+ require.Empty(t, topNVal.entities)
+ require.Empty(t, topNVal.buf)
+ require.Empty(t, topNVal.entityValues)
+ require.Empty(t, topNVal.entityValuesBuf)
+ require.Equal(t, int64(0), topNVal.firstValue)
+}
+
+func TestTopNValue_ResizeEntityValues(t *testing.T) {
+ topNVal := &TopNValue{}
+ result := topNVal.resizeEntityValues(5)
+ require.Len(t, result, 5)
+ require.Len(t, topNVal.entityValues, 5)
+ result2 := topNVal.resizeEntityValues(3)
+ require.Len(t, result2, 3)
+ require.Len(t, topNVal.entityValues, 3)
+ result3 := topNVal.resizeEntityValues(10)
+ require.Len(t, result3, 10)
+ require.Len(t, topNVal.entityValues, 10)
+}
+
+func TestTopNValue_ResizeEntities(t *testing.T) {
+ topNVal := &TopNValue{}
+ result := topNVal.resizeEntities(3, 2)
+ require.Len(t, result, 3)
+ require.Len(t, topNVal.entities, 3)
+ for i := range result {
+ require.Len(t, result[i], 0, "entities should be reset to
length 0")
+ require.Equal(t, 2, cap(result[i]), "entities should have
capacity 2")
+ }
+}
+
+func TestDataPointWithEntityValues_IntFieldValue(t *testing.T) {
+ tests := []struct {
+ fieldIndex map[string]int
+ name string
+ fieldName string
+ fields []*modelv1.FieldValue
+ expectedValue int64
+ }{
+ {
+ name: "field exists",
+ fieldIndex: map[string]int{"field1": 0},
+ fields: []*modelv1.FieldValue{{Value:
&modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 100}}}},
+ fieldName: "field1",
+ expectedValue: 100,
+ },
+ {
+ name: "field not in index",
+ fieldIndex: map[string]int{"field1": 0},
+ fields: []*modelv1.FieldValue{{Value:
&modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 100}}}},
+ fieldName: "field2",
+ expectedValue: 0,
+ },
+ {
+ name: "field index out of range",
+ fieldIndex: map[string]int{"field1": 5},
+ fields: []*modelv1.FieldValue{{Value:
&modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 100}}}},
+ fieldName: "field1",
+ expectedValue: 0,
+ },
+ {
+ name: "field is not int type",
+ fieldIndex: map[string]int{"field1": 0},
+ fields: []*modelv1.FieldValue{{Value:
&modelv1.FieldValue_Float{Float: &modelv1.Float{Value: 3.14}}}},
+ fieldName: "field1",
+ expectedValue: 0,
+ },
+ {
+ name: "nil fieldIndex",
+ fieldIndex: nil,
+ fields: []*modelv1.FieldValue{{Value:
&modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 100}}}},
+ fieldName: "field1",
+ expectedValue: 0,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ dp := &dataPointWithEntityValues{
+ DataPointValue: &measurev1.DataPointValue{
+ Fields: tt.fields,
+ },
+ fieldIndex: tt.fieldIndex,
+ }
+ result := dp.intFieldValue(tt.fieldName, nil)
+ require.Equal(t, tt.expectedValue, result)
+ })
+ }
+}
+
+func TestDataPointWithEntityValues_TagValue(t *testing.T) {
+ tests := []struct {
+ tagSpec logical.TagSpecRegistry
+ name string
+ tagName string
+ tagFamilies []*modelv1.TagFamilyForWrite
+ expectedNull bool
+ }{
+ {
+ name: "tag found",
+ tagSpec: func() logical.TagSpecRegistry {
+ tagSpecMap := logical.TagSpecMap{}
+ tagSpecMap.RegisterTag(0, 0,
&databasev1.TagSpec{Name: "tag1"})
+ return tagSpecMap
+ }(),
+ tagFamilies: []*modelv1.TagFamilyForWrite{
+ {
+ Tags: []*modelv1.TagValue{
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "value1"}}},
+ },
+ },
+ },
+ tagName: "tag1",
+ expectedNull: false,
+ },
+ {
+ name: "tag not found in spec",
+ tagSpec: logical.TagSpecMap{},
+ tagFamilies: []*modelv1.TagFamilyForWrite{},
+ tagName: "tag1",
+ expectedNull: true,
+ },
+ {
+ name: "tag family index out of range",
+ tagSpec: func() logical.TagSpecRegistry {
+ tagSpecMap := logical.TagSpecMap{}
+ tagSpecMap.RegisterTag(5, 0,
&databasev1.TagSpec{Name: "tag1"})
+ return tagSpecMap
+ }(),
+ tagFamilies: []*modelv1.TagFamilyForWrite{
+ {
+ Tags: []*modelv1.TagValue{
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "value1"}}},
+ },
+ },
+ },
+ tagName: "tag1",
+ expectedNull: true,
+ },
+ {
+ name: "tag index out of range",
+ tagSpec: func() logical.TagSpecRegistry {
+ tagSpecMap := logical.TagSpecMap{}
+ tagSpecMap.RegisterTag(0, 5,
&databasev1.TagSpec{Name: "tag1"})
+ return tagSpecMap
+ }(),
+ tagFamilies: []*modelv1.TagFamilyForWrite{
+ {
+ Tags: []*modelv1.TagValue{
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "value1"}}},
+ },
+ },
+ },
+ tagName: "tag1",
+ expectedNull: true,
+ },
+ {
+ name: "nil tagSpec",
+ tagSpec: nil,
+ tagFamilies: []*modelv1.TagFamilyForWrite{},
+ tagName: "tag1",
+ expectedNull: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ dp := &dataPointWithEntityValues{
+ DataPointValue: &measurev1.DataPointValue{
+ TagFamilies: tt.tagFamilies,
+ },
+ tagSpec: tt.tagSpec,
+ }
+ result := dp.tagValue(tt.tagName)
+ if tt.expectedNull {
+ require.Equal(t, pbv1.NullTagValue, result)
+ } else {
+ require.NotEqual(t, pbv1.NullTagValue, result)
+ }
+ })
+ }
+}
+
+func TestDataPointWithEntityValues_LocateSpecTag(t *testing.T) {
+ tests := []struct {
+ name string
+ tagSpec logical.TagSpecRegistry
+ tagName string
+ wantFamily int
+ wantTag int
+ wantOk bool
+ }{
+ {
+ name: "tag found",
+ tagSpec: func() logical.TagSpecRegistry {
+ tagSpecMap := logical.TagSpecMap{}
+ tagSpecMap.RegisterTag(1, 2,
&databasev1.TagSpec{Name: "tag1"})
+ return tagSpecMap
+ }(),
+ tagName: "tag1",
+ wantFamily: 1,
+ wantTag: 2,
+ wantOk: true,
+ },
+ {
+ name: "tag not found",
+ tagSpec: logical.TagSpecMap{},
+ tagName: "tag1",
+ wantFamily: 0,
+ wantTag: 0,
+ wantOk: false,
+ },
+ {
+ name: "nil tagSpec",
+ tagSpec: nil,
+ tagName: "tag1",
+ wantFamily: 0,
+ wantTag: 0,
+ wantOk: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ dp := &dataPointWithEntityValues{
+ tagSpec: tt.tagSpec,
+ }
+ familyIdx, tagIdx, ok := dp.locateSpecTag(tt.tagName)
+ require.Equal(t, tt.wantOk, ok)
+ if ok {
+ require.Equal(t, tt.wantFamily, familyIdx)
+ require.Equal(t, tt.wantTag, tagIdx)
+ }
+ })
+ }
+}
+
+func TestStringify(t *testing.T) {
+ tests := []struct {
+ name string
+ tagValue *modelv1.TagValue
+ expected string
+ }{
+ {
+ name: "string value",
+ tagValue: &modelv1.TagValue{Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "test"}}},
+ expected: "test",
+ },
+ {
+ name: "int value",
+ tagValue: &modelv1.TagValue{Value:
&modelv1.TagValue_Int{Int: &modelv1.Int{Value: 12345}}},
+ expected: "12345",
+ },
+ {
+ name: "binary data",
+ tagValue: &modelv1.TagValue{Value:
&modelv1.TagValue_BinaryData{BinaryData: []byte("test data")}},
+ expected:
base64.StdEncoding.EncodeToString([]byte("test data")),
+ },
+ {
+ name: "int array",
+ tagValue: &modelv1.TagValue{Value:
&modelv1.TagValue_IntArray{IntArray: &modelv1.IntArray{Value: []int64{1, 2,
3}}}},
+ expected: "1,2,3",
+ },
+ {
+ name: "string array",
+ tagValue: &modelv1.TagValue{Value:
&modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: []string{"a",
"b", "c"}}}},
+ expected: "a,b,c",
+ },
+ {
+ name: "empty string array",
+ tagValue: &modelv1.TagValue{Value:
&modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: []string{}}}},
+ expected: "",
+ },
+ {
+ name: "unknown type",
+ tagValue: &modelv1.TagValue{Value:
&modelv1.TagValue_Null{}},
+ expected: "",
+ },
+ {
+ name: "nil value",
+ tagValue: &modelv1.TagValue{},
+ expected: "",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := Stringify(tt.tagValue)
+ require.Equal(t, tt.expected, result)
+ })
+ }
+}
+
+func TestGroupName(t *testing.T) {
+ tests := []struct {
+ name string
+ expected string
+ groupTags []string
+ }{
+ {
+ name: "single tag",
+ groupTags: []string{"tag1"},
+ expected: "tag1",
+ },
+ {
+ name: "multiple tags",
+ groupTags: []string{"tag1", "tag2", "tag3"},
+ expected: "tag1|tag2|tag3",
+ },
+ {
+ name: "empty tags",
+ groupTags: []string{},
+ expected: "",
+ },
+ {
+ name: "nil tags",
+ groupTags: nil,
+ expected: "",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := GroupName(tt.groupTags)
+ require.Equal(t, tt.expected, result)
+ })
+ }
+}
+
+func TestBuildFieldIndex(t *testing.T) {
+ tests := []struct {
+ spec *measurev1.DataPointSpec
+ expected map[string]int
+ name string
+ }{
+ {
+ name: "multiple fields",
+ spec: &measurev1.DataPointSpec{
+ FieldNames: []string{"field1", "field2",
"field3"},
+ },
+ expected: map[string]int{
+ "field1": 0,
+ "field2": 1,
+ "field3": 2,
+ },
+ },
+ {
+ name: "empty fields",
+ spec: &measurev1.DataPointSpec{FieldNames:
[]string{}},
+ expected: map[string]int{},
+ },
+ {
+ name: "nil spec",
+ spec: nil,
+ expected: map[string]int{},
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := buildFieldIndex(tt.spec, nil)
+ require.Equal(t, tt.expected, result)
+ })
+ }
+}
+
+func TestBuildTagSpecRegistryFromSpec(t *testing.T) {
+ tests := []struct {
+ name string
+ spec *measurev1.DataPointSpec
+ tagName string
+ wantOk bool
+ wantFam int
+ wantTag int
+ }{
+ {
+ name: "single tag family",
+ spec: &measurev1.DataPointSpec{
+ TagFamilySpec: []*measurev1.TagFamilySpec{
+ {
+ TagNames: []string{"tag1",
"tag2"},
+ },
+ },
+ },
+ tagName: "tag1",
+ wantOk: true,
+ wantFam: 0,
+ wantTag: 0,
+ },
+ {
+ name: "multiple tag families",
+ spec: &measurev1.DataPointSpec{
+ TagFamilySpec: []*measurev1.TagFamilySpec{
+ {
+ TagNames: []string{"tag1"},
+ },
+ {
+ TagNames: []string{"tag2",
"tag3"},
+ },
+ },
+ },
+ tagName: "tag3",
+ wantOk: true,
+ wantFam: 1,
+ wantTag: 1,
+ },
+ {
+ name: "nil spec",
+ spec: nil,
+ tagName: "tag1",
+ wantOk: false,
+ wantFam: 0,
+ wantTag: 0,
+ },
+ {
+ name: "tag not found",
+ spec: &measurev1.DataPointSpec{
+ TagFamilySpec: []*measurev1.TagFamilySpec{
+ {
+ TagNames: []string{"tag1"},
+ },
+ },
+ },
+ tagName: "tag2",
+ wantOk: false,
+ wantFam: 0,
+ wantTag: 0,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := buildTagSpecRegistryFromSpec(tt.spec, nil)
+ if tt.spec == nil {
+ require.NotNil(t, result)
+ return
+ }
+ tagSpec := result.FindTagSpecByName(tt.tagName)
+ if tt.wantOk {
+ require.NotNil(t, tagSpec)
+ require.Equal(t, tt.wantFam,
tagSpec.TagFamilyIdx)
+ require.Equal(t, tt.wantTag, tagSpec.TagIdx)
+ } else {
+ require.Nil(t, tagSpec)
+ }
+ })
+ }
+}
+
+func TestTopNValue_MarshalUnmarshal_EdgeCases(t *testing.T) {
+ decoder := generateColumnValuesDecoder()
+ defer releaseColumnValuesDecoder(decoder)
+
+ tests := []struct {
+ topNVal *TopNValue
+ name string
+ }{
+ {
+ name: "empty entityTagNames",
+ topNVal: &TopNValue{
+ valueName: "testValue",
+ entityTagNames: []string{},
+ values: []int64{1, 2, 3},
+ entities: [][]*modelv1.TagValue{
+ {},
+ {},
+ {},
+ },
+ },
+ },
+ {
+ name: "large values",
+ topNVal: &TopNValue{
+ valueName: "testValue",
+ entityTagNames: []string{"tag1"},
+ values: []int64{-9223372036854775808,
9223372036854775807, 0},
+ entities: [][]*modelv1.TagValue{
+ {{Value: &modelv1.TagValue_Int{Int:
&modelv1.Int{Value: 1}}}},
+ {{Value: &modelv1.TagValue_Int{Int:
&modelv1.Int{Value: 2}}}},
+ {{Value: &modelv1.TagValue_Int{Int:
&modelv1.Int{Value: 3}}}},
+ },
+ },
+ },
+ {
+ name: "many entity tag names",
+ topNVal: &TopNValue{
+ valueName: "testValue",
+ entityTagNames: []string{"tag1", "tag2",
"tag3", "tag4", "tag5"},
+ values: []int64{1},
+ entities: [][]*modelv1.TagValue{
+ {
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v1"}}},
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v2"}}},
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v3"}}},
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v4"}}},
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v5"}}},
+ },
+ },
+ },
+ },
+ {
+ name: "different tag value types",
+ topNVal: &TopNValue{
+ valueName: "testValue",
+ entityTagNames: []string{"tag1", "tag2",
"tag3"},
+ values: []int64{1, 2},
+ entities: [][]*modelv1.TagValue{
+ {
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "str"}}},
+ {Value:
&modelv1.TagValue_Int{Int: &modelv1.Int{Value: 42}}},
+ {Value:
&modelv1.TagValue_BinaryData{BinaryData: []byte("binary")}},
+ },
+ {
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "str2"}}},
+ {Value:
&modelv1.TagValue_Int{Int: &modelv1.Int{Value: 43}}},
+ {Value:
&modelv1.TagValue_BinaryData{BinaryData: []byte("binary2")}},
+ },
+ },
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ originalValueName := tt.topNVal.valueName
+ originalEntityTagNames := make([]string,
len(tt.topNVal.entityTagNames))
+ copy(originalEntityTagNames, tt.topNVal.entityTagNames)
+ originalValues := make([]int64, len(tt.topNVal.values))
+ copy(originalValues, tt.topNVal.values)
+
+ originalEntities := make([][]*modelv1.TagValue,
len(tt.topNVal.entities))
+ for i, entityGroup := range tt.topNVal.entities {
+ originalEntities[i] = make([]*modelv1.TagValue,
len(entityGroup))
+ for j, tagValue := range entityGroup {
+ originalEntities[i][j] =
proto.Clone(tagValue).(*modelv1.TagValue)
+ }
+ }
+
+ dst, err := tt.topNVal.marshal(nil)
+ require.NoError(t, err)
+
+ tt.topNVal.Reset()
+ err = tt.topNVal.Unmarshal(dst, decoder)
+ require.NoError(t, err)
+
+ require.Equal(t, originalValueName,
tt.topNVal.valueName)
+ require.Equal(t, originalEntityTagNames,
tt.topNVal.entityTagNames)
+ require.Equal(t, originalValues, tt.topNVal.values)
+ diff := cmp.Diff(originalEntities, tt.topNVal.entities,
protocmp.Transform())
+ require.True(t, diff == "", "entities differ: %s", diff)
+ })
+ }
+}
+
+func TestTopNValue_Unmarshal_InvalidEntityLength(t *testing.T) {
+ decoder := generateColumnValuesDecoder()
+ defer releaseColumnValuesDecoder(decoder)
+
+ topNVal := &TopNValue{
+ valueName: "testValue",
+ entityTagNames: []string{"tag1", "tag2"},
+ values: []int64{1},
+ entities: [][]*modelv1.TagValue{
+ {
+ {Value: &modelv1.TagValue_Str{Str:
&modelv1.Str{Value: "svc1"}}},
+ },
+ },
+ }
+
+ dst, err := topNVal.marshal(nil)
+ require.NoError(t, err)
+
+ topNVal.Reset()
+ err = topNVal.Unmarshal(dst, decoder)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "entityValues")
+}
+
+func TestNewDataPointWithEntityValues(t *testing.T) {
+ dp := &measurev1.DataPointValue{
+ Timestamp: timestamppb.Now(),
+ Fields: []*modelv1.FieldValue{
+ {Value: &modelv1.FieldValue_Int{Int:
&modelv1.Int{Value: 100}}},
+ },
+ TagFamilies: []*modelv1.TagFamilyForWrite{
+ {
+ Tags: []*modelv1.TagValue{
+ {Value: &modelv1.TagValue_Str{Str:
&modelv1.Str{Value: "tag1"}}},
+ },
+ },
+ },
+ }
+ entityValues := []*modelv1.TagValue{
+ {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity1"}}},
+ }
+ spec := &measurev1.DataPointSpec{
+ FieldNames: []string{"field1"},
+ TagFamilySpec: []*measurev1.TagFamilySpec{
+ {
+ TagNames: []string{"tag1"},
+ },
+ },
+ }
+
+ result := newDataPointWithEntityValues(dp, entityValues, 123, 456,
spec, nil)
+ require.NotNil(t, result)
+ require.Equal(t, dp, result.DataPointValue)
+ require.Equal(t, entityValues, result.entityValues)
+ require.Equal(t, uint64(123), result.seriesID)
+ require.Equal(t, uint32(456), result.shardID)
+ require.NotNil(t, result.fieldIndex)
+ require.Equal(t, 0, result.fieldIndex["field1"])
+ require.NotNil(t, result.tagSpec)
+}
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index 33cd94f5..4f855b86 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -238,6 +238,6 @@ func (w *writeQueueCallback) handle(dst
map[string]*dataPointsInQueue,
if err != nil {
return nil, err
}
- w.schemaRepo.inFlow(stm.GetSchema(), sid, writeEvent.ShardId,
writeEvent.EntityValues, req.DataPoint)
+ w.schemaRepo.inFlow(stm.GetSchema(), sid, writeEvent.ShardId,
writeEvent.EntityValues, req.DataPoint, spec)
return dst, nil
}
diff --git a/banyand/measure/write_standalone.go
b/banyand/measure/write_standalone.go
index 44872e3e..08c8b380 100644
--- a/banyand/measure/write_standalone.go
+++ b/banyand/measure/write_standalone.go
@@ -199,7 +199,7 @@ func (w *writeCallback) handle(dst
map[string]*dataPointsInGroup, writeEvent *me
if err != nil {
return nil, err
}
- w.schemaRepo.inFlow(stm.GetSchema(), sid, writeEvent.ShardId,
writeEvent.EntityValues, req.DataPoint)
+ w.schemaRepo.inFlow(stm.GetSchema(), sid, writeEvent.ShardId,
writeEvent.EntityValues, req.DataPoint, spec)
return dst, nil
}