This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 671cdb8f Implement spec-based bulk write for measure (#869)
671cdb8f is described below
commit 671cdb8f226690390d55e4298586d7a5caed5337
Author: Huang Youliang <[email protected]>
AuthorDate: Fri Dec 5 08:40:57 2025 +0800
Implement spec-based bulk write for measure (#869)
* Implement spec-based bulk write for measure
---------
Co-authored-by: Gao Hongtao <[email protected]>
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
api/proto/banyandb/model/v1/write.proto | 1 +
banyand/liaison/grpc/discovery.go | 9 +-
banyand/liaison/grpc/measure.go | 214 +++++++++++++++++++--
banyand/liaison/grpc/stream.go | 4 +-
banyand/measure/write_benchmark_test.go | 2 +-
banyand/measure/write_liaison.go | 28 ++-
banyand/measure/write_standalone.go | 183 +++++++++++++-----
docs/api-reference.md | 1 +
test/cases/init.go | 38 ++++
test/cases/measure/data/data.go | 124 ++++++++++++
test/cases/measure/data/input/write_mixed.ql | 22 +++
test/cases/measure/data/input/write_mixed.yaml | 41 ++++
test/cases/measure/data/input/write_spec.ql | 22 +++
test/cases/measure/data/input/write_spec.yaml | 33 ++++
.../testdata/service_cpm_minute_schema_order.json | 93 +++++++++
.../testdata/service_cpm_minute_spec_order.json | 93 +++++++++
.../testdata/service_cpm_minute_spec_order2.json | 93 +++++++++
test/cases/measure/data/want/write_mixed.yaml | 139 +++++++++++++
test/cases/measure/data/want/write_spec.yaml | 59 ++++++
test/cases/measure/measure.go | 2 +
20 files changed, 1117 insertions(+), 84 deletions(-)
diff --git a/api/proto/banyandb/model/v1/write.proto
b/api/proto/banyandb/model/v1/write.proto
index 3f25e1ee..3c7a3504 100644
--- a/api/proto/banyandb/model/v1/write.proto
+++ b/api/proto/banyandb/model/v1/write.proto
@@ -33,4 +33,5 @@ enum Status {
STATUS_DISK_FULL = 6;
STATUS_VERSION_UNSUPPORTED = 7; // Client version not supported
STATUS_VERSION_DEPRECATED = 8; // Client version deprecated but still
supported
+ STATUS_METADATA_REQUIRED = 9; // Metadata is required for the first request
}
diff --git a/banyand/liaison/grpc/discovery.go
b/banyand/liaison/grpc/discovery.go
index acb101bd..abeb9f99 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -83,7 +83,7 @@ func (ds *discoveryService) SetLogger(log *logger.Logger) {
ds.shardingKeyRepo.log = log
}
-func (ds *discoveryService) navigate(metadata *commonv1.Metadata, tagFamilies
[]*modelv1.TagFamilyForWrite) (pbv1.EntityValues, common.ShardID, error) {
+func (ds *discoveryService) navigateByLocator(metadata *commonv1.Metadata,
tagFamilies []*modelv1.TagFamilyForWrite) (pbv1.EntityValues, common.ShardID,
error) {
shardNum, existed := ds.groupRepo.shardNum(metadata.Group)
if !existed {
return nil, common.ShardID(0), errors.Wrapf(errNotExist,
"finding the shard num by: %v", metadata)
@@ -335,6 +335,13 @@ func (e *entityRepo) getTraceIDIndex(id identity) (int,
bool) {
return index, true
}
+func (e *entityRepo) getMeasure(id identity) (*databasev1.Measure, bool) {
+ e.RWMutex.RLock()
+ defer e.RWMutex.RUnlock()
+ m, ok := e.measureMap[id]
+ return m, ok
+}
+
var _ schema.EventHandler = (*shardingKeyRepo)(nil)
type shardingKeyRepo struct {
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 355968d0..001b6d02 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/queue"
@@ -36,6 +37,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -83,6 +85,12 @@ func (ms *measureService) Write(measure
measurev1.MeasureService_WriteServer) er
defer ms.handleWriteCleanup(publisher, &succeedSent, measure, start)
+ var metadata *commonv1.Metadata
+ var spec *measurev1.DataPointSpec
+ isFirstRequest := true
+ nodeMetadataSent := make(map[string]bool)
+ nodeSpecSent := make(map[string]bool)
+
for {
select {
case <-ctx.Done():
@@ -101,35 +109,52 @@ func (ms *measureService) Write(measure
measurev1.MeasureService_WriteServer) er
return err
}
- ms.metrics.totalStreamMsgReceived.Inc(1,
writeRequest.Metadata.Group, "measure", "write")
+ if writeRequest.GetMetadata() != nil {
+ metadata = writeRequest.GetMetadata()
+ nodeMetadataSent = make(map[string]bool)
+ } else if isFirstRequest {
+ ms.l.Error().Msg("metadata is required for the first
request of gRPC stream")
+ ms.sendReply(nil,
modelv1.Status_STATUS_METADATA_REQUIRED, writeRequest.GetMessageId(), measure)
+ return errors.New("metadata is required for the first
request of gRPC stream")
+ }
+ isFirstRequest = false
+
+ if writeRequest.GetDataPointSpec() != nil {
+ spec = writeRequest.GetDataPointSpec()
+ nodeSpecSent = make(map[string]bool)
+ }
+
+ ms.metrics.totalStreamMsgReceived.Inc(1, metadata.Group,
"measure", "write")
- if status := ms.validateWriteRequest(writeRequest, measure);
status != modelv1.Status_STATUS_SUCCEED {
+ if status := ms.validateWriteRequest(writeRequest, metadata,
measure); status != modelv1.Status_STATUS_SUCCEED {
continue
}
- if err := ms.processAndPublishRequest(ctx, writeRequest,
publisher, &succeedSent, measure); err != nil {
+ if err := ms.processAndPublishRequest(ctx, writeRequest,
metadata, spec, publisher, &succeedSent, measure, nodeMetadataSent,
nodeSpecSent); err != nil {
continue
}
}
}
-func (ms *measureService) validateWriteRequest(writeRequest
*measurev1.WriteRequest, measure measurev1.MeasureService_WriteServer)
modelv1.Status {
+func (ms *measureService) validateWriteRequest(writeRequest
*measurev1.WriteRequest,
+ metadata *commonv1.Metadata, measure
measurev1.MeasureService_WriteServer,
+) modelv1.Status {
if errTime := timestamp.CheckPb(writeRequest.DataPoint.Timestamp);
errTime != nil {
ms.l.Error().Err(errTime).Stringer("written",
writeRequest).Msg("the data point time is invalid")
- ms.sendReply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_INVALID_TIMESTAMP, writeRequest.GetMessageId(), measure)
+ ms.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP,
writeRequest.GetMessageId(), measure)
return modelv1.Status_STATUS_INVALID_TIMESTAMP
}
- if writeRequest.Metadata.ModRevision > 0 {
- measureCache, existed :=
ms.entityRepo.getLocator(getID(writeRequest.GetMetadata()))
+ if metadata.ModRevision > 0 {
+ measureCache, existed :=
ms.entityRepo.getLocator(getID(metadata))
if !existed {
ms.l.Error().Stringer("written",
writeRequest).Msg("failed to measure schema not found")
- ms.sendReply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure)
+ ms.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND,
writeRequest.GetMessageId(), measure)
return modelv1.Status_STATUS_NOT_FOUND
}
- if writeRequest.Metadata.ModRevision !=
measureCache.ModRevision {
+ if metadata.ModRevision != measureCache.ModRevision {
ms.l.Error().Stringer("written", writeRequest).Msg("the
measure schema is expired")
- ms.sendReply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure)
+ ms.sendReply(metadata,
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure)
return modelv1.Status_STATUS_EXPIRED_SCHEMA
}
}
@@ -138,7 +163,9 @@ func (ms *measureService) validateWriteRequest(writeRequest
*measurev1.WriteRequ
}
func (ms *measureService) processAndPublishRequest(ctx context.Context,
writeRequest *measurev1.WriteRequest,
- publisher queue.BatchPublisher, succeedSent *[]succeedSentMessage,
measure measurev1.MeasureService_WriteServer,
+ metadata *commonv1.Metadata, spec *measurev1.DataPointSpec, publisher
queue.BatchPublisher,
+ succeedSent *[]succeedSentMessage, measure
measurev1.MeasureService_WriteServer,
+ nodeMetadataSent map[string]bool, nodeSpecSent map[string]bool,
) error {
// Retry with backoff when encountering errNotExist
var tagValues pbv1.EntityValues
@@ -149,7 +176,7 @@ func (ms *measureService) processAndPublishRequest(ctx
context.Context, writeReq
retryInterval := 10 * time.Millisecond
startTime := time.Now()
for {
- tagValues, shardID, err =
ms.navigate(writeRequest.GetMetadata(),
writeRequest.GetDataPoint().GetTagFamilies())
+ tagValues, shardID, err = ms.navigate(metadata,
writeRequest, spec)
if err == nil || !errors.Is(err, errNotExist) ||
time.Since(startTime) > ms.maxWaitDuration {
break
}
@@ -162,12 +189,12 @@ func (ms *measureService) processAndPublishRequest(ctx
context.Context, writeReq
}
}
} else {
- tagValues, shardID, err =
ms.navigate(writeRequest.GetMetadata(),
writeRequest.GetDataPoint().GetTagFamilies())
+ tagValues, shardID, err = ms.navigate(metadata, writeRequest,
spec)
}
if err != nil {
ms.l.Error().Err(err).RawJSON("written",
logger.Proto(writeRequest)).Msg("failed to navigate to the write target")
- ms.sendReply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure)
+ ms.sendReply(metadata, modelv1.Status_STATUS_INTERNAL_ERROR,
writeRequest.GetMessageId(), measure)
return err
}
@@ -190,45 +217,190 @@ func (ms *measureService) processAndPublishRequest(ctx
context.Context, writeReq
EntityValues: tagValues[1:].Encode(),
}
- nodes, err := ms.publishToNodes(ctx, writeRequest, iwr, publisher,
uint32(shardID), measure)
+ nodes, err := ms.publishToNodes(ctx, writeRequest, metadata, spec, iwr,
publisher, uint32(shardID), measure, nodeMetadataSent, nodeSpecSent)
if err != nil {
return err
}
*succeedSent = append(*succeedSent, succeedSentMessage{
- metadata: writeRequest.GetMetadata(),
+ metadata: metadata,
messageID: writeRequest.GetMessageId(),
nodes: nodes,
})
return nil
}
-func (ms *measureService) publishToNodes(ctx context.Context, writeRequest
*measurev1.WriteRequest, iwr *measurev1.InternalWriteRequest,
+func (ms *measureService) publishToNodes(ctx context.Context, writeRequest
*measurev1.WriteRequest,
+ metadata *commonv1.Metadata, spec *measurev1.DataPointSpec, iwr
*measurev1.InternalWriteRequest,
publisher queue.BatchPublisher, shardID uint32, measure
measurev1.MeasureService_WriteServer,
+ nodeMetadataSent map[string]bool, nodeSpecSent map[string]bool,
) ([]string, error) {
- nodeID, errPickNode :=
ms.nodeRegistry.Locate(writeRequest.GetMetadata().GetGroup(),
writeRequest.GetMetadata().GetName(), shardID, 0)
+ nodeID, errPickNode := ms.nodeRegistry.Locate(metadata.GetGroup(),
metadata.GetName(), shardID, 0)
if errPickNode != nil {
ms.l.Error().Err(errPickNode).RawJSON("written",
logger.Proto(writeRequest)).Msg("failed to pick an available node")
- ms.sendReply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure)
+ ms.sendReply(metadata, modelv1.Status_STATUS_INTERNAL_ERROR,
writeRequest.GetMessageId(), measure)
return nil, errPickNode
}
+ if !nodeMetadataSent[nodeID] {
+ iwr.Request.Metadata = metadata
+ nodeMetadataSent[nodeID] = true
+ }
+ if spec != nil && !nodeSpecSent[nodeID] {
+ iwr.Request.DataPointSpec = spec
+ nodeSpecSent[nodeID] = true
+ }
+
message :=
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
_, errWritePub := publisher.Publish(ctx, data.TopicMeasureWrite,
message)
if errWritePub != nil {
ms.l.Error().Err(errWritePub).RawJSON("written",
logger.Proto(writeRequest)).Str("nodeID", nodeID).Msg("failed to send a
message")
var ce *common.Error
if errors.As(errWritePub, &ce) {
- ms.sendReply(writeRequest.GetMetadata(), ce.Status(),
writeRequest.GetMessageId(), measure)
+ ms.sendReply(metadata, ce.Status(),
writeRequest.GetMessageId(), measure)
return nil, errWritePub
}
- ms.sendReply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure)
+ ms.sendReply(metadata, modelv1.Status_STATUS_INTERNAL_ERROR,
writeRequest.GetMessageId(), measure)
return nil, errWritePub
}
return []string{nodeID}, nil
}
+func (ms *measureService) navigate(metadata *commonv1.Metadata,
+ writeRequest *measurev1.WriteRequest, spec *measurev1.DataPointSpec,
+) (pbv1.EntityValues, common.ShardID, error) {
+ tagFamilies := writeRequest.GetDataPoint().GetTagFamilies()
+ if spec == nil {
+ return ms.navigateByLocator(metadata, tagFamilies)
+ }
+ return ms.navigateByTagSpec(metadata, spec, tagFamilies)
+}
+
+func (ms *measureService) navigateByTagSpec(
+ metadata *commonv1.Metadata, spec *measurev1.DataPointSpec, tagFamilies
[]*modelv1.TagFamilyForWrite,
+) (pbv1.EntityValues, common.ShardID, error) {
+ shardNum, existed := ms.groupRepo.shardNum(metadata.Group)
+ if !existed {
+ return nil, common.ShardID(0), errors.Wrapf(errNotExist,
"finding the shard num by: %v", metadata)
+ }
+ id := getID(metadata)
+ measure, ok := ms.entityRepo.getMeasure(id)
+ if !ok {
+ return nil, common.ShardID(0), errors.Wrapf(errNotExist,
"finding measure schema by: %v", metadata)
+ }
+ specFamilyMap, specTagMaps := ms.buildSpecMaps(spec)
+
+ entityValues := ms.findTagValuesByNames(
+ metadata.Name,
+ measure.GetTagFamilies(),
+ tagFamilies,
+ measure.GetEntity().GetTagNames(),
+ specFamilyMap,
+ specTagMaps,
+ )
+ entity, err := entityValues.ToEntity()
+ if err != nil {
+ return nil, common.ShardID(0), err
+ }
+
+ shardingKey := measure.GetShardingKey()
+ if shardingKey != nil && len(shardingKey.GetTagNames()) > 0 {
+ shardingKeyValues := ms.findTagValuesByNames(
+ metadata.Name,
+ measure.GetTagFamilies(),
+ tagFamilies,
+ shardingKey.GetTagNames(),
+ specFamilyMap,
+ specTagMaps,
+ )
+ shardingEntity, shardingErr := shardingKeyValues.ToEntity()
+ if shardingErr != nil {
+ return nil, common.ShardID(0), shardingErr
+ }
+ shardID, shardingErr :=
partition.ShardID(shardingEntity.Marshal(), shardNum)
+ if shardingErr != nil {
+ return nil, common.ShardID(0), shardingErr
+ }
+ return entityValues, common.ShardID(shardID), nil
+ }
+
+ shardID, err := partition.ShardID(entity.Marshal(), shardNum)
+ if err != nil {
+ return nil, common.ShardID(0), err
+ }
+ return entityValues, common.ShardID(shardID), nil
+}
+
+func (ms *measureService) buildSpecMaps(spec *measurev1.DataPointSpec)
(map[string]int, map[string]map[string]int) {
+ specFamilyMap := make(map[string]int)
+ specTagMaps := make(map[string]map[string]int)
+ for i, specFamily := range spec.GetTagFamilySpec() {
+ specFamilyMap[specFamily.GetName()] = i
+ tagMap := make(map[string]int)
+ for j, tagName := range specFamily.GetTagNames() {
+ tagMap[tagName] = j
+ }
+ specTagMaps[specFamily.GetName()] = tagMap
+ }
+ return specFamilyMap, specTagMaps
+}
+
+func (ms *measureService) findTagValuesByNames(
+ subject string,
+ schemaFamilies []*databasev1.TagFamilySpec,
+ srcTagFamilies []*modelv1.TagFamilyForWrite,
+ tagNames []string,
+ specFamilyMap map[string]int,
+ specTagMaps map[string]map[string]int,
+) pbv1.EntityValues {
+ entityValues := make(pbv1.EntityValues, len(tagNames)+1)
+ entityValues[0] = pbv1.EntityStrValue(subject)
+ for i, tagName := range tagNames {
+ tagValue := ms.findTagValueByName(schemaFamilies,
srcTagFamilies, tagName, specFamilyMap, specTagMaps)
+ if tagValue == nil {
+ entityValues[i+1] = &modelv1.TagValue{Value:
&modelv1.TagValue_Null{}}
+ } else {
+ entityValues[i+1] = tagValue
+ }
+ }
+ return entityValues
+}
+
+func (ms *measureService) findTagValueByName(
+ schemaFamilies []*databasev1.TagFamilySpec,
+ srcTagFamilies []*modelv1.TagFamilyForWrite,
+ tagName string,
+ specFamilyMap map[string]int,
+ specTagMaps map[string]map[string]int,
+) *modelv1.TagValue {
+ for _, schemaFamily := range schemaFamilies {
+ for _, schemaTag := range schemaFamily.GetTags() {
+ if schemaTag.GetName() != tagName {
+ continue
+ }
+ familyIdx, ok := specFamilyMap[schemaFamily.GetName()]
+ if !ok || familyIdx >= len(srcTagFamilies) {
+ return nil
+ }
+ tagMap := specTagMaps[schemaFamily.GetName()]
+ if tagMap == nil {
+ return nil
+ }
+ tagIdx, ok := tagMap[tagName]
+ if !ok || tagIdx >=
len(srcTagFamilies[familyIdx].GetTags()) {
+ return nil
+ }
+ return srcTagFamilies[familyIdx].GetTags()[tagIdx]
+ }
+ }
+ return nil
+}
+
func (ms *measureService) sendReply(metadata *commonv1.Metadata, status
modelv1.Status, messageID uint64, measure measurev1.MeasureService_WriteServer)
{
+ if metadata == nil {
+ ms.l.Error().Stringer("status", status).Msg("metadata is nil,
cannot send reply")
+ return
+ }
if status != modelv1.Status_STATUS_SUCCEED {
ms.metrics.totalStreamMsgReceivedErr.Inc(1, metadata.Group,
"measure", "write")
}
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 4b31874b..81cd420e 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -100,7 +100,7 @@ func (s *streamService) navigateWithRetry(writeEntity
*streamv1.WriteRequest) (t
retryInterval := 10 * time.Millisecond
startTime := time.Now()
for {
- tagValues, shardID, err =
s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
+ tagValues, shardID, err =
s.navigateByLocator(writeEntity.GetMetadata(),
writeEntity.GetElement().GetTagFamilies())
if err == nil || !errors.Is(err, errNotExist) ||
time.Since(startTime) > s.maxWaitDuration {
return
}
@@ -111,7 +111,7 @@ func (s *streamService) navigateWithRetry(writeEntity
*streamv1.WriteRequest) (t
}
}
}
- return s.navigate(writeEntity.GetMetadata(),
writeEntity.GetElement().GetTagFamilies())
+ return s.navigateByLocator(writeEntity.GetMetadata(),
writeEntity.GetElement().GetTagFamilies())
}
func (s *streamService) publishMessages(
diff --git a/banyand/measure/write_benchmark_test.go
b/banyand/measure/write_benchmark_test.go
index 12aa38a7..23180b3b 100644
--- a/banyand/measure/write_benchmark_test.go
+++ b/banyand/measure/write_benchmark_test.go
@@ -185,7 +185,7 @@ func BenchmarkAppendDataPoints(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Call appendDataPoints
- appendDataPoints(dest, ts, sid, schema, req, locator)
+ appendDataPoints(dest, ts, sid, schema, req, locator, nil)
releaseDataPoints(dest.dataPoints)
dest.dataPoints = nil
}
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index e64cad88..867eb840 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/wqueue"
@@ -81,6 +82,8 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message
bus.Message) (resp
return
}
groups := make(map[string]*dataPointsInQueue)
+ var metadata *commonv1.Metadata
+ var spec *measurev1.DataPointSpec
for i := range events {
var writeEvent *measurev1.InternalWriteRequest
switch e := events[i].(type) {
@@ -96,8 +99,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
w.l.Warn().Msg("invalid event data type")
continue
}
+ req := writeEvent.Request
+ if req != nil && req.GetMetadata() != nil {
+ metadata = req.GetMetadata()
+ }
+ if req != nil && req.GetDataPointSpec() != nil {
+ spec = req.GetDataPointSpec()
+ }
var err error
- if groups, err = w.handle(groups, writeEvent); err != nil {
+ if groups, err = w.handle(groups, writeEvent, metadata, spec);
err != nil {
w.l.Error().Err(err).Msg("cannot handle write event")
groups = make(map[string]*dataPointsInQueue)
continue
@@ -154,7 +164,9 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
return
}
-func (w *writeQueueCallback) handle(dst map[string]*dataPointsInQueue,
writeEvent *measurev1.InternalWriteRequest) (map[string]*dataPointsInQueue,
error) {
+func (w *writeQueueCallback) handle(dst map[string]*dataPointsInQueue,
+ writeEvent *measurev1.InternalWriteRequest, metadata
*commonv1.Metadata, spec *measurev1.DataPointSpec,
+) (map[string]*dataPointsInQueue, error) {
req := writeEvent.Request
t := req.DataPoint.Timestamp.AsTime().Local()
if err := timestamp.Check(t); err != nil {
@@ -162,7 +174,7 @@ func (w *writeQueueCallback) handle(dst
map[string]*dataPointsInQueue, writeEven
}
ts := t.UnixNano()
- gn := req.Metadata.Group
+ gn := metadata.Group
queue, err := w.schemaRepo.loadQueue(gn)
if err != nil {
return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn,
err)
@@ -185,16 +197,16 @@ func (w *writeQueueCallback) handle(dst
map[string]*dataPointsInQueue, writeEven
break
}
}
- stm, ok := w.schemaRepo.loadMeasure(req.GetMetadata())
+ stm, ok := w.schemaRepo.loadMeasure(metadata)
if !ok {
- return nil, fmt.Errorf("cannot find measure definition: %s",
req.GetMetadata())
+ return nil, fmt.Errorf("cannot find measure definition: %s",
metadata)
}
fLen := len(req.DataPoint.GetTagFamilies())
if fLen < 1 {
- return nil, fmt.Errorf("%s has no tag family", req.Metadata)
+ return nil, fmt.Errorf("%s has no tag family", metadata)
}
if fLen > len(stm.schema.GetTagFamilies()) {
- return nil, fmt.Errorf("%s has more tag families than %s",
req.Metadata, stm.schema)
+ return nil, fmt.Errorf("%s has more tag families than %s",
metadata, stm.schema)
}
is := stm.indexSchema.Load().(indexSchema)
if len(is.indexRuleLocators.TagFamilyTRule) !=
len(stm.GetSchema().GetTagFamilies()) {
@@ -213,7 +225,7 @@ func (w *writeQueueCallback) handle(dst
map[string]*dataPointsInQueue, writeEven
dpg.tables = append(dpg.tables, dpt)
}
- sid, err := processDataPoint(dpt, req, writeEvent, stm, is, ts)
+ sid, err := processDataPoint(dpt, req, writeEvent, stm, is, ts,
metadata, spec)
if err != nil {
return nil, err
}
diff --git a/banyand/measure/write_standalone.go
b/banyand/measure/write_standalone.go
index dc7fd8b1..3106c26a 100644
--- a/banyand/measure/write_standalone.go
+++ b/banyand/measure/write_standalone.go
@@ -25,6 +25,7 @@ import (
"google.golang.org/protobuf/proto"
"github.com/apache/skywalking-banyandb/api/common"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
@@ -71,10 +72,10 @@ func (w *writeCallback) CheckHealth() *common.Error {
}
func processDataPoint(dpt *dataPointsInTable, req *measurev1.WriteRequest,
writeEvent *measurev1.InternalWriteRequest,
- stm *measure, is indexSchema, ts int64,
+ stm *measure, is indexSchema, ts int64, metadata *commonv1.Metadata,
spec *measurev1.DataPointSpec,
) (uint64, error) {
series := &pbv1.Series{
- Subject: req.Metadata.Name,
+ Subject: metadata.Name,
EntityValues: writeEvent.EntityValues,
}
if err := series.Marshal(); err != nil {
@@ -82,7 +83,7 @@ func processDataPoint(dpt *dataPointsInTable, req
*measurev1.WriteRequest, write
}
if stm.schema.IndexMode {
- fields := handleIndexMode(stm.schema, req, is.indexRuleLocators)
+ fields := handleIndexMode(stm.schema, req,
is.indexRuleLocators, spec)
fields = appendEntityTagsToIndexFields(fields, stm, series)
doc := index.Document{
DocID: uint64(series.ID),
@@ -101,7 +102,7 @@ func processDataPoint(dpt *dataPointsInTable, req
*measurev1.WriteRequest, write
return uint64(series.ID), nil
}
- fields := appendDataPoints(dpt, ts, series.ID, stm.GetSchema(), req,
is.indexRuleLocators)
+ fields := appendDataPoints(dpt, ts, series.ID, stm.GetSchema(), req,
is.indexRuleLocators, spec)
doc := index.Document{
DocID: uint64(series.ID),
@@ -118,7 +119,9 @@ func processDataPoint(dpt *dataPointsInTable, req
*measurev1.WriteRequest, write
return uint64(series.ID), nil
}
-func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent
*measurev1.InternalWriteRequest) (map[string]*dataPointsInGroup, error) {
+func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent
*measurev1.InternalWriteRequest,
+ metadata *commonv1.Metadata, spec *measurev1.DataPointSpec,
+) (map[string]*dataPointsInGroup, error) {
req := writeEvent.Request
t := req.DataPoint.Timestamp.AsTime().Local()
if err := timestamp.Check(t); err != nil {
@@ -126,7 +129,7 @@ func (w *writeCallback) handle(dst
map[string]*dataPointsInGroup, writeEvent *me
}
ts := t.UnixNano()
- gn := req.Metadata.Group
+ gn := metadata.Group
tsdb, err := w.schemaRepo.loadTSDB(gn)
if err != nil {
return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn,
err)
@@ -151,16 +154,16 @@ func (w *writeCallback) handle(dst
map[string]*dataPointsInGroup, writeEvent *me
break
}
}
- stm, ok := w.schemaRepo.loadMeasure(req.GetMetadata())
+ stm, ok := w.schemaRepo.loadMeasure(metadata)
if !ok {
- return nil, fmt.Errorf("cannot find measure definition: %s",
req.GetMetadata())
+ return nil, fmt.Errorf("cannot find measure definition: %s",
metadata)
}
fLen := len(req.DataPoint.GetTagFamilies())
if fLen < 1 {
- return nil, fmt.Errorf("%s has no tag family", req.Metadata)
+ return nil, fmt.Errorf("%s has no tag family", metadata)
}
if fLen > len(stm.schema.GetTagFamilies()) {
- return nil, fmt.Errorf("%s has more tag families than %s",
req.Metadata, stm.schema)
+ return nil, fmt.Errorf("%s has more tag families than %s",
metadata, stm.schema)
}
is := stm.indexSchema.Load().(indexSchema)
if len(is.indexRuleLocators.TagFamilyTRule) !=
len(stm.GetSchema().GetTagFamilies()) {
@@ -192,7 +195,7 @@ func (w *writeCallback) handle(dst
map[string]*dataPointsInGroup, writeEvent *me
dpg.tables = append(dpg.tables, dpt)
}
- sid, err := processDataPoint(dpt, req, writeEvent, stm, is, ts)
+ sid, err := processDataPoint(dpt, req, writeEvent, stm, is, ts,
metadata, spec)
if err != nil {
return nil, err
}
@@ -201,9 +204,9 @@ func (w *writeCallback) handle(dst
map[string]*dataPointsInGroup, writeEvent *me
}
func appendDataPoints(dest *dataPointsInTable, ts int64, sid common.SeriesID,
schema *databasev1.Measure,
- req *measurev1.WriteRequest, locator partition.IndexRuleLocator,
+ req *measurev1.WriteRequest, locator partition.IndexRuleLocator, spec
*measurev1.DataPointSpec,
) []index.Field {
- tagFamily, fields := handleTagFamily(schema, req, locator)
+ tagFamily, fields := handleTagFamily(schema, req, locator, spec)
if dest.dataPoints == nil {
dest.dataPoints = generateDataPoints()
dest.dataPoints.reset()
@@ -214,17 +217,34 @@ func appendDataPoints(dest *dataPointsInTable, ts int64,
sid common.SeriesID, sc
dataPoints.versions = append(dataPoints.versions, req.DataPoint.Version)
dataPoints.seriesIDs = append(dataPoints.seriesIDs, sid)
+ var specFieldMap map[string]int
+ if spec != nil {
+ specFieldMap = make(map[string]int)
+ for i, fieldName := range spec.GetFieldNames() {
+ specFieldMap[fieldName] = i
+ }
+ }
+
field := nameValues{}
for i := range schema.GetFields() {
+ schemaField := schema.GetFields()[i]
var v *modelv1.FieldValue
- if len(req.DataPoint.Fields) <= i {
- v = pbv1.NullFieldValue
+ if spec != nil {
+ if specIdx, ok := specFieldMap[schemaField.GetName()];
ok && specIdx < len(req.DataPoint.Fields) {
+ v = req.DataPoint.Fields[specIdx]
+ } else {
+ v = pbv1.NullFieldValue
+ }
} else {
- v = req.DataPoint.Fields[i]
+ if len(req.DataPoint.Fields) <= i {
+ v = pbv1.NullFieldValue
+ } else {
+ v = req.DataPoint.Fields[i]
+ }
}
field.values = append(field.values, encodeFieldValue(
- schema.GetFields()[i].GetName(),
- schema.GetFields()[i].FieldType,
+ schemaField.GetName(),
+ schemaField.FieldType,
v,
))
}
@@ -250,35 +270,43 @@ func newDpt(segment storage.Segment[*tsTable, option],
timeRange timestamp.TimeR
return dpt
}
-func handleTagFamily(schema *databasev1.Measure, req *measurev1.WriteRequest,
locator partition.IndexRuleLocator) ([]nameValues, []index.Field) {
+func handleTagFamily(schema *databasev1.Measure, req *measurev1.WriteRequest,
+ locator partition.IndexRuleLocator, spec *measurev1.DataPointSpec,
+) ([]nameValues, []index.Field) {
tagFamilies := make([]nameValues, 0, len(schema.TagFamilies))
+ specFamilyMap, specTagMaps := buildSpecMaps(spec)
var fields []index.Field
for i := range schema.GetTagFamilies() {
- var tagFamily *modelv1.TagFamilyForWrite
- if len(req.DataPoint.TagFamilies) <= i {
- tagFamily = pbv1.NullTagFamily
- } else {
- tagFamily = req.DataPoint.TagFamilies[i]
- }
- tfr := locator.TagFamilyTRule[i]
tagFamilySpec := schema.GetTagFamilies()[i]
+ tfr := locator.TagFamilyTRule[i]
+ srcFamily, specTagMap := getSourceFamilyAndTagMap(spec,
tagFamilySpec, req, i, specFamilyMap, specTagMaps)
+
tf := nameValues{
name: tagFamilySpec.Name,
}
for j := range tagFamilySpec.Tags {
+ t := tagFamilySpec.Tags[j]
+
var tagValue *modelv1.TagValue
- if tagFamily == pbv1.NullTagFamily ||
len(tagFamily.Tags) <= j {
- tagValue = pbv1.NullTagValue
+ if spec != nil {
+ if srcFamily != nil && specTagMap != nil {
+ if srcTagIdx, ok := specTagMap[t.Name];
ok && srcTagIdx < len(srcFamily.Tags) {
+ tagValue =
srcFamily.Tags[srcTagIdx]
+ }
+ }
+ if tagValue == nil {
+ tagValue = pbv1.NullTagValue
+ }
} else {
- tagValue = tagFamily.Tags[j]
+ if srcFamily == nil || len(srcFamily.Tags) <= j
{
+ tagValue = pbv1.NullTagValue
+ } else {
+ tagValue = srcFamily.Tags[j]
+ }
}
- t := tagFamilySpec.Tags[j]
- encodeTagValue := encodeTagValue(
- t.Name,
- t.Type,
- tagValue)
+ encodeTagValue := encodeTagValue(t.Name, t.Type,
tagValue)
r, ok := tfr[t.Name]
if ok {
fieldKey := index.FieldKey{}
@@ -316,30 +344,39 @@ func handleTagFamily(schema *databasev1.Measure, req
*measurev1.WriteRequest, lo
return tagFamilies, fields
}
-func handleIndexMode(schema *databasev1.Measure, req *measurev1.WriteRequest,
locator partition.IndexRuleLocator) []index.Field {
+func handleIndexMode(schema *databasev1.Measure, req *measurev1.WriteRequest,
+ locator partition.IndexRuleLocator, spec *measurev1.DataPointSpec,
+) []index.Field {
+ specFamilyMap, specTagMaps := buildSpecMaps(spec)
+
var fields []index.Field
for i := range schema.GetTagFamilies() {
- var tagFamily *modelv1.TagFamilyForWrite
- if len(req.DataPoint.TagFamilies) <= i {
- tagFamily = pbv1.NullTagFamily
- } else {
- tagFamily = req.DataPoint.TagFamilies[i]
- }
- tfr := locator.TagFamilyTRule[i]
tagFamilySpec := schema.GetTagFamilies()[i]
+ tfr := locator.TagFamilyTRule[i]
+ srcFamily, specTagMap := getSourceFamilyAndTagMap(spec,
tagFamilySpec, req, i, specFamilyMap, specTagMaps)
+
for j := range tagFamilySpec.Tags {
+ t := tagFamilySpec.Tags[j]
+
var tagValue *modelv1.TagValue
- if tagFamily == pbv1.NullTagFamily ||
len(tagFamily.Tags) <= j {
- tagValue = pbv1.NullTagValue
+ if spec != nil {
+ if srcFamily != nil && specTagMap != nil {
+ if srcTagIdx, ok := specTagMap[t.Name];
ok && srcTagIdx < len(srcFamily.Tags) {
+ tagValue =
srcFamily.Tags[srcTagIdx]
+ }
+ }
+ if tagValue == nil {
+ tagValue = pbv1.NullTagValue
+ }
} else {
- tagValue = tagFamily.Tags[j]
+ if srcFamily == nil || len(srcFamily.Tags) <= j
{
+ tagValue = pbv1.NullTagValue
+ } else {
+ tagValue = srcFamily.Tags[j]
+ }
}
- t := tagFamilySpec.Tags[j]
- encodeTagValue := encodeTagValue(
- t.Name,
- t.Type,
- tagValue)
+ encodeTagValue := encodeTagValue(t.Name, t.Type,
tagValue)
r, toIndex := tfr[t.Name]
fieldKey := index.FieldKey{}
if toIndex {
@@ -415,6 +452,8 @@ func (w *writeCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
return
}
groups := make(map[string]*dataPointsInGroup)
+ var metadata *commonv1.Metadata
+ var spec *measurev1.DataPointSpec
for i := range events {
var writeEvent *measurev1.InternalWriteRequest
switch e := events[i].(type) {
@@ -430,8 +469,15 @@ func (w *writeCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
w.l.Warn().Msg("invalid event data type")
continue
}
+ req := writeEvent.Request
+ if req != nil && req.GetMetadata() != nil {
+ metadata = req.GetMetadata()
+ }
+ if req != nil && req.GetDataPointSpec() != nil {
+ spec = req.GetDataPointSpec()
+ }
var err error
- if groups, err = w.handle(groups, writeEvent); err != nil {
+ if groups, err = w.handle(groups, writeEvent, metadata, spec);
err != nil {
w.l.Error().Err(err).RawJSON("written",
logger.Proto(writeEvent)).Msg("cannot handle write event")
groups = make(map[string]*dataPointsInGroup)
continue
@@ -495,6 +541,41 @@ func encodeFieldValue(name string, fieldType
databasev1.FieldType, fieldValue *m
return nv
}
+// buildSpecMaps indexes a DataPointSpec by name so schema tag families and
+// tags can be resolved to their positions in a spec-ordered data point.
+// It returns (nil, nil) when spec is nil (schema-order mode). Otherwise the
+// first map is family name -> index in spec.TagFamilySpec, and the second is
+// family name -> (tag name -> index within that family's TagNames).
+// If a family or tag name repeats, the last occurrence wins.
+func buildSpecMaps(spec *measurev1.DataPointSpec) (map[string]int, map[string]map[string]int) {
+	if spec == nil {
+		return nil, nil
+	}
+	families := spec.GetTagFamilySpec()
+	// Sizes are known up front; pre-size to avoid rehashing on every write request.
+	specFamilyMap := make(map[string]int, len(families))
+	specTagMaps := make(map[string]map[string]int, len(families))
+	for i, specFamily := range families {
+		tagNames := specFamily.GetTagNames()
+		tagMap := make(map[string]int, len(tagNames))
+		for j, tagName := range tagNames {
+			tagMap[tagName] = j
+		}
+		specFamilyMap[specFamily.GetName()] = i
+		specTagMaps[specFamily.GetName()] = tagMap
+	}
+	return specFamilyMap, specTagMaps
+}
+
+// getSourceFamilyAndTagMap selects the tag family in req that supplies values
+// for the schema tag family tagFamilySpec, which sits at schema position i.
+//
+// Schema mode (spec == nil): the family is taken positionally from index i and
+// the returned tag map is nil. Spec mode: the family is located by name through
+// specFamilyMap, and the returned tag map is that family's tag-name -> index map
+// from specTagMaps. The family is nil when it is absent or out of range; in spec
+// mode the tag map can still be non-nil in that case.
+func getSourceFamilyAndTagMap(spec *measurev1.DataPointSpec, tagFamilySpec *databasev1.TagFamilySpec,
+	req *measurev1.WriteRequest, i int, specFamilyMap map[string]int,
+	specTagMaps map[string]map[string]int,
+) (*modelv1.TagFamilyForWrite, map[string]int) {
+	if spec == nil {
+		if i < len(req.DataPoint.TagFamilies) {
+			return req.DataPoint.TagFamilies[i], nil
+		}
+		return nil, nil
+	}
+	familyName := tagFamilySpec.Name
+	var family *modelv1.TagFamilyForWrite
+	if pos, found := specFamilyMap[familyName]; found && pos < len(req.DataPoint.TagFamilies) {
+		family = req.DataPoint.TagFamilies[pos]
+	}
+	return family, specTagMaps[familyName]
+}
+
func encodeTagValue(name string, tagType databasev1.TagType, tagValue
*modelv1.TagValue) *nameValue {
nv := generateNameValue()
nv.name = name
diff --git a/docs/api-reference.md b/docs/api-reference.md
index b4bc6402..88c17841 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -1783,6 +1783,7 @@ Status is the response status for write
| STATUS_DISK_FULL | 6 | |
| STATUS_VERSION_UNSUPPORTED | 7 | Client version not supported |
| STATUS_VERSION_DEPRECATED | 8 | Client version deprecated but still
supported |
+| STATUS_METADATA_REQUIRED | 9 | Metadata is required for the first request |
diff --git a/test/cases/init.go b/test/cases/init.go
index d63c6628..dc66b595 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -25,6 +25,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
+ measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
casesmeasuredata
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
caseproperty
"github.com/apache/skywalking-banyandb/test/cases/property/data"
@@ -61,6 +62,43 @@ func Initialize(addr string, now time.Time) {
casesmeasuredata.Write(conn, "endpoint_traffic", "sw_metric",
"endpoint_traffic.json", now, interval)
casesmeasuredata.Write(conn, "duplicated", "exception",
"duplicated.json", now, 0)
casesmeasuredata.Write(conn, "service_cpm_minute", "sw_updated",
"service_cpm_minute_updated_data.json", now.Add(10*time.Minute), interval)
+ casesmeasuredata.WriteWithSpec(conn, "service_cpm_minute", "sw_metric",
now.Add(20*time.Minute), interval,
+ casesmeasuredata.SpecWithData{
+ Spec: &measurev1.DataPointSpec{
+ TagFamilySpec: []*measurev1.TagFamilySpec{
+ {
+ Name: "default",
+ TagNames: []string{"entity_id",
"id"},
+ },
+ },
+ FieldNames: []string{"value", "total"},
+ },
+ DataFile: "service_cpm_minute_spec_order.json",
+ },
+ casesmeasuredata.SpecWithData{
+ Spec: &measurev1.DataPointSpec{
+ TagFamilySpec: []*measurev1.TagFamilySpec{
+ {
+ Name: "default",
+ TagNames: []string{"id",
"entity_id"},
+ },
+ },
+ FieldNames: []string{"total", "value"},
+ },
+ DataFile: "service_cpm_minute_spec_order2.json",
+ })
+ casesmeasuredata.WriteMixed(conn, "service_cpm_minute", "sw_metric",
+ "service_cpm_minute_schema_order.json",
"service_cpm_minute_spec_order.json",
+ now.Add(30*time.Minute), interval, 2*time.Minute,
+ &measurev1.DataPointSpec{
+ TagFamilySpec: []*measurev1.TagFamilySpec{
+ {
+ Name: "default",
+ TagNames: []string{"entity_id", "id"},
+ },
+ },
+ FieldNames: []string{"value", "total"},
+ })
time.Sleep(5 * time.Second)
// trace
interval = 500 * time.Millisecond
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index 10164d65..49f666b2 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -250,6 +250,35 @@ func loadData(md *commonv1.Metadata, measure
measurev1.MeasureService_WriteClien
}
}
+// loadDataWithSpec streams every data point template in testdata/dataFile
+// through measure, with values laid out as described by spec. Timestamps are
+// assigned so the last template lands on baseTime and each earlier template
+// is one interval before the next. Only the opening request carries md and
+// spec; subsequent requests omit both.
+func loadDataWithSpec(md *commonv1.Metadata, measure measurev1.MeasureService_WriteClient,
+	dataFile string, baseTime time.Time, interval time.Duration, spec *measurev1.DataPointSpec,
+) {
+	content, err := dataFS.ReadFile("testdata/" + dataFile)
+	gm.Expect(err).ShouldNot(gm.HaveOccurred())
+	var templates []interface{}
+	gm.Expect(json.Unmarshal(content, &templates)).ShouldNot(gm.HaveOccurred())
+
+	total := len(templates)
+	for idx := range templates {
+		raw, marshalErr := json.Marshal(templates[idx])
+		gm.Expect(marshalErr).ShouldNot(gm.HaveOccurred())
+		point := &measurev1.DataPointValue{}
+		gm.Expect(protojson.Unmarshal(raw, point)).ShouldNot(gm.HaveOccurred())
+		// Count back from baseTime so the final template is stamped exactly at baseTime.
+		stepsBack := total - idx - 1
+		point.Timestamp = timestamppb.New(baseTime.Add(-time.Duration(stepsBack) * interval))
+
+		req := &measurev1.WriteRequest{
+			DataPoint: point,
+			MessageId: uint64(time.Now().UnixNano()),
+		}
+		if idx == 0 {
+			// Metadata and spec travel once, on the opening request only.
+			req.Metadata = md
+			req.DataPointSpec = spec
+		}
+		gm.Expect(measure.Send(req)).Should(gm.Succeed())
+	}
+}
+
// Write data into the server.
func Write(conn *grpclib.ClientConn, name, group, dataFile string,
baseTime time.Time, interval time.Duration,
@@ -306,3 +335,98 @@ func WriteOnly(conn *grpclib.ClientConn, name, group,
dataFile string,
loadData(metadata, writeClient, dataFile, baseTime, interval)
return writeClient
}
+
+// SpecWithData pairs a DataPointSpec with a data file.
+// WriteWithSpec consumes a sequence of these on one stream, attaching Spec to
+// the first request written from DataFile.
+type SpecWithData struct {
+ // Spec names the tag families, tags and fields in the order that the
+ // data points in DataFile list their values.
+ Spec *measurev1.DataPointSpec
+ // DataFile is a JSON file name, read as "testdata/" + DataFile from dataFS.
+ DataFile string
+}
+
+// WriteWithSpec opens a single measure write stream for name/group and writes
+// each SpecWithData pair in order, exercising a spec switch mid-stream.
+// The registered metadata is attached only to the very first request of the
+// stream. Each pair's Spec is attached to the first request written from that
+// pair's DataFile. Timestamps start at baseTime and advance by interval per
+// data point, continuing across pairs. After sending, it closes the send side
+// and waits up to flags.EventuallyTimeout for Recv to report io.EOF.
+func WriteWithSpec(conn *grpclib.ClientConn, name, group string,
+	baseTime time.Time, interval time.Duration, specDataPairs ...SpecWithData,
+) {
+	ctx := context.Background()
+	md := &commonv1.Metadata{
+		Name:  name,
+		Group: group,
+	}
+
+	// Replace the caller-built metadata with the one stored in the registry.
+	schemaClient := databasev1.NewMeasureRegistryServiceClient(conn)
+	resp, err := schemaClient.Get(ctx, &databasev1.MeasureRegistryServiceGetRequest{Metadata: md})
+	gm.Expect(err).NotTo(gm.HaveOccurred())
+	md = resp.GetMeasure().GetMetadata()
+
+	c := measurev1.NewMeasureServiceClient(conn)
+	writeClient, err := c.Write(ctx)
+	gm.Expect(err).NotTo(gm.HaveOccurred())
+
+	isFirstRequest := true
+	currentTime := baseTime
+	for _, pair := range specDataPairs {
+		// Distinct name so the function-level err is not shadowed.
+		content, readErr := dataFS.ReadFile("testdata/" + pair.DataFile)
+		gm.Expect(readErr).ShouldNot(gm.HaveOccurred())
+		var templates []interface{}
+		gm.Expect(json.Unmarshal(content, &templates)).ShouldNot(gm.HaveOccurred())
+
+		for i, template := range templates {
+			rawDataPointValue, errMarshal := json.Marshal(template)
+			gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
+			dataPointValue := &measurev1.DataPointValue{}
+			gm.Expect(protojson.Unmarshal(rawDataPointValue, dataPointValue)).ShouldNot(gm.HaveOccurred())
+			dataPointValue.Timestamp = timestamppb.New(currentTime.Add(time.Duration(i) * interval))
+			req := &measurev1.WriteRequest{
+				DataPoint: dataPointValue,
+				MessageId: uint64(time.Now().UnixNano()),
+			}
+			if isFirstRequest {
+				req.Metadata = md
+				isFirstRequest = false
+			}
+			if i == 0 {
+				// Switch to this pair's spec on its first data point.
+				req.DataPointSpec = pair.Spec
+			}
+			gm.Expect(writeClient.Send(req)).Should(gm.Succeed())
+		}
+		// The next pair continues right after this pair's last timestamp.
+		currentTime = currentTime.Add(time.Duration(len(templates)) * interval)
+	}
+
+	gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
+	gm.Eventually(func() error {
+		_, recvErr := writeClient.Recv()
+		return recvErr
+	}, flags.EventuallyTimeout).Should(gm.Equal(io.EOF))
+}
+
+// WriteMixed exercises switching from schema-order to spec-order writes on a
+// single stream. It first sends schemaDataFile with values in schema order,
+// via loadData with the registry metadata. It then sends specDataFile laid out
+// by spec, stamped relative to baseTime+specStartOffset, without re-sending
+// metadata. Finally it closes the send side and waits up to
+// flags.EventuallyTimeout for Recv to report io.EOF.
+func WriteMixed(conn *grpclib.ClientConn, name, group string,
+	schemaDataFile, specDataFile string,
+	baseTime time.Time, interval time.Duration,
+	specStartOffset time.Duration, spec *measurev1.DataPointSpec,
+) {
+	ctx := context.Background()
+
+	registry := databasev1.NewMeasureRegistryServiceClient(conn)
+	getResp, getErr := registry.Get(ctx, &databasev1.MeasureRegistryServiceGetRequest{
+		Metadata: &commonv1.Metadata{Name: name, Group: group},
+	})
+	gm.Expect(getErr).NotTo(gm.HaveOccurred())
+	registered := getResp.GetMeasure().GetMetadata()
+
+	stream, openErr := measurev1.NewMeasureServiceClient(conn).Write(ctx)
+	gm.Expect(openErr).NotTo(gm.HaveOccurred())
+
+	// Phase 1: positional, schema-ordered values.
+	loadData(registered, stream, schemaDataFile, baseTime, interval)
+	// Phase 2: spec-ordered values; metadata is deliberately passed as nil.
+	loadDataWithSpec(nil, stream, specDataFile, baseTime.Add(specStartOffset), interval, spec)
+
+	gm.Expect(stream.CloseSend()).To(gm.Succeed())
+	gm.Eventually(func() error {
+		_, recvErr := stream.Recv()
+		return recvErr
+	}, flags.EventuallyTimeout).Should(gm.Equal(io.EOF))
+}
diff --git a/test/cases/measure/data/input/write_mixed.ql
b/test/cases/measure/data/input/write_mixed.ql
new file mode 100644
index 00000000..0382cb8b
--- /dev/null
+++ b/test/cases/measure/data/input/write_mixed.ql
@@ -0,0 +1,22 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+SELECT id, entity_id, total, value FROM MEASURE service_cpm_minute IN sw_metric
+TIME > '-15m'
+WHERE id IN ('id_schema_1', 'id_schema_2', 'id_schema_3', 'id_spec_1',
'id_spec_2', 'id_spec_3')
+
diff --git a/test/cases/measure/data/input/write_mixed.yaml
b/test/cases/measure/data/input/write_mixed.yaml
new file mode 100644
index 00000000..aefba1bb
--- /dev/null
+++ b/test/cases/measure/data/input/write_mixed.yaml
@@ -0,0 +1,41 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "service_cpm_minute"
+groups: ["sw_metric"]
+tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: ["id", "entity_id"]
+fieldProjection:
+ names: ["total", "value"]
+criteria:
+ condition:
+ name: "id"
+ op: "BINARY_OP_IN"
+ value:
+ strArray:
+ value:
+ [
+ "id_schema_1",
+ "id_schema_2",
+ "id_schema_3",
+ "id_spec_1",
+ "id_spec_2",
+ "id_spec_3",
+ ]
+
diff --git a/test/cases/measure/data/input/write_spec.ql
b/test/cases/measure/data/input/write_spec.ql
new file mode 100644
index 00000000..d3111de7
--- /dev/null
+++ b/test/cases/measure/data/input/write_spec.ql
@@ -0,0 +1,22 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+SELECT id, entity_id, total, value FROM MEASURE service_cpm_minute IN sw_metric
+TIME > '-15m'
+WHERE id IN ('id_spec_2', 'id_spec_5')
+
diff --git a/test/cases/measure/data/input/write_spec.yaml
b/test/cases/measure/data/input/write_spec.yaml
new file mode 100644
index 00000000..afd48e2b
--- /dev/null
+++ b/test/cases/measure/data/input/write_spec.yaml
@@ -0,0 +1,33 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "service_cpm_minute"
+groups: ["sw_metric"]
+tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: ["id", "entity_id"]
+fieldProjection:
+ names: ["total", "value"]
+criteria:
+ condition:
+ name: "id"
+ op: "BINARY_OP_IN"
+ value:
+ strArray:
+ value: ["id_spec_2", "id_spec_5"]
+
diff --git
a/test/cases/measure/data/testdata/service_cpm_minute_schema_order.json
b/test/cases/measure/data/testdata/service_cpm_minute_schema_order.json
new file mode 100644
index 00000000..10c5abed
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_cpm_minute_schema_order.json
@@ -0,0 +1,93 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "id_schema_1"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_schema_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 10
+ }
+ },
+ {
+ "int": {
+ "value": 100
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "id_schema_2"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_schema_2"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 20
+ }
+ },
+ {
+ "int": {
+ "value": 200
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "id_schema_3"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_schema_3"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 30
+ }
+ },
+ {
+ "int": {
+ "value": 300
+ }
+ }
+ ]
+ }
+]
+
diff --git
a/test/cases/measure/data/testdata/service_cpm_minute_spec_order.json
b/test/cases/measure/data/testdata/service_cpm_minute_spec_order.json
new file mode 100644
index 00000000..f4306dee
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_cpm_minute_spec_order.json
@@ -0,0 +1,93 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "entity_spec_1"
+ }
+ },
+ {
+ "str": {
+ "value": "id_spec_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 10
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "entity_spec_2"
+ }
+ },
+ {
+ "str": {
+ "value": "id_spec_2"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 200
+ }
+ },
+ {
+ "int": {
+ "value": 20
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "entity_spec_3"
+ }
+ },
+ {
+ "str": {
+ "value": "id_spec_3"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 300
+ }
+ },
+ {
+ "int": {
+ "value": 30
+ }
+ }
+ ]
+ }
+]
+
diff --git
a/test/cases/measure/data/testdata/service_cpm_minute_spec_order2.json
b/test/cases/measure/data/testdata/service_cpm_minute_spec_order2.json
new file mode 100644
index 00000000..fc0abd0f
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_cpm_minute_spec_order2.json
@@ -0,0 +1,93 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "id_spec_4"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_spec_4"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 40
+ }
+ },
+ {
+ "int": {
+ "value": 400
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "id_spec_5"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_spec_5"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 50
+ }
+ },
+ {
+ "int": {
+ "value": 500
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "id_spec_6"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_spec_6"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 60
+ }
+ },
+ {
+ "int": {
+ "value": 600
+ }
+ }
+ ]
+ }
+]
+
diff --git a/test/cases/measure/data/want/write_mixed.yaml
b/test/cases/measure/data/want/write_mixed.yaml
new file mode 100644
index 00000000..b8f734fa
--- /dev/null
+++ b/test/cases/measure/data/want/write_mixed.yaml
@@ -0,0 +1,139 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+ - tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: id_spec_2
+ - key: entity_id
+ value:
+ str:
+ value: entity_spec_2
+ fields:
+ - name: total
+ value:
+ int:
+ value: "20"
+ - name: value
+ value:
+ int:
+ value: "200"
+ - tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: id_schema_1
+ - key: entity_id
+ value:
+ str:
+ value: entity_schema_1
+ fields:
+ - name: total
+ value:
+ int:
+ value: "10"
+ - name: value
+ value:
+ int:
+ value: "100"
+ - tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: id_schema_3
+ - key: entity_id
+ value:
+ str:
+ value: entity_schema_3
+ fields:
+ - name: total
+ value:
+ int:
+ value: "30"
+ - name: value
+ value:
+ int:
+ value: "300"
+ - tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: id_spec_3
+ - key: entity_id
+ value:
+ str:
+ value: entity_spec_3
+ fields:
+ - name: total
+ value:
+ int:
+ value: "30"
+ - name: value
+ value:
+ int:
+ value: "300"
+ - tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: id_schema_2
+ - key: entity_id
+ value:
+ str:
+ value: entity_schema_2
+ fields:
+ - name: total
+ value:
+ int:
+ value: "20"
+ - name: value
+ value:
+ int:
+ value: "200"
+ - tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: id_spec_1
+ - key: entity_id
+ value:
+ str:
+ value: entity_spec_1
+ fields:
+ - name: total
+ value:
+ int:
+ value: "10"
+ - name: value
+ value:
+ int:
+ value: "100"
+
diff --git a/test/cases/measure/data/want/write_spec.yaml
b/test/cases/measure/data/want/write_spec.yaml
new file mode 100644
index 00000000..974802c4
--- /dev/null
+++ b/test/cases/measure/data/want/write_spec.yaml
@@ -0,0 +1,59 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+ - tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: id_spec_2
+ - key: entity_id
+ value:
+ str:
+ value: entity_spec_2
+ fields:
+ - name: total
+ value:
+ int:
+ value: "20"
+ - name: value
+ value:
+ int:
+ value: "200"
+ - tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: id_spec_5
+ - key: entity_id
+ value:
+ str:
+ value: entity_spec_5
+ fields:
+ - name: total
+ value:
+ int:
+ value: "50"
+ - name: value
+ value:
+ int:
+ value: "500"
+
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index 5e692974..24469ef9 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -87,4 +87,6 @@ var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("query by id in index mode", helpers.Args{Input:
"index_mode_by_id", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("multi groups: unchanged", helpers.Args{Input:
"multi_group_unchanged", Duration: 35 * time.Minute, Offset: -20 *
time.Minute}),
g.Entry("multi groups: new tag and fields", helpers.Args{Input:
"multi_group_new_tag_field", Duration: 35 * time.Minute, Offset: -20 *
time.Minute}),
+ g.Entry("write spec", helpers.Args{Input: "write_spec", Duration: 15 *
time.Minute, Offset: 15 * time.Minute, DisOrder: true}),
+ g.Entry("write mixed", helpers.Args{Input: "write_mixed", Duration: 15
* time.Minute, Offset: 25 * time.Minute, DisOrder: true}),
)