This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 671cdb8f Implement spec-based bulk write for measure (#869)
671cdb8f is described below

commit 671cdb8f226690390d55e4298586d7a5caed5337
Author: Huang Youliang <[email protected]>
AuthorDate: Fri Dec 5 08:40:57 2025 +0800

    Implement spec-based bulk write for measure (#869)
    
    * Implement spec-based bulk write for measure
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
 api/proto/banyandb/model/v1/write.proto            |   1 +
 banyand/liaison/grpc/discovery.go                  |   9 +-
 banyand/liaison/grpc/measure.go                    | 214 +++++++++++++++++++--
 banyand/liaison/grpc/stream.go                     |   4 +-
 banyand/measure/write_benchmark_test.go            |   2 +-
 banyand/measure/write_liaison.go                   |  28 ++-
 banyand/measure/write_standalone.go                | 183 +++++++++++++-----
 docs/api-reference.md                              |   1 +
 test/cases/init.go                                 |  38 ++++
 test/cases/measure/data/data.go                    | 124 ++++++++++++
 test/cases/measure/data/input/write_mixed.ql       |  22 +++
 test/cases/measure/data/input/write_mixed.yaml     |  41 ++++
 test/cases/measure/data/input/write_spec.ql        |  22 +++
 test/cases/measure/data/input/write_spec.yaml      |  33 ++++
 .../testdata/service_cpm_minute_schema_order.json  |  93 +++++++++
 .../testdata/service_cpm_minute_spec_order.json    |  93 +++++++++
 .../testdata/service_cpm_minute_spec_order2.json   |  93 +++++++++
 test/cases/measure/data/want/write_mixed.yaml      | 139 +++++++++++++
 test/cases/measure/data/want/write_spec.yaml       |  59 ++++++
 test/cases/measure/measure.go                      |   2 +
 20 files changed, 1117 insertions(+), 84 deletions(-)

diff --git a/api/proto/banyandb/model/v1/write.proto b/api/proto/banyandb/model/v1/write.proto
index 3f25e1ee..3c7a3504 100644
--- a/api/proto/banyandb/model/v1/write.proto
+++ b/api/proto/banyandb/model/v1/write.proto
@@ -33,4 +33,5 @@ enum Status {
   STATUS_DISK_FULL = 6;
   STATUS_VERSION_UNSUPPORTED = 7; // Client version not supported
   STATUS_VERSION_DEPRECATED = 8; // Client version deprecated but still 
supported
+  STATUS_METADATA_REQUIRED = 9; // Metadata is required for the first request
 }
diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go
index acb101bd..abeb9f99 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -83,7 +83,7 @@ func (ds *discoveryService) SetLogger(log *logger.Logger) {
        ds.shardingKeyRepo.log = log
 }
 
-func (ds *discoveryService) navigate(metadata *commonv1.Metadata, tagFamilies 
[]*modelv1.TagFamilyForWrite) (pbv1.EntityValues, common.ShardID, error) {
+func (ds *discoveryService) navigateByLocator(metadata *commonv1.Metadata, 
tagFamilies []*modelv1.TagFamilyForWrite) (pbv1.EntityValues, common.ShardID, 
error) {
        shardNum, existed := ds.groupRepo.shardNum(metadata.Group)
        if !existed {
                return nil, common.ShardID(0), errors.Wrapf(errNotExist, 
"finding the shard num by: %v", metadata)
@@ -335,6 +335,13 @@ func (e *entityRepo) getTraceIDIndex(id identity) (int, 
bool) {
        return index, true
 }
 
+func (e *entityRepo) getMeasure(id identity) (*databasev1.Measure, bool) {
+       e.RWMutex.RLock()
+       defer e.RWMutex.RUnlock()
+       m, ok := e.measureMap[id]
+       return m, ok
+}
+
 var _ schema.EventHandler = (*shardingKeyRepo)(nil)
 
 type shardingKeyRepo struct {
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 355968d0..001b6d02 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -29,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/queue"
@@ -36,6 +37,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -83,6 +85,12 @@ func (ms *measureService) Write(measure 
measurev1.MeasureService_WriteServer) er
 
        defer ms.handleWriteCleanup(publisher, &succeedSent, measure, start)
 
+       var metadata *commonv1.Metadata
+       var spec *measurev1.DataPointSpec
+       isFirstRequest := true
+       nodeMetadataSent := make(map[string]bool)
+       nodeSpecSent := make(map[string]bool)
+
        for {
                select {
                case <-ctx.Done():
@@ -101,35 +109,52 @@ func (ms *measureService) Write(measure 
measurev1.MeasureService_WriteServer) er
                        return err
                }
 
-               ms.metrics.totalStreamMsgReceived.Inc(1, 
writeRequest.Metadata.Group, "measure", "write")
+               if writeRequest.GetMetadata() != nil {
+                       metadata = writeRequest.GetMetadata()
+                       nodeMetadataSent = make(map[string]bool)
+               } else if isFirstRequest {
+                       ms.l.Error().Msg("metadata is required for the first 
request of gRPC stream")
+                       ms.sendReply(nil, 
modelv1.Status_STATUS_METADATA_REQUIRED, writeRequest.GetMessageId(), measure)
+                       return errors.New("metadata is required for the first 
request of gRPC stream")
+               }
+               isFirstRequest = false
+
+               if writeRequest.GetDataPointSpec() != nil {
+                       spec = writeRequest.GetDataPointSpec()
+                       nodeSpecSent = make(map[string]bool)
+               }
+
+               ms.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, 
"measure", "write")
 
-               if status := ms.validateWriteRequest(writeRequest, measure); 
status != modelv1.Status_STATUS_SUCCEED {
+               if status := ms.validateWriteRequest(writeRequest, metadata, 
measure); status != modelv1.Status_STATUS_SUCCEED {
                        continue
                }
 
-               if err := ms.processAndPublishRequest(ctx, writeRequest, 
publisher, &succeedSent, measure); err != nil {
+               if err := ms.processAndPublishRequest(ctx, writeRequest, 
metadata, spec, publisher, &succeedSent, measure, nodeMetadataSent, 
nodeSpecSent); err != nil {
                        continue
                }
        }
 }
 
-func (ms *measureService) validateWriteRequest(writeRequest 
*measurev1.WriteRequest, measure measurev1.MeasureService_WriteServer) 
modelv1.Status {
+func (ms *measureService) validateWriteRequest(writeRequest 
*measurev1.WriteRequest,
+       metadata *commonv1.Metadata, measure 
measurev1.MeasureService_WriteServer,
+) modelv1.Status {
        if errTime := timestamp.CheckPb(writeRequest.DataPoint.Timestamp); 
errTime != nil {
                ms.l.Error().Err(errTime).Stringer("written", 
writeRequest).Msg("the data point time is invalid")
-               ms.sendReply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_INVALID_TIMESTAMP, writeRequest.GetMessageId(), measure)
+               ms.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP, 
writeRequest.GetMessageId(), measure)
                return modelv1.Status_STATUS_INVALID_TIMESTAMP
        }
 
-       if writeRequest.Metadata.ModRevision > 0 {
-               measureCache, existed := 
ms.entityRepo.getLocator(getID(writeRequest.GetMetadata()))
+       if metadata.ModRevision > 0 {
+               measureCache, existed := 
ms.entityRepo.getLocator(getID(metadata))
                if !existed {
                        ms.l.Error().Stringer("written", 
writeRequest).Msg("failed to measure schema not found")
-                       ms.sendReply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure)
+                       ms.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND, 
writeRequest.GetMessageId(), measure)
                        return modelv1.Status_STATUS_NOT_FOUND
                }
-               if writeRequest.Metadata.ModRevision != 
measureCache.ModRevision {
+               if metadata.ModRevision != measureCache.ModRevision {
                        ms.l.Error().Stringer("written", writeRequest).Msg("the 
measure schema is expired")
-                       ms.sendReply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure)
+                       ms.sendReply(metadata, 
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure)
                        return modelv1.Status_STATUS_EXPIRED_SCHEMA
                }
        }
@@ -138,7 +163,9 @@ func (ms *measureService) validateWriteRequest(writeRequest 
*measurev1.WriteRequ
 }
 
 func (ms *measureService) processAndPublishRequest(ctx context.Context, 
writeRequest *measurev1.WriteRequest,
-       publisher queue.BatchPublisher, succeedSent *[]succeedSentMessage, 
measure measurev1.MeasureService_WriteServer,
+       metadata *commonv1.Metadata, spec *measurev1.DataPointSpec, publisher 
queue.BatchPublisher,
+       succeedSent *[]succeedSentMessage, measure 
measurev1.MeasureService_WriteServer,
+       nodeMetadataSent map[string]bool, nodeSpecSent map[string]bool,
 ) error {
        // Retry with backoff when encountering errNotExist
        var tagValues pbv1.EntityValues
@@ -149,7 +176,7 @@ func (ms *measureService) processAndPublishRequest(ctx 
context.Context, writeReq
                retryInterval := 10 * time.Millisecond
                startTime := time.Now()
                for {
-                       tagValues, shardID, err = 
ms.navigate(writeRequest.GetMetadata(), 
writeRequest.GetDataPoint().GetTagFamilies())
+                       tagValues, shardID, err = ms.navigate(metadata, 
writeRequest, spec)
                        if err == nil || !errors.Is(err, errNotExist) || 
time.Since(startTime) > ms.maxWaitDuration {
                                break
                        }
@@ -162,12 +189,12 @@ func (ms *measureService) processAndPublishRequest(ctx 
context.Context, writeReq
                        }
                }
        } else {
-               tagValues, shardID, err = 
ms.navigate(writeRequest.GetMetadata(), 
writeRequest.GetDataPoint().GetTagFamilies())
+               tagValues, shardID, err = ms.navigate(metadata, writeRequest, 
spec)
        }
 
        if err != nil {
                ms.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeRequest)).Msg("failed to navigate to the write target")
-               ms.sendReply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure)
+               ms.sendReply(metadata, modelv1.Status_STATUS_INTERNAL_ERROR, 
writeRequest.GetMessageId(), measure)
                return err
        }
 
@@ -190,45 +217,190 @@ func (ms *measureService) processAndPublishRequest(ctx 
context.Context, writeReq
                EntityValues: tagValues[1:].Encode(),
        }
 
-       nodes, err := ms.publishToNodes(ctx, writeRequest, iwr, publisher, 
uint32(shardID), measure)
+       nodes, err := ms.publishToNodes(ctx, writeRequest, metadata, spec, iwr, 
publisher, uint32(shardID), measure, nodeMetadataSent, nodeSpecSent)
        if err != nil {
                return err
        }
 
        *succeedSent = append(*succeedSent, succeedSentMessage{
-               metadata:  writeRequest.GetMetadata(),
+               metadata:  metadata,
                messageID: writeRequest.GetMessageId(),
                nodes:     nodes,
        })
        return nil
 }
 
-func (ms *measureService) publishToNodes(ctx context.Context, writeRequest 
*measurev1.WriteRequest, iwr *measurev1.InternalWriteRequest,
+func (ms *measureService) publishToNodes(ctx context.Context, writeRequest 
*measurev1.WriteRequest,
+       metadata *commonv1.Metadata, spec *measurev1.DataPointSpec, iwr 
*measurev1.InternalWriteRequest,
        publisher queue.BatchPublisher, shardID uint32, measure 
measurev1.MeasureService_WriteServer,
+       nodeMetadataSent map[string]bool, nodeSpecSent map[string]bool,
 ) ([]string, error) {
-       nodeID, errPickNode := 
ms.nodeRegistry.Locate(writeRequest.GetMetadata().GetGroup(), 
writeRequest.GetMetadata().GetName(), shardID, 0)
+       nodeID, errPickNode := ms.nodeRegistry.Locate(metadata.GetGroup(), 
metadata.GetName(), shardID, 0)
        if errPickNode != nil {
                ms.l.Error().Err(errPickNode).RawJSON("written", 
logger.Proto(writeRequest)).Msg("failed to pick an available node")
-               ms.sendReply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure)
+               ms.sendReply(metadata, modelv1.Status_STATUS_INTERNAL_ERROR, 
writeRequest.GetMessageId(), measure)
                return nil, errPickNode
        }
 
+       if !nodeMetadataSent[nodeID] {
+               iwr.Request.Metadata = metadata
+               nodeMetadataSent[nodeID] = true
+       }
+       if spec != nil && !nodeSpecSent[nodeID] {
+               iwr.Request.DataPointSpec = spec
+               nodeSpecSent[nodeID] = true
+       }
+
        message := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
        _, errWritePub := publisher.Publish(ctx, data.TopicMeasureWrite, 
message)
        if errWritePub != nil {
                ms.l.Error().Err(errWritePub).RawJSON("written", 
logger.Proto(writeRequest)).Str("nodeID", nodeID).Msg("failed to send a 
message")
                var ce *common.Error
                if errors.As(errWritePub, &ce) {
-                       ms.sendReply(writeRequest.GetMetadata(), ce.Status(), 
writeRequest.GetMessageId(), measure)
+                       ms.sendReply(metadata, ce.Status(), 
writeRequest.GetMessageId(), measure)
                        return nil, errWritePub
                }
-               ms.sendReply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure)
+               ms.sendReply(metadata, modelv1.Status_STATUS_INTERNAL_ERROR, 
writeRequest.GetMessageId(), measure)
                return nil, errWritePub
        }
        return []string{nodeID}, nil
 }
 
+func (ms *measureService) navigate(metadata *commonv1.Metadata,
+       writeRequest *measurev1.WriteRequest, spec *measurev1.DataPointSpec,
+) (pbv1.EntityValues, common.ShardID, error) {
+       tagFamilies := writeRequest.GetDataPoint().GetTagFamilies()
+       if spec == nil {
+               return ms.navigateByLocator(metadata, tagFamilies)
+       }
+       return ms.navigateByTagSpec(metadata, spec, tagFamilies)
+}
+
+func (ms *measureService) navigateByTagSpec(
+       metadata *commonv1.Metadata, spec *measurev1.DataPointSpec, tagFamilies 
[]*modelv1.TagFamilyForWrite,
+) (pbv1.EntityValues, common.ShardID, error) {
+       shardNum, existed := ms.groupRepo.shardNum(metadata.Group)
+       if !existed {
+               return nil, common.ShardID(0), errors.Wrapf(errNotExist, 
"finding the shard num by: %v", metadata)
+       }
+       id := getID(metadata)
+       measure, ok := ms.entityRepo.getMeasure(id)
+       if !ok {
+               return nil, common.ShardID(0), errors.Wrapf(errNotExist, 
"finding measure schema by: %v", metadata)
+       }
+       specFamilyMap, specTagMaps := ms.buildSpecMaps(spec)
+
+       entityValues := ms.findTagValuesByNames(
+               metadata.Name,
+               measure.GetTagFamilies(),
+               tagFamilies,
+               measure.GetEntity().GetTagNames(),
+               specFamilyMap,
+               specTagMaps,
+       )
+       entity, err := entityValues.ToEntity()
+       if err != nil {
+               return nil, common.ShardID(0), err
+       }
+
+       shardingKey := measure.GetShardingKey()
+       if shardingKey != nil && len(shardingKey.GetTagNames()) > 0 {
+               shardingKeyValues := ms.findTagValuesByNames(
+                       metadata.Name,
+                       measure.GetTagFamilies(),
+                       tagFamilies,
+                       shardingKey.GetTagNames(),
+                       specFamilyMap,
+                       specTagMaps,
+               )
+               shardingEntity, shardingErr := shardingKeyValues.ToEntity()
+               if shardingErr != nil {
+                       return nil, common.ShardID(0), shardingErr
+               }
+               shardID, shardingErr := 
partition.ShardID(shardingEntity.Marshal(), shardNum)
+               if shardingErr != nil {
+                       return nil, common.ShardID(0), shardingErr
+               }
+               return entityValues, common.ShardID(shardID), nil
+       }
+
+       shardID, err := partition.ShardID(entity.Marshal(), shardNum)
+       if err != nil {
+               return nil, common.ShardID(0), err
+       }
+       return entityValues, common.ShardID(shardID), nil
+}
+
+func (ms *measureService) buildSpecMaps(spec *measurev1.DataPointSpec) 
(map[string]int, map[string]map[string]int) {
+       specFamilyMap := make(map[string]int)
+       specTagMaps := make(map[string]map[string]int)
+       for i, specFamily := range spec.GetTagFamilySpec() {
+               specFamilyMap[specFamily.GetName()] = i
+               tagMap := make(map[string]int)
+               for j, tagName := range specFamily.GetTagNames() {
+                       tagMap[tagName] = j
+               }
+               specTagMaps[specFamily.GetName()] = tagMap
+       }
+       return specFamilyMap, specTagMaps
+}
+
+func (ms *measureService) findTagValuesByNames(
+       subject string,
+       schemaFamilies []*databasev1.TagFamilySpec,
+       srcTagFamilies []*modelv1.TagFamilyForWrite,
+       tagNames []string,
+       specFamilyMap map[string]int,
+       specTagMaps map[string]map[string]int,
+) pbv1.EntityValues {
+       entityValues := make(pbv1.EntityValues, len(tagNames)+1)
+       entityValues[0] = pbv1.EntityStrValue(subject)
+       for i, tagName := range tagNames {
+               tagValue := ms.findTagValueByName(schemaFamilies, 
srcTagFamilies, tagName, specFamilyMap, specTagMaps)
+               if tagValue == nil {
+                       entityValues[i+1] = &modelv1.TagValue{Value: 
&modelv1.TagValue_Null{}}
+               } else {
+                       entityValues[i+1] = tagValue
+               }
+       }
+       return entityValues
+}
+
+func (ms *measureService) findTagValueByName(
+       schemaFamilies []*databasev1.TagFamilySpec,
+       srcTagFamilies []*modelv1.TagFamilyForWrite,
+       tagName string,
+       specFamilyMap map[string]int,
+       specTagMaps map[string]map[string]int,
+) *modelv1.TagValue {
+       for _, schemaFamily := range schemaFamilies {
+               for _, schemaTag := range schemaFamily.GetTags() {
+                       if schemaTag.GetName() != tagName {
+                               continue
+                       }
+                       familyIdx, ok := specFamilyMap[schemaFamily.GetName()]
+                       if !ok || familyIdx >= len(srcTagFamilies) {
+                               return nil
+                       }
+                       tagMap := specTagMaps[schemaFamily.GetName()]
+                       if tagMap == nil {
+                               return nil
+                       }
+                       tagIdx, ok := tagMap[tagName]
+                       if !ok || tagIdx >= 
len(srcTagFamilies[familyIdx].GetTags()) {
+                               return nil
+                       }
+                       return srcTagFamilies[familyIdx].GetTags()[tagIdx]
+               }
+       }
+       return nil
+}
+
 func (ms *measureService) sendReply(metadata *commonv1.Metadata, status 
modelv1.Status, messageID uint64, measure measurev1.MeasureService_WriteServer) 
{
+       if metadata == nil {
+               ms.l.Error().Stringer("status", status).Msg("metadata is nil, 
cannot send reply")
+               return
+       }
        if status != modelv1.Status_STATUS_SUCCEED {
                ms.metrics.totalStreamMsgReceivedErr.Inc(1, metadata.Group, 
"measure", "write")
        }
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 4b31874b..81cd420e 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -100,7 +100,7 @@ func (s *streamService) navigateWithRetry(writeEntity 
*streamv1.WriteRequest) (t
                retryInterval := 10 * time.Millisecond
                startTime := time.Now()
                for {
-                       tagValues, shardID, err = 
s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
+                       tagValues, shardID, err = 
s.navigateByLocator(writeEntity.GetMetadata(), 
writeEntity.GetElement().GetTagFamilies())
                        if err == nil || !errors.Is(err, errNotExist) || 
time.Since(startTime) > s.maxWaitDuration {
                                return
                        }
@@ -111,7 +111,7 @@ func (s *streamService) navigateWithRetry(writeEntity 
*streamv1.WriteRequest) (t
                        }
                }
        }
-       return s.navigate(writeEntity.GetMetadata(), 
writeEntity.GetElement().GetTagFamilies())
+       return s.navigateByLocator(writeEntity.GetMetadata(), 
writeEntity.GetElement().GetTagFamilies())
 }
 
 func (s *streamService) publishMessages(
diff --git a/banyand/measure/write_benchmark_test.go b/banyand/measure/write_benchmark_test.go
index 12aa38a7..23180b3b 100644
--- a/banyand/measure/write_benchmark_test.go
+++ b/banyand/measure/write_benchmark_test.go
@@ -185,7 +185,7 @@ func BenchmarkAppendDataPoints(b *testing.B) {
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
                // Call appendDataPoints
-               appendDataPoints(dest, ts, sid, schema, req, locator)
+               appendDataPoints(dest, ts, sid, schema, req, locator, nil)
                releaseDataPoints(dest.dataPoints)
                dest.dataPoints = nil
        }
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index e64cad88..867eb840 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -26,6 +26,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/wqueue"
@@ -81,6 +82,8 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message 
bus.Message) (resp
                return
        }
        groups := make(map[string]*dataPointsInQueue)
+       var metadata *commonv1.Metadata
+       var spec *measurev1.DataPointSpec
        for i := range events {
                var writeEvent *measurev1.InternalWriteRequest
                switch e := events[i].(type) {
@@ -96,8 +99,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                        w.l.Warn().Msg("invalid event data type")
                        continue
                }
+               req := writeEvent.Request
+               if req != nil && req.GetMetadata() != nil {
+                       metadata = req.GetMetadata()
+               }
+               if req != nil && req.GetDataPointSpec() != nil {
+                       spec = req.GetDataPointSpec()
+               }
                var err error
-               if groups, err = w.handle(groups, writeEvent); err != nil {
+               if groups, err = w.handle(groups, writeEvent, metadata, spec); 
err != nil {
                        w.l.Error().Err(err).Msg("cannot handle write event")
                        groups = make(map[string]*dataPointsInQueue)
                        continue
@@ -154,7 +164,9 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
        return
 }
 
-func (w *writeQueueCallback) handle(dst map[string]*dataPointsInQueue, 
writeEvent *measurev1.InternalWriteRequest) (map[string]*dataPointsInQueue, 
error) {
+func (w *writeQueueCallback) handle(dst map[string]*dataPointsInQueue,
+       writeEvent *measurev1.InternalWriteRequest, metadata 
*commonv1.Metadata, spec *measurev1.DataPointSpec,
+) (map[string]*dataPointsInQueue, error) {
        req := writeEvent.Request
        t := req.DataPoint.Timestamp.AsTime().Local()
        if err := timestamp.Check(t); err != nil {
@@ -162,7 +174,7 @@ func (w *writeQueueCallback) handle(dst 
map[string]*dataPointsInQueue, writeEven
        }
        ts := t.UnixNano()
 
-       gn := req.Metadata.Group
+       gn := metadata.Group
        queue, err := w.schemaRepo.loadQueue(gn)
        if err != nil {
                return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn, 
err)
@@ -185,16 +197,16 @@ func (w *writeQueueCallback) handle(dst 
map[string]*dataPointsInQueue, writeEven
                        break
                }
        }
-       stm, ok := w.schemaRepo.loadMeasure(req.GetMetadata())
+       stm, ok := w.schemaRepo.loadMeasure(metadata)
        if !ok {
-               return nil, fmt.Errorf("cannot find measure definition: %s", 
req.GetMetadata())
+               return nil, fmt.Errorf("cannot find measure definition: %s", 
metadata)
        }
        fLen := len(req.DataPoint.GetTagFamilies())
        if fLen < 1 {
-               return nil, fmt.Errorf("%s has no tag family", req.Metadata)
+               return nil, fmt.Errorf("%s has no tag family", metadata)
        }
        if fLen > len(stm.schema.GetTagFamilies()) {
-               return nil, fmt.Errorf("%s has more tag families than %s", 
req.Metadata, stm.schema)
+               return nil, fmt.Errorf("%s has more tag families than %s", 
metadata, stm.schema)
        }
        is := stm.indexSchema.Load().(indexSchema)
        if len(is.indexRuleLocators.TagFamilyTRule) != 
len(stm.GetSchema().GetTagFamilies()) {
@@ -213,7 +225,7 @@ func (w *writeQueueCallback) handle(dst 
map[string]*dataPointsInQueue, writeEven
                dpg.tables = append(dpg.tables, dpt)
        }
 
-       sid, err := processDataPoint(dpt, req, writeEvent, stm, is, ts)
+       sid, err := processDataPoint(dpt, req, writeEvent, stm, is, ts, 
metadata, spec)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/measure/write_standalone.go b/banyand/measure/write_standalone.go
index dc7fd8b1..3106c26a 100644
--- a/banyand/measure/write_standalone.go
+++ b/banyand/measure/write_standalone.go
@@ -25,6 +25,7 @@ import (
        "google.golang.org/protobuf/proto"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
@@ -71,10 +72,10 @@ func (w *writeCallback) CheckHealth() *common.Error {
 }
 
 func processDataPoint(dpt *dataPointsInTable, req *measurev1.WriteRequest, 
writeEvent *measurev1.InternalWriteRequest,
-       stm *measure, is indexSchema, ts int64,
+       stm *measure, is indexSchema, ts int64, metadata *commonv1.Metadata, 
spec *measurev1.DataPointSpec,
 ) (uint64, error) {
        series := &pbv1.Series{
-               Subject:      req.Metadata.Name,
+               Subject:      metadata.Name,
                EntityValues: writeEvent.EntityValues,
        }
        if err := series.Marshal(); err != nil {
@@ -82,7 +83,7 @@ func processDataPoint(dpt *dataPointsInTable, req 
*measurev1.WriteRequest, write
        }
 
        if stm.schema.IndexMode {
-               fields := handleIndexMode(stm.schema, req, is.indexRuleLocators)
+               fields := handleIndexMode(stm.schema, req, 
is.indexRuleLocators, spec)
                fields = appendEntityTagsToIndexFields(fields, stm, series)
                doc := index.Document{
                        DocID:        uint64(series.ID),
@@ -101,7 +102,7 @@ func processDataPoint(dpt *dataPointsInTable, req 
*measurev1.WriteRequest, write
                return uint64(series.ID), nil
        }
 
-       fields := appendDataPoints(dpt, ts, series.ID, stm.GetSchema(), req, 
is.indexRuleLocators)
+       fields := appendDataPoints(dpt, ts, series.ID, stm.GetSchema(), req, 
is.indexRuleLocators, spec)
 
        doc := index.Document{
                DocID:        uint64(series.ID),
@@ -118,7 +119,9 @@ func processDataPoint(dpt *dataPointsInTable, req 
*measurev1.WriteRequest, write
        return uint64(series.ID), nil
 }
 
-func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent 
*measurev1.InternalWriteRequest) (map[string]*dataPointsInGroup, error) {
+func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent 
*measurev1.InternalWriteRequest,
+       metadata *commonv1.Metadata, spec *measurev1.DataPointSpec,
+) (map[string]*dataPointsInGroup, error) {
        req := writeEvent.Request
        t := req.DataPoint.Timestamp.AsTime().Local()
        if err := timestamp.Check(t); err != nil {
@@ -126,7 +129,7 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
        }
        ts := t.UnixNano()
 
-       gn := req.Metadata.Group
+       gn := metadata.Group
        tsdb, err := w.schemaRepo.loadTSDB(gn)
        if err != nil {
                return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn, 
err)
@@ -151,16 +154,16 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
                        break
                }
        }
-       stm, ok := w.schemaRepo.loadMeasure(req.GetMetadata())
+       stm, ok := w.schemaRepo.loadMeasure(metadata)
        if !ok {
-               return nil, fmt.Errorf("cannot find measure definition: %s", 
req.GetMetadata())
+               return nil, fmt.Errorf("cannot find measure definition: %s", 
metadata)
        }
        fLen := len(req.DataPoint.GetTagFamilies())
        if fLen < 1 {
-               return nil, fmt.Errorf("%s has no tag family", req.Metadata)
+               return nil, fmt.Errorf("%s has no tag family", metadata)
        }
        if fLen > len(stm.schema.GetTagFamilies()) {
-               return nil, fmt.Errorf("%s has more tag families than %s", 
req.Metadata, stm.schema)
+               return nil, fmt.Errorf("%s has more tag families than %s", 
metadata, stm.schema)
        }
        is := stm.indexSchema.Load().(indexSchema)
        if len(is.indexRuleLocators.TagFamilyTRule) != 
len(stm.GetSchema().GetTagFamilies()) {
@@ -192,7 +195,7 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
                dpg.tables = append(dpg.tables, dpt)
        }
 
-       sid, err := processDataPoint(dpt, req, writeEvent, stm, is, ts)
+       sid, err := processDataPoint(dpt, req, writeEvent, stm, is, ts, 
metadata, spec)
        if err != nil {
                return nil, err
        }
@@ -201,9 +204,9 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
 }
 
 func appendDataPoints(dest *dataPointsInTable, ts int64, sid common.SeriesID, 
schema *databasev1.Measure,
-       req *measurev1.WriteRequest, locator partition.IndexRuleLocator,
+       req *measurev1.WriteRequest, locator partition.IndexRuleLocator, spec 
*measurev1.DataPointSpec,
 ) []index.Field {
-       tagFamily, fields := handleTagFamily(schema, req, locator)
+       tagFamily, fields := handleTagFamily(schema, req, locator, spec)
        if dest.dataPoints == nil {
                dest.dataPoints = generateDataPoints()
                dest.dataPoints.reset()
@@ -214,17 +217,34 @@ func appendDataPoints(dest *dataPointsInTable, ts int64, 
sid common.SeriesID, sc
        dataPoints.versions = append(dataPoints.versions, req.DataPoint.Version)
        dataPoints.seriesIDs = append(dataPoints.seriesIDs, sid)
 
+       var specFieldMap map[string]int
+       if spec != nil {
+               specFieldMap = make(map[string]int)
+               for i, fieldName := range spec.GetFieldNames() {
+                       specFieldMap[fieldName] = i
+               }
+       }
+
        field := nameValues{}
        for i := range schema.GetFields() {
+               schemaField := schema.GetFields()[i]
                var v *modelv1.FieldValue
-               if len(req.DataPoint.Fields) <= i {
-                       v = pbv1.NullFieldValue
+               if spec != nil {
+                       if specIdx, ok := specFieldMap[schemaField.GetName()]; 
ok && specIdx < len(req.DataPoint.Fields) {
+                               v = req.DataPoint.Fields[specIdx]
+                       } else {
+                               v = pbv1.NullFieldValue
+                       }
                } else {
-                       v = req.DataPoint.Fields[i]
+                       if len(req.DataPoint.Fields) <= i {
+                               v = pbv1.NullFieldValue
+                       } else {
+                               v = req.DataPoint.Fields[i]
+                       }
                }
                field.values = append(field.values, encodeFieldValue(
-                       schema.GetFields()[i].GetName(),
-                       schema.GetFields()[i].FieldType,
+                       schemaField.GetName(),
+                       schemaField.FieldType,
                        v,
                ))
        }
@@ -250,35 +270,43 @@ func newDpt(segment storage.Segment[*tsTable, option], 
timeRange timestamp.TimeR
        return dpt
 }
 
-func handleTagFamily(schema *databasev1.Measure, req *measurev1.WriteRequest, 
locator partition.IndexRuleLocator) ([]nameValues, []index.Field) {
+func handleTagFamily(schema *databasev1.Measure, req *measurev1.WriteRequest,
+       locator partition.IndexRuleLocator, spec *measurev1.DataPointSpec,
+) ([]nameValues, []index.Field) {
        tagFamilies := make([]nameValues, 0, len(schema.TagFamilies))
+       specFamilyMap, specTagMaps := buildSpecMaps(spec)
 
        var fields []index.Field
        for i := range schema.GetTagFamilies() {
-               var tagFamily *modelv1.TagFamilyForWrite
-               if len(req.DataPoint.TagFamilies) <= i {
-                       tagFamily = pbv1.NullTagFamily
-               } else {
-                       tagFamily = req.DataPoint.TagFamilies[i]
-               }
-               tfr := locator.TagFamilyTRule[i]
                tagFamilySpec := schema.GetTagFamilies()[i]
+               tfr := locator.TagFamilyTRule[i]
+               srcFamily, specTagMap := getSourceFamilyAndTagMap(spec, 
tagFamilySpec, req, i, specFamilyMap, specTagMaps)
+
                tf := nameValues{
                        name: tagFamilySpec.Name,
                }
                for j := range tagFamilySpec.Tags {
+                       t := tagFamilySpec.Tags[j]
+
                        var tagValue *modelv1.TagValue
-                       if tagFamily == pbv1.NullTagFamily || 
len(tagFamily.Tags) <= j {
-                               tagValue = pbv1.NullTagValue
+                       if spec != nil {
+                               if srcFamily != nil && specTagMap != nil {
+                                       if srcTagIdx, ok := specTagMap[t.Name]; 
ok && srcTagIdx < len(srcFamily.Tags) {
+                                               tagValue = 
srcFamily.Tags[srcTagIdx]
+                                       }
+                               }
+                               if tagValue == nil {
+                                       tagValue = pbv1.NullTagValue
+                               }
                        } else {
-                               tagValue = tagFamily.Tags[j]
+                               if srcFamily == nil || len(srcFamily.Tags) <= j 
{
+                                       tagValue = pbv1.NullTagValue
+                               } else {
+                                       tagValue = srcFamily.Tags[j]
+                               }
                        }
 
-                       t := tagFamilySpec.Tags[j]
-                       encodeTagValue := encodeTagValue(
-                               t.Name,
-                               t.Type,
-                               tagValue)
+                       encodeTagValue := encodeTagValue(t.Name, t.Type, 
tagValue)
                        r, ok := tfr[t.Name]
                        if ok {
                                fieldKey := index.FieldKey{}
@@ -316,30 +344,39 @@ func handleTagFamily(schema *databasev1.Measure, req 
*measurev1.WriteRequest, lo
        return tagFamilies, fields
 }
 
-func handleIndexMode(schema *databasev1.Measure, req *measurev1.WriteRequest, 
locator partition.IndexRuleLocator) []index.Field {
+func handleIndexMode(schema *databasev1.Measure, req *measurev1.WriteRequest,
+       locator partition.IndexRuleLocator, spec *measurev1.DataPointSpec,
+) []index.Field {
+       specFamilyMap, specTagMaps := buildSpecMaps(spec)
+
        var fields []index.Field
        for i := range schema.GetTagFamilies() {
-               var tagFamily *modelv1.TagFamilyForWrite
-               if len(req.DataPoint.TagFamilies) <= i {
-                       tagFamily = pbv1.NullTagFamily
-               } else {
-                       tagFamily = req.DataPoint.TagFamilies[i]
-               }
-               tfr := locator.TagFamilyTRule[i]
                tagFamilySpec := schema.GetTagFamilies()[i]
+               tfr := locator.TagFamilyTRule[i]
+               srcFamily, specTagMap := getSourceFamilyAndTagMap(spec, 
tagFamilySpec, req, i, specFamilyMap, specTagMaps)
+
                for j := range tagFamilySpec.Tags {
+                       t := tagFamilySpec.Tags[j]
+
                        var tagValue *modelv1.TagValue
-                       if tagFamily == pbv1.NullTagFamily || 
len(tagFamily.Tags) <= j {
-                               tagValue = pbv1.NullTagValue
+                       if spec != nil {
+                               if srcFamily != nil && specTagMap != nil {
+                                       if srcTagIdx, ok := specTagMap[t.Name]; 
ok && srcTagIdx < len(srcFamily.Tags) {
+                                               tagValue = 
srcFamily.Tags[srcTagIdx]
+                                       }
+                               }
+                               if tagValue == nil {
+                                       tagValue = pbv1.NullTagValue
+                               }
                        } else {
-                               tagValue = tagFamily.Tags[j]
+                               if srcFamily == nil || len(srcFamily.Tags) <= j 
{
+                                       tagValue = pbv1.NullTagValue
+                               } else {
+                                       tagValue = srcFamily.Tags[j]
+                               }
                        }
 
-                       t := tagFamilySpec.Tags[j]
-                       encodeTagValue := encodeTagValue(
-                               t.Name,
-                               t.Type,
-                               tagValue)
+                       encodeTagValue := encodeTagValue(t.Name, t.Type, 
tagValue)
                        r, toIndex := tfr[t.Name]
                        fieldKey := index.FieldKey{}
                        if toIndex {
@@ -415,6 +452,8 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
                return
        }
        groups := make(map[string]*dataPointsInGroup)
+       var metadata *commonv1.Metadata
+       var spec *measurev1.DataPointSpec
        for i := range events {
                var writeEvent *measurev1.InternalWriteRequest
                switch e := events[i].(type) {
@@ -430,8 +469,15 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
                        w.l.Warn().Msg("invalid event data type")
                        continue
                }
+               req := writeEvent.Request
+               if req != nil && req.GetMetadata() != nil {
+                       metadata = req.GetMetadata()
+               }
+               if req != nil && req.GetDataPointSpec() != nil {
+                       spec = req.GetDataPointSpec()
+               }
                var err error
-               if groups, err = w.handle(groups, writeEvent); err != nil {
+               if groups, err = w.handle(groups, writeEvent, metadata, spec); 
err != nil {
                        w.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEvent)).Msg("cannot handle write event")
                        groups = make(map[string]*dataPointsInGroup)
                        continue
@@ -495,6 +541,41 @@ func encodeFieldValue(name string, fieldType 
databasev1.FieldType, fieldValue *m
        return nv
 }
 
+// buildSpecMaps indexes a DataPointSpec by name for positional lookups.
+//
+// The first result maps each tag family name to its position in
+// spec.GetTagFamilySpec(). The second maps each family name to a map from tag
+// name to that tag's position in the family's GetTagNames(). Both results are
+// nil when spec is nil. If a name repeats in the spec, the last position wins.
+//
+// NOTE(review): handleTagFamily and handleIndexMode call this for every data
+// point. The maps could be built once per spec instead.
+func buildSpecMaps(spec *measurev1.DataPointSpec) (map[string]int, 
map[string]map[string]int) {
+       if spec == nil {
+               // No spec: callers fall back to schema-order (positional) lookup.
+               return nil, nil
+       }
+       specFamilyMap := make(map[string]int)
+       specTagMaps := make(map[string]map[string]int)
+       for i, specFamily := range spec.GetTagFamilySpec() {
+               specFamilyMap[specFamily.GetName()] = i
+               tagMap := make(map[string]int)
+               for j, tagName := range specFamily.GetTagNames() {
+                       tagMap[tagName] = j
+               }
+               specTagMaps[specFamily.GetName()] = tagMap
+       }
+       return specFamilyMap, specTagMaps
+}
+
+// getSourceFamilyAndTagMap picks the written tag family that supplies values
+// for the schema tag family tagFamilySpec, which sits at schema index i.
+//
+// With a spec, the family is located by name via specFamilyMap and indexes
+// req.DataPoint.TagFamilies. The result is nil when the spec does not name the
+// family or the request has too few families. The family's tag-name -> index
+// map from specTagMaps is also returned. That map may be non-nil even when the
+// family is nil.
+//
+// Without a spec, the family is taken positionally at index i, or nil if the
+// request has fewer families, and the returned tag map is nil.
+func getSourceFamilyAndTagMap(spec *measurev1.DataPointSpec, tagFamilySpec 
*databasev1.TagFamilySpec,
+       req *measurev1.WriteRequest, i int, specFamilyMap map[string]int,
+       specTagMaps map[string]map[string]int,
+) (*modelv1.TagFamilyForWrite, map[string]int) {
+       var srcFamily *modelv1.TagFamilyForWrite
+       var specTagMap map[string]int
+       if spec != nil {
+               // Spec mode: the spec's family order is the order on the wire.
+               specIdx, ok := specFamilyMap[tagFamilySpec.Name]
+               if ok && specIdx < len(req.DataPoint.TagFamilies) {
+                       srcFamily = req.DataPoint.TagFamilies[specIdx]
+               }
+               specTagMap = specTagMaps[tagFamilySpec.Name]
+       } else if len(req.DataPoint.TagFamilies) > i {
+               // Schema mode: families are sent in schema order.
+               srcFamily = req.DataPoint.TagFamilies[i]
+       }
+       return srcFamily, specTagMap
+}
+
 func encodeTagValue(name string, tagType databasev1.TagType, tagValue 
*modelv1.TagValue) *nameValue {
        nv := generateNameValue()
        nv.name = name
diff --git a/docs/api-reference.md b/docs/api-reference.md
index b4bc6402..88c17841 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -1783,6 +1783,7 @@ Status is the response status for write
 | STATUS_DISK_FULL | 6 |  |
 | STATUS_VERSION_UNSUPPORTED | 7 | Client version not supported |
 | STATUS_VERSION_DEPRECATED | 8 | Client version deprecated but still 
supported |
+| STATUS_METADATA_REQUIRED | 9 | Metadata is required for the first request |
 
 
  
diff --git a/test/cases/init.go b/test/cases/init.go
index d63c6628..dc66b595 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -25,6 +25,7 @@ import (
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
 
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        casesmeasuredata 
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
        caseproperty 
"github.com/apache/skywalking-banyandb/test/cases/property/data"
@@ -61,6 +62,43 @@ func Initialize(addr string, now time.Time) {
        casesmeasuredata.Write(conn, "endpoint_traffic", "sw_metric", 
"endpoint_traffic.json", now, interval)
        casesmeasuredata.Write(conn, "duplicated", "exception", 
"duplicated.json", now, 0)
        casesmeasuredata.Write(conn, "service_cpm_minute", "sw_updated", 
"service_cpm_minute_updated_data.json", now.Add(10*time.Minute), interval)
+       casesmeasuredata.WriteWithSpec(conn, "service_cpm_minute", "sw_metric", 
now.Add(20*time.Minute), interval,
+               casesmeasuredata.SpecWithData{
+                       Spec: &measurev1.DataPointSpec{
+                               TagFamilySpec: []*measurev1.TagFamilySpec{
+                                       {
+                                               Name:     "default",
+                                               TagNames: []string{"entity_id", 
"id"},
+                                       },
+                               },
+                               FieldNames: []string{"value", "total"},
+                       },
+                       DataFile: "service_cpm_minute_spec_order.json",
+               },
+               casesmeasuredata.SpecWithData{
+                       Spec: &measurev1.DataPointSpec{
+                               TagFamilySpec: []*measurev1.TagFamilySpec{
+                                       {
+                                               Name:     "default",
+                                               TagNames: []string{"id", 
"entity_id"},
+                                       },
+                               },
+                               FieldNames: []string{"total", "value"},
+                       },
+                       DataFile: "service_cpm_minute_spec_order2.json",
+               })
+       casesmeasuredata.WriteMixed(conn, "service_cpm_minute", "sw_metric",
+               "service_cpm_minute_schema_order.json", 
"service_cpm_minute_spec_order.json",
+               now.Add(30*time.Minute), interval, 2*time.Minute,
+               &measurev1.DataPointSpec{
+                       TagFamilySpec: []*measurev1.TagFamilySpec{
+                               {
+                                       Name:     "default",
+                                       TagNames: []string{"entity_id", "id"},
+                               },
+                       },
+                       FieldNames: []string{"value", "total"},
+               })
        time.Sleep(5 * time.Second)
        // trace
        interval = 500 * time.Millisecond
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index 10164d65..49f666b2 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -250,6 +250,35 @@ func loadData(md *commonv1.Metadata, measure 
measurev1.MeasureService_WriteClien
        }
 }
 
+// loadDataWithSpec sends every data point in testdata/dataFile over an
+// existing write stream.
+//
+// dataFile holds a JSON array. Each element is decoded with protojson into a
+// DataPointValue. Element i is stamped at baseTime - (len-1-i)*interval, so the
+// last point lands exactly on baseTime. Only the first request carries md and
+// spec. Passing a nil md leaves Metadata unset, which lets a stream that
+// already sent metadata switch into spec mode. Any failure fails the test
+// through gomega.
+func loadDataWithSpec(md *commonv1.Metadata, measure 
measurev1.MeasureService_WriteClient,
+       dataFile string, baseTime time.Time, interval time.Duration, spec 
*measurev1.DataPointSpec,
+) {
+       var templates []interface{}
+       content, err := dataFS.ReadFile("testdata/" + dataFile)
+       gm.Expect(err).ShouldNot(gm.HaveOccurred())
+       gm.Expect(json.Unmarshal(content, 
&templates)).ShouldNot(gm.HaveOccurred())
+
+       isFirst := true
+       for i, template := range templates {
+               // Round-trip through JSON so protojson can decode the element.
+               rawDataPointValue, errMarshal := json.Marshal(template)
+               gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
+               dataPointValue := &measurev1.DataPointValue{}
+               gm.Expect(protojson.Unmarshal(rawDataPointValue, 
dataPointValue)).ShouldNot(gm.HaveOccurred())
+               dataPointValue.Timestamp = 
timestamppb.New(baseTime.Add(-time.Duration(len(templates)-i-1) * interval))
+
+               req := &measurev1.WriteRequest{
+                       DataPoint: dataPointValue,
+                       MessageId: uint64(time.Now().UnixNano()),
+               }
+               // Metadata and spec are attached to the first request only.
+               if isFirst {
+                       req.Metadata = md
+                       req.DataPointSpec = spec
+                       isFirst = false
+               }
+               gm.Expect(measure.Send(req)).Should(gm.Succeed())
+       }
+}
+
 // Write data into the server.
 func Write(conn *grpclib.ClientConn, name, group, dataFile string,
        baseTime time.Time, interval time.Duration,
@@ -306,3 +335,98 @@ func WriteOnly(conn *grpclib.ClientConn, name, group, 
dataFile string,
        loadData(metadata, writeClient, dataFile, baseTime, interval)
        return writeClient
 }
+
+// SpecWithData pairs a DataPointSpec with a data file.
+//
+// WriteWithSpec attaches Spec to the first request built from DataFile.
+// DataFile names a JSON array of data point values under testdata/.
+type SpecWithData struct {
+       // Spec names the tag families, tags and fields, in the order the
+       // values appear in DataFile's data points.
+       Spec     *measurev1.DataPointSpec
+       // DataFile is a file name relative to testdata/.
+       DataFile string
+}
+
+// WriteWithSpec writes data through a single write stream, using a different
+// DataPointSpec for each SpecWithData pair to name the tags and fields.
+//
+// The measure's metadata is resolved from the registry and attached only to
+// the first request of the stream. Each pair's Spec is attached only to the
+// first request sent from that pair's DataFile. A pair whose file is an empty
+// array therefore sends no spec. Timestamps start at baseTime and advance by
+// interval for every data point, continuing across pairs. Afterwards the
+// stream is half-closed and the call waits until Recv returns io.EOF.
+func WriteWithSpec(conn *grpclib.ClientConn, name, group string,
+       baseTime time.Time, interval time.Duration, specDataPairs 
...SpecWithData,
+) {
+       ctx := context.Background()
+       md := &commonv1.Metadata{
+               Name:  name,
+               Group: group,
+       }
+
+       // Use the metadata as stored in the registry for the outgoing requests.
+       schemaClient := databasev1.NewMeasureRegistryServiceClient(conn)
+       resp, err := schemaClient.Get(ctx, 
&databasev1.MeasureRegistryServiceGetRequest{Metadata: md})
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+       md = resp.GetMeasure().GetMetadata()
+
+       c := measurev1.NewMeasureServiceClient(conn)
+       writeClient, err := c.Write(ctx)
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+
+       isFirstRequest := true
+       currentTime := baseTime
+       for _, pair := range specDataPairs {
+               var templates []interface{}
+               content, err := dataFS.ReadFile("testdata/" + pair.DataFile)
+               gm.Expect(err).ShouldNot(gm.HaveOccurred())
+               gm.Expect(json.Unmarshal(content, 
&templates)).ShouldNot(gm.HaveOccurred())
+
+               isFirstForSpec := true
+               for i, template := range templates {
+                       rawDataPointValue, errMarshal := json.Marshal(template)
+                       gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
+                       dataPointValue := &measurev1.DataPointValue{}
+                       gm.Expect(protojson.Unmarshal(rawDataPointValue, 
dataPointValue)).ShouldNot(gm.HaveOccurred())
+                       dataPointValue.Timestamp = 
timestamppb.New(currentTime.Add(time.Duration(i) * interval))
+                       req := &measurev1.WriteRequest{
+                               DataPoint: dataPointValue,
+                               MessageId: uint64(time.Now().UnixNano()),
+                       }
+                       // Metadata is sent once, on the stream's first request.
+                       if isFirstRequest {
+                               req.Metadata = md
+                               isFirstRequest = false
+                       }
+                       // The pair's spec is sent once; later requests in the
+                       // pair omit it.
+                       if isFirstForSpec {
+                               req.DataPointSpec = pair.Spec
+                               isFirstForSpec = false
+                       }
+                       gm.Expect(writeClient.Send(req)).Should(gm.Succeed())
+               }
+               // Continue the timeline after this pair's last data point.
+               currentTime = currentTime.Add(time.Duration(len(templates)) * 
interval)
+       }
+
+       // Half-close the stream, then wait for the server to end it with io.EOF.
+       gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
+       gm.Eventually(func() error {
+               _, err := writeClient.Recv()
+               return err
+       }, flags.EventuallyTimeout).Should(gm.Equal(io.EOF))
+}
+
+// WriteMixed writes data in mixed mode over one write stream. It first sends
+// schemaDataFile with values in schema order, then switches to spec mode and
+// sends specDataFile using spec.
+//
+// The measure's metadata is resolved from the registry and passed only with
+// the schema-order part. The spec-mode part is sent with nil metadata,
+// presumably relying on the metadata already sent on this stream. Spec-mode
+// timestamps end at baseTime.Add(specStartOffset). Afterwards the stream is
+// half-closed and the call waits until Recv returns io.EOF.
+func WriteMixed(conn *grpclib.ClientConn, name, group string,
+       schemaDataFile, specDataFile string,
+       baseTime time.Time, interval time.Duration,
+       specStartOffset time.Duration, spec *measurev1.DataPointSpec,
+) {
+       ctx := context.Background()
+       metadata := &commonv1.Metadata{
+               Name:  name,
+               Group: group,
+       }
+
+       // Use the metadata as stored in the registry for the outgoing requests.
+       schemaClient := databasev1.NewMeasureRegistryServiceClient(conn)
+       resp, err := schemaClient.Get(ctx, 
&databasev1.MeasureRegistryServiceGetRequest{Metadata: metadata})
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+       metadata = resp.GetMeasure().GetMetadata()
+
+       c := measurev1.NewMeasureServiceClient(conn)
+       writeClient, err := c.Write(ctx)
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+
+       // Schema-order part first, then the spec-mode part on the same stream.
+       loadData(metadata, writeClient, schemaDataFile, baseTime, interval)
+       loadDataWithSpec(nil, writeClient, specDataFile, 
baseTime.Add(specStartOffset), interval, spec)
+
+       // Half-close the stream, then wait for the server to end it with io.EOF.
+       gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
+       gm.Eventually(func() error {
+               _, err := writeClient.Recv()
+               return err
+       }, flags.EventuallyTimeout).Should(gm.Equal(io.EOF))
+}
diff --git a/test/cases/measure/data/input/write_mixed.ql 
b/test/cases/measure/data/input/write_mixed.ql
new file mode 100644
index 00000000..0382cb8b
--- /dev/null
+++ b/test/cases/measure/data/input/write_mixed.ql
@@ -0,0 +1,22 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+SELECT id, entity_id, total, value FROM MEASURE service_cpm_minute IN sw_metric
+TIME > '-15m'
+WHERE id IN ('id_schema_1', 'id_schema_2', 'id_schema_3', 'id_spec_1', 
'id_spec_2', 'id_spec_3')
+
diff --git a/test/cases/measure/data/input/write_mixed.yaml 
b/test/cases/measure/data/input/write_mixed.yaml
new file mode 100644
index 00000000..aefba1bb
--- /dev/null
+++ b/test/cases/measure/data/input/write_mixed.yaml
@@ -0,0 +1,41 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "service_cpm_minute"
+groups: ["sw_metric"]
+tagProjection:
+  tagFamilies:
+  - name: "default"
+    tags: ["id", "entity_id"]
+fieldProjection:
+  names: ["total", "value"]
+criteria:
+  condition:
+    name: "id"
+    op: "BINARY_OP_IN"
+    value:
+      strArray:
+        value:
+          [
+            "id_schema_1",
+            "id_schema_2",
+            "id_schema_3",
+            "id_spec_1",
+            "id_spec_2",
+            "id_spec_3",
+          ]
+
diff --git a/test/cases/measure/data/input/write_spec.ql 
b/test/cases/measure/data/input/write_spec.ql
new file mode 100644
index 00000000..d3111de7
--- /dev/null
+++ b/test/cases/measure/data/input/write_spec.ql
@@ -0,0 +1,22 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+SELECT id, entity_id, total, value FROM MEASURE service_cpm_minute IN sw_metric
+TIME > '-15m'
+WHERE id IN ('id_spec_2', 'id_spec_5')
+
diff --git a/test/cases/measure/data/input/write_spec.yaml 
b/test/cases/measure/data/input/write_spec.yaml
new file mode 100644
index 00000000..afd48e2b
--- /dev/null
+++ b/test/cases/measure/data/input/write_spec.yaml
@@ -0,0 +1,33 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "service_cpm_minute"
+groups: ["sw_metric"]
+tagProjection:
+  tagFamilies:
+    - name: "default"
+      tags: ["id", "entity_id"]
+fieldProjection:
+  names: ["total", "value"]
+criteria:
+  condition:
+    name: "id"
+    op: "BINARY_OP_IN"
+    value:
+      strArray:
+        value: ["id_spec_2", "id_spec_5"]
+
diff --git 
a/test/cases/measure/data/testdata/service_cpm_minute_schema_order.json 
b/test/cases/measure/data/testdata/service_cpm_minute_schema_order.json
new file mode 100644
index 00000000..10c5abed
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_cpm_minute_schema_order.json
@@ -0,0 +1,93 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "id_schema_1"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_schema_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 10
+        }
+      },
+      {
+        "int": {
+          "value": 100
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "id_schema_2"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_schema_2"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 20
+        }
+      },
+      {
+        "int": {
+          "value": 200
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "id_schema_3"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_schema_3"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 30
+        }
+      },
+      {
+        "int": {
+          "value": 300
+        }
+      }
+    ]
+  }
+]
+
diff --git 
a/test/cases/measure/data/testdata/service_cpm_minute_spec_order.json 
b/test/cases/measure/data/testdata/service_cpm_minute_spec_order.json
new file mode 100644
index 00000000..f4306dee
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_cpm_minute_spec_order.json
@@ -0,0 +1,93 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "entity_spec_1"
+            }
+          },
+          {
+            "str": {
+              "value": "id_spec_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 10
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "entity_spec_2"
+            }
+          },
+          {
+            "str": {
+              "value": "id_spec_2"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 200
+        }
+      },
+      {
+        "int": {
+          "value": 20
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "entity_spec_3"
+            }
+          },
+          {
+            "str": {
+              "value": "id_spec_3"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 300
+        }
+      },
+      {
+        "int": {
+          "value": 30
+        }
+      }
+    ]
+  }
+]
+
diff --git 
a/test/cases/measure/data/testdata/service_cpm_minute_spec_order2.json 
b/test/cases/measure/data/testdata/service_cpm_minute_spec_order2.json
new file mode 100644
index 00000000..fc0abd0f
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_cpm_minute_spec_order2.json
@@ -0,0 +1,93 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "id_spec_4"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_spec_4"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 40
+        }
+      },
+      {
+        "int": {
+          "value": 400
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "id_spec_5"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_spec_5"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 50
+        }
+      },
+      {
+        "int": {
+          "value": 500
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "id_spec_6"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_spec_6"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 60
+        }
+      },
+      {
+        "int": {
+          "value": 600
+        }
+      }
+    ]
+  }
+]
+
diff --git a/test/cases/measure/data/want/write_mixed.yaml 
b/test/cases/measure/data/want/write_mixed.yaml
new file mode 100644
index 00000000..b8f734fa
--- /dev/null
+++ b/test/cases/measure/data/want/write_mixed.yaml
@@ -0,0 +1,139 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+  - tagFamilies:
+      - name: default
+        tags:
+          - key: id
+            value:
+              str:
+                value: id_spec_2
+          - key: entity_id
+            value:
+              str:
+                value: entity_spec_2
+    fields:
+      - name: total
+        value:
+          int:
+            value: "20"
+      - name: value
+        value:
+          int:
+            value: "200"
+  - tagFamilies:
+      - name: default
+        tags:
+          - key: id
+            value:
+              str:
+                value: id_schema_1
+          - key: entity_id
+            value:
+              str:
+                value: entity_schema_1
+    fields:
+      - name: total
+        value:
+          int:
+            value: "10"
+      - name: value
+        value:
+          int:
+            value: "100"
+  - tagFamilies:
+      - name: default
+        tags:
+          - key: id
+            value:
+              str:
+                value: id_schema_3
+          - key: entity_id
+            value:
+              str:
+                value: entity_schema_3
+    fields:
+      - name: total
+        value:
+          int:
+            value: "30"
+      - name: value
+        value:
+          int:
+            value: "300"
+  - tagFamilies:
+      - name: default
+        tags:
+          - key: id
+            value:
+              str:
+                value: id_spec_3
+          - key: entity_id
+            value:
+              str:
+                value: entity_spec_3
+    fields:
+      - name: total
+        value:
+          int:
+            value: "30"
+      - name: value
+        value:
+          int:
+            value: "300"
+  - tagFamilies:
+      - name: default
+        tags:
+          - key: id
+            value:
+              str:
+                value: id_schema_2
+          - key: entity_id
+            value:
+              str:
+                value: entity_schema_2
+    fields:
+      - name: total
+        value:
+          int:
+            value: "20"
+      - name: value
+        value:
+          int:
+            value: "200"
+  - tagFamilies:
+      - name: default
+        tags:
+          - key: id
+            value:
+              str:
+                value: id_spec_1
+          - key: entity_id
+            value:
+              str:
+                value: entity_spec_1
+    fields:
+      - name: total
+        value:
+          int:
+            value: "10"
+      - name: value
+        value:
+          int:
+            value: "100"
+
diff --git a/test/cases/measure/data/want/write_spec.yaml 
b/test/cases/measure/data/want/write_spec.yaml
new file mode 100644
index 00000000..974802c4
--- /dev/null
+++ b/test/cases/measure/data/want/write_spec.yaml
@@ -0,0 +1,59 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+  - tagFamilies:
+      - name: default
+        tags:
+          - key: id
+            value:
+              str:
+                value: id_spec_2
+          - key: entity_id
+            value:
+              str:
+                value: entity_spec_2
+    fields:
+      - name: total
+        value:
+          int:
+            value: "20"
+      - name: value
+        value:
+          int:
+            value: "200"
+  - tagFamilies:
+      - name: default
+        tags:
+          - key: id
+            value:
+              str:
+                value: id_spec_5
+          - key: entity_id
+            value:
+              str:
+                value: entity_spec_5
+    fields:
+      - name: total
+        value:
+          int:
+            value: "50"
+      - name: value
+        value:
+          int:
+            value: "500"
+
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index 5e692974..24469ef9 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -87,4 +87,6 @@ var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("query by id in index mode", helpers.Args{Input: 
"index_mode_by_id", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("multi groups: unchanged", helpers.Args{Input: 
"multi_group_unchanged", Duration: 35 * time.Minute, Offset: -20 * 
time.Minute}),
        g.Entry("multi groups: new tag and fields", helpers.Args{Input: 
"multi_group_new_tag_field", Duration: 35 * time.Minute, Offset: -20 * 
time.Minute}),
+       g.Entry("write spec", helpers.Args{Input: "write_spec", Duration: 15 * 
time.Minute, Offset: 15 * time.Minute, DisOrder: true}),
+       g.Entry("write mixed", helpers.Args{Input: "write_mixed", Duration: 15 
* time.Minute, Offset: 25 * time.Minute, DisOrder: true}),
 )

Reply via email to