This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 9552688d Add mod revision check to write requests(measure/stream)
(#322)
9552688d is described below
commit 9552688d60f055f6bb539a91464e8b1db77e7309
Author: hailin0 <[email protected]>
AuthorDate: Fri Sep 15 14:21:32 2023 +0800
Add mod revision check to write requests(measure/stream) (#322)
* Add mod revision check to write requests(measure/stream)
- Support checking the mod revision when creating/updating a schema
- Support checking the mod revision when writing data
- Adapt to web console requests
- Update OAP e2e image
- Fix query failure caused by schema change
---------
Co-authored-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 1 +
api/proto/banyandb/database/v1/rpc.proto | 16 ++++--
api/proto/banyandb/measure/v1/write.proto | 12 ++++-
api/proto/banyandb/model/v1/write.proto | 33 ++++++++++++
api/proto/banyandb/stream/v1/write.proto | 14 ++++-
banyand/liaison/grpc/discovery.go | 15 +++---
banyand/liaison/grpc/measure.go | 26 ++++++---
banyand/liaison/grpc/registry.go | 28 +++++++---
banyand/liaison/grpc/stream.go | 26 ++++++---
banyand/measure/measure_topn.go | 3 +-
banyand/measure/measure_write.go | 1 +
banyand/measure/metadata.go | 4 +-
banyand/measure/metadata_test.go | 4 +-
banyand/metadata/schema/etcd.go | 82 +++++++++++++++--------------
banyand/metadata/schema/etcd_test.go | 2 +-
banyand/metadata/schema/group.go | 6 ++-
banyand/metadata/schema/index.go | 12 +++--
banyand/metadata/schema/measure.go | 15 +++---
banyand/metadata/schema/property.go | 4 +-
banyand/metadata/schema/schema.go | 15 +++---
banyand/metadata/schema/shard.go | 5 +-
banyand/metadata/schema/stream.go | 13 ++---
banyand/metadata/schema/topn.go | 6 ++-
banyand/metadata/schema/watcher_test.go | 8 ++-
banyand/stream/metadata_test.go | 4 +-
docs/api-reference.md | 73 ++++++++++++++++++++++++-
pkg/partition/entity.go | 15 +++---
pkg/query/logical/common.go | 23 +++++---
pkg/test/measure/etcd.go | 3 +-
pkg/test/stream/etcd.go | 2 +-
test/cases/measure/data/data.go | 18 +++++--
test/cases/stream/data/data.go | 23 +++++---
test/docker/base-compose.yml | 4 +-
test/e2e-v2/script/env | 2 +-
test/stress/cases/istio/istio_suite_test.go | 1 +
test/stress/cases/istio/repo.go | 3 +-
test/stress/env | 2 +-
test/stress/env.dev | 2 +-
ui/src/components/Editor/index.vue | 6 ++-
39 files changed, 382 insertions(+), 150 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 0d98483c..58fcd220 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,7 @@ Release Notes.
- Implement the remote queue to spread data to data nodes.
- Fix parse environment variables error
- Implement the distributed query engine.
+- Add mod revision check to write requests.
### Bugs
diff --git a/api/proto/banyandb/database/v1/rpc.proto
b/api/proto/banyandb/database/v1/rpc.proto
index 1fbccd5d..09ffada3 100644
--- a/api/proto/banyandb/database/v1/rpc.proto
+++ b/api/proto/banyandb/database/v1/rpc.proto
@@ -32,13 +32,17 @@ message StreamRegistryServiceCreateRequest {
banyandb.database.v1.Stream stream = 1;
}
-message StreamRegistryServiceCreateResponse {}
+message StreamRegistryServiceCreateResponse {
+ int64 mod_revision = 1;
+}
message StreamRegistryServiceUpdateRequest {
banyandb.database.v1.Stream stream = 1;
}
-message StreamRegistryServiceUpdateResponse {}
+message StreamRegistryServiceUpdateResponse {
+ int64 mod_revision = 1;
+}
message StreamRegistryServiceDeleteRequest {
banyandb.common.v1.Metadata metadata = 1;
@@ -260,13 +264,17 @@ message MeasureRegistryServiceCreateRequest {
banyandb.database.v1.Measure measure = 1;
}
-message MeasureRegistryServiceCreateResponse {}
+message MeasureRegistryServiceCreateResponse {
+ int64 mod_revision = 1;
+}
message MeasureRegistryServiceUpdateRequest {
banyandb.database.v1.Measure measure = 1;
}
-message MeasureRegistryServiceUpdateResponse {}
+message MeasureRegistryServiceUpdateResponse {
+ int64 mod_revision = 1;
+}
message MeasureRegistryServiceDeleteRequest {
banyandb.common.v1.Metadata metadata = 1;
diff --git a/api/proto/banyandb/measure/v1/write.proto
b/api/proto/banyandb/measure/v1/write.proto
index 49a7a004..9d007334 100644
--- a/api/proto/banyandb/measure/v1/write.proto
+++ b/api/proto/banyandb/measure/v1/write.proto
@@ -21,6 +21,7 @@ package banyandb.measure.v1;
import "banyandb/common/v1/common.proto";
import "banyandb/model/v1/common.proto";
+import "banyandb/model/v1/write.proto";
import "google/protobuf/timestamp.proto";
import "validate/validate.proto";
@@ -43,10 +44,19 @@ message WriteRequest {
common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
// the data_point is required.
DataPointValue data_point = 2 [(validate.rules).message.required = true];
+ // the message_id is required.
+ uint64 message_id = 3 [(validate.rules).uint64.gt = 0];
}
// WriteResponse is the response contract for write
-message WriteResponse {}
+message WriteResponse {
+ // the message_id from request.
+ uint64 message_id = 1 [(validate.rules).uint64.gt = 0];
+ // status indicates the request processing result
+ model.v1.Status status = 2 [(validate.rules).enum.defined_only = true];
+ // the metadata from request when request fails
+ common.v1.Metadata metadata = 3 [(validate.rules).message.required = true];
+}
message InternalWriteRequest {
uint32 shard_id = 1;
diff --git a/api/proto/banyandb/model/v1/write.proto
b/api/proto/banyandb/model/v1/write.proto
new file mode 100644
index 00000000..83b8c91e
--- /dev/null
+++ b/api/proto/banyandb/model/v1/write.proto
@@ -0,0 +1,33 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+package banyandb.model.v1;
+
+option go_package =
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1";
+option java_package = "org.apache.skywalking.banyandb.model.v1";
+
+// Status is the response status for write
+enum Status {
+ STATUS_UNSPECIFIED = 0;
+ STATUS_SUCCEED = 1;
+ STATUS_INVALID_TIMESTAMP = 2;
+ STATUS_NOT_FOUND = 3;
+ STATUS_EXPIRED_SCHEMA = 4;
+ STATUS_INTERNAL_ERROR = 5;
+}
diff --git a/api/proto/banyandb/stream/v1/write.proto
b/api/proto/banyandb/stream/v1/write.proto
index e177eb11..1c3347f1 100644
--- a/api/proto/banyandb/stream/v1/write.proto
+++ b/api/proto/banyandb/stream/v1/write.proto
@@ -21,6 +21,7 @@ package banyandb.stream.v1;
import "banyandb/common/v1/common.proto";
import "banyandb/model/v1/common.proto";
+import "banyandb/model/v1/write.proto";
import "google/protobuf/timestamp.proto";
import "validate/validate.proto";
@@ -39,13 +40,22 @@ message ElementValue {
}
message WriteRequest {
- // the metadata is only required in the first write.
+ // the metadata is required.
common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
// the element is required.
ElementValue element = 2 [(validate.rules).message.required = true];
+ // the message_id is required.
+ uint64 message_id = 3 [(validate.rules).uint64.gt = 0];
}
-message WriteResponse {}
+message WriteResponse {
+ // the message_id from request.
+ uint64 message_id = 1 [(validate.rules).uint64.gt = 0];
+ // status indicates the request processing result
+ model.v1.Status status = 2 [(validate.rules).enum.defined_only = true];
+ // the metadata from request when request fails
+ common.v1.Metadata metadata = 3 [(validate.rules).message.required = true];
+}
message InternalWriteRequest {
uint32 shard_id = 1;
diff --git a/banyand/liaison/grpc/discovery.go
b/banyand/liaison/grpc/discovery.go
index e2fb24c9..823183b1 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -231,14 +231,17 @@ type entityRepo struct {
func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
var el partition.EntityLocator
var id identity
+ var modRevision int64
switch schemaMetadata.Kind {
case schema.KindMeasure:
measure := schemaMetadata.Spec.(*databasev1.Measure)
- el = partition.NewEntityLocator(measure.TagFamilies,
measure.Entity)
+ modRevision = measure.GetMetadata().GetModRevision()
+ el = partition.NewEntityLocator(measure.TagFamilies,
measure.Entity, modRevision)
id = getID(measure.GetMetadata())
case schema.KindStream:
stream := schemaMetadata.Spec.(*databasev1.Stream)
- el = partition.NewEntityLocator(stream.TagFamilies,
stream.Entity)
+ modRevision = stream.GetMetadata().GetModRevision()
+ el = partition.NewEntityLocator(stream.TagFamilies,
stream.Entity, modRevision)
id = getID(stream.GetMetadata())
default:
return
@@ -259,8 +262,8 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata
schema.Metadata) {
Str("kind", kind).
Msg("entity added or updated")
}
- en := make(partition.EntityLocator, 0, len(el))
- for _, l := range el {
+ en := make([]partition.TagLocator, 0, len(el.TagLocators))
+ for _, l := range el.TagLocators {
en = append(en, partition.TagLocator{
FamilyOffset: l.FamilyOffset,
TagOffset: l.TagOffset,
@@ -268,7 +271,7 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata
schema.Metadata) {
}
e.RWMutex.Lock()
defer e.RWMutex.Unlock()
- e.entitiesMap[id] = en
+ e.entitiesMap[id] = partition.EntityLocator{TagLocators: en,
ModRevision: modRevision}
}
// OnDelete implements schema.EventHandler.
@@ -310,7 +313,7 @@ func (e *entityRepo) getLocator(id identity)
(partition.EntityLocator, bool) {
defer e.RWMutex.RUnlock()
el, ok := e.entitiesMap[id]
if !ok {
- return nil, false
+ return partition.EntityLocator{}, false
}
return el, true
}
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 8a541147..90cf7632 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -28,7 +28,9 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/accesslog"
"github.com/apache/skywalking-banyandb/pkg/bus"
@@ -56,8 +58,8 @@ func (ms *measureService) activeIngestionAccessLog(root
string) (err error) {
}
func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer)
error {
- reply := func(measure measurev1.MeasureService_WriteServer, logger
*logger.Logger) {
- if errResp := measure.Send(&measurev1.WriteResponse{}); errResp
!= nil {
+ reply := func(metadata *commonv1.Metadata, status modelv1.Status,
messageId uint64, measure measurev1.MeasureService_WriteServer, logger
*logger.Logger) {
+ if errResp := measure.Send(&measurev1.WriteResponse{Metadata:
metadata, Status: status, MessageId: messageId}); errResp != nil {
logger.Err(errResp).Msg("failed to send response")
}
}
@@ -76,18 +78,28 @@ func (ms *measureService) Write(measure
measurev1.MeasureService_WriteServer) er
}
if err != nil {
ms.sampled.Error().Err(err).Stringer("written",
writeRequest).Msg("failed to receive message")
- reply(measure, ms.sampled)
- continue
+ return err
}
if errTime :=
timestamp.CheckPb(writeRequest.DataPoint.Timestamp); errTime != nil {
ms.sampled.Error().Err(errTime).Stringer("written",
writeRequest).Msg("the data point time is invalid")
- reply(measure, ms.sampled)
+ reply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_INVALID_TIMESTAMP, writeRequest.GetMessageId(), measure,
ms.sampled)
+ continue
+ }
+ measureCache, existed :=
ms.entityRepo.getLocator(getID(writeRequest.GetMetadata()))
+ if !existed {
+ ms.sampled.Error().Err(err).Stringer("written",
writeRequest).Msg("failed to measure schema not found")
+ reply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure,
ms.sampled)
+ continue
+ }
+ if writeRequest.Metadata.ModRevision !=
measureCache.ModRevision {
+ ms.sampled.Error().Stringer("written",
writeRequest).Msg("the measure schema is expired")
+ reply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure,
ms.sampled)
continue
}
entity, tagValues, shardID, err :=
ms.navigate(writeRequest.GetMetadata(),
writeRequest.GetDataPoint().GetTagFamilies())
if err != nil {
ms.sampled.Error().Err(err).RawJSON("written",
logger.Proto(writeRequest)).Msg("failed to navigate to the write target")
- reply(measure, ms.sampled)
+ reply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure,
ms.sampled)
continue
}
if ms.ingestionAccessLog != nil {
@@ -107,7 +119,7 @@ func (ms *measureService) Write(measure
measurev1.MeasureService_WriteServer) er
if errWritePub != nil {
ms.sampled.Error().Err(errWritePub).RawJSON("written",
logger.Proto(writeRequest)).Msg("failed to send a message")
}
- reply(measure, ms.sampled)
+ reply(nil, modelv1.Status_STATUS_SUCCEED,
writeRequest.GetMessageId(), measure, ms.sampled)
}
}
diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 93e49138..511d4e9b 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -38,19 +38,25 @@ type streamRegistryServer struct {
func (rs *streamRegistryServer) Create(ctx context.Context,
req *databasev1.StreamRegistryServiceCreateRequest,
) (*databasev1.StreamRegistryServiceCreateResponse, error) {
- if err := rs.schemaRegistry.StreamRegistry().CreateStream(ctx,
req.GetStream()); err != nil {
+ modRevision, err :=
rs.schemaRegistry.StreamRegistry().CreateStream(ctx, req.GetStream())
+ if err != nil {
return nil, err
}
- return &databasev1.StreamRegistryServiceCreateResponse{}, nil
+ return &databasev1.StreamRegistryServiceCreateResponse{
+ ModRevision: modRevision,
+ }, nil
}
func (rs *streamRegistryServer) Update(ctx context.Context,
req *databasev1.StreamRegistryServiceUpdateRequest,
) (*databasev1.StreamRegistryServiceUpdateResponse, error) {
- if err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx,
req.GetStream()); err != nil {
+ modRevision, err :=
rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream())
+ if err != nil {
return nil, err
}
- return &databasev1.StreamRegistryServiceUpdateResponse{}, nil
+ return &databasev1.StreamRegistryServiceUpdateResponse{
+ ModRevision: modRevision,
+ }, nil
}
func (rs *streamRegistryServer) Delete(ctx context.Context,
@@ -287,19 +293,25 @@ type measureRegistryServer struct {
func (rs *measureRegistryServer) Create(ctx context.Context, req
*databasev1.MeasureRegistryServiceCreateRequest) (
*databasev1.MeasureRegistryServiceCreateResponse, error,
) {
- if err := rs.schemaRegistry.MeasureRegistry().CreateMeasure(ctx,
req.GetMeasure()); err != nil {
+ modRevision, err :=
rs.schemaRegistry.MeasureRegistry().CreateMeasure(ctx, req.GetMeasure())
+ if err != nil {
return nil, err
}
- return &databasev1.MeasureRegistryServiceCreateResponse{}, nil
+ return &databasev1.MeasureRegistryServiceCreateResponse{
+ ModRevision: modRevision,
+ }, nil
}
func (rs *measureRegistryServer) Update(ctx context.Context, req
*databasev1.MeasureRegistryServiceUpdateRequest) (
*databasev1.MeasureRegistryServiceUpdateResponse, error,
) {
- if err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx,
req.GetMeasure()); err != nil {
+ modRevision, err :=
rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure())
+ if err != nil {
return nil, err
}
- return &databasev1.MeasureRegistryServiceUpdateResponse{}, nil
+ return &databasev1.MeasureRegistryServiceUpdateResponse{
+ ModRevision: modRevision,
+ }, nil
}
func (rs *measureRegistryServer) Delete(ctx context.Context, req
*databasev1.MeasureRegistryServiceDeleteRequest) (
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index e4cc1ed1..d188f975 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -28,6 +28,8 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/accesslog"
@@ -56,8 +58,8 @@ func (s *streamService) activeIngestionAccessLog(root string)
(err error) {
}
func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error
{
- reply := func(stream streamv1.StreamService_WriteServer, logger
*logger.Logger) {
- if errResp := stream.Send(&streamv1.WriteResponse{}); errResp
!= nil {
+ reply := func(metadata *commonv1.Metadata, status modelv1.Status,
messageId uint64, stream streamv1.StreamService_WriteServer, logger
*logger.Logger) {
+ if errResp := stream.Send(&streamv1.WriteResponse{Metadata:
metadata, Status: status, MessageId: messageId}); errResp != nil {
logger.Err(errResp).Msg("failed to send response")
}
}
@@ -76,18 +78,28 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
}
if err != nil {
s.sampled.Error().Stringer("written",
writeEntity).Err(err).Msg("failed to receive message")
- reply(stream, s.sampled)
- continue
+ return err
}
if errTime :=
timestamp.CheckPb(writeEntity.GetElement().Timestamp); errTime != nil {
s.sampled.Error().Stringer("written",
writeEntity).Err(errTime).Msg("the element time is invalid")
- reply(stream, s.sampled)
+ reply(nil, modelv1.Status_STATUS_INVALID_TIMESTAMP,
writeEntity.GetMessageId(), stream, s.sampled)
+ continue
+ }
+ streamCache, existed :=
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+ if !existed {
+ s.sampled.Error().Err(err).Stringer("written",
writeEntity).Msg("failed to stream schema not found")
+ reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_NOT_FOUND, writeEntity.GetMessageId(), stream, s.sampled)
+ continue
+ }
+ if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
+ s.sampled.Error().Stringer("written",
writeEntity).Msg("the stream schema is expired")
+ reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream,
s.sampled)
continue
}
entity, tagValues, shardID, err :=
s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
if err != nil {
s.sampled.Error().Err(err).RawJSON("written",
logger.Proto(writeEntity)).Msg("failed to navigate to the write target")
- reply(stream, s.sampled)
+ reply(nil, modelv1.Status_STATUS_INTERNAL_ERROR,
writeEntity.GetMessageId(), stream, s.sampled)
continue
}
if s.ingestionAccessLog != nil {
@@ -108,7 +120,7 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
if errWritePub != nil {
s.sampled.Error().Err(errWritePub).RawJSON("written",
logger.Proto(writeEntity)).Msg("failed to send a message")
}
- reply(stream, s.sampled)
+ reply(nil, modelv1.Status_STATUS_SUCCEED,
writeEntity.GetMessageId(), stream, s.sampled)
}
}
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index fe516512..15780b54 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -182,7 +182,8 @@ func (t *topNStreamingProcessor) writeData(eventTime
time.Time, timeBucket strin
measureID := group + "_" + strconv.Itoa(rankNum) + "_" + timeBucket
iwr := &measurev1.InternalWriteRequest{
Request: &measurev1.WriteRequest{
- Metadata: t.topNSchema.GetMetadata(),
+ MessageId: uint64(time.Now().UnixNano()),
+ Metadata: t.topNSchema.GetMetadata(),
DataPoint: &measurev1.DataPointValue{
Timestamp: timestamppb.New(eventTime),
TagFamilies: []*modelv1.TagFamilyForWrite{
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 3235b5c6..5384480f 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -136,6 +136,7 @@ func (s *measure) write(shardID common.ShardID, entity
[]byte, entityValues tsdb
Request: &measurev1.WriteRequest{
Metadata: s.GetSchema().Metadata,
DataPoint: value,
+ MessageId: uint64(time.Now().UnixNano()),
},
EntityValues: entityValues[1:],
})
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 26a6b648..053abee3 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -219,7 +219,7 @@ func createOrUpdateTopNMeasure(ctx context.Context,
measureSchemaRegistry schema
Fields: []*databasev1.FieldSpec{TopNValueFieldSpec},
}
if oldTopNSchema == nil {
- if innerErr := measureSchemaRegistry.CreateMeasure(ctx,
newTopNMeasure); innerErr != nil {
+ if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx,
newTopNMeasure); innerErr != nil {
return nil, innerErr
}
return newTopNMeasure, nil
@@ -233,7 +233,7 @@ func createOrUpdateTopNMeasure(ctx context.Context,
measureSchemaRegistry schema
return oldTopNSchema, nil
}
// update
- if err = measureSchemaRegistry.UpdateMeasure(ctx, newTopNMeasure); err
!= nil {
+ if _, err = measureSchemaRegistry.UpdateMeasure(ctx, newTopNMeasure);
err != nil {
return nil, err
}
return newTopNMeasure, nil
diff --git a/banyand/measure/metadata_test.go b/banyand/measure/metadata_test.go
index f6cda0f1..808a123d 100644
--- a/banyand/measure/metadata_test.go
+++ b/banyand/measure/metadata_test.go
@@ -122,7 +122,9 @@ var _ = Describe("Metadata", func() {
measureSchema.Entity.TagNames =
measureSchema.Entity.TagNames[1:]
entitySize := len(measureSchema.Entity.TagNames)
-
Expect(svcs.metadataService.MeasureRegistry().UpdateMeasure(context.TODO(),
measureSchema)).Should(Succeed())
+ modRevision, err :=
svcs.metadataService.MeasureRegistry().UpdateMeasure(context.TODO(),
measureSchema)
+ Expect(modRevision).ShouldNot(BeZero())
+ Expect(err).ShouldNot(HaveOccurred())
Eventually(func() bool {
val, err :=
svcs.measure.Measure(&commonv1.Metadata{
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index f53a718c..d06d865e 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -185,88 +185,92 @@ func (e *etcdSchemaRegistry) get(ctx context.Context, key
string, message proto.
// update will first ensure the existence of the entity with the metadata,
// and overwrite the existing value if so.
// Otherwise, it will return ErrGRPCResourceNotFound.
-func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata)
error {
+func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata)
(int64, error) {
if !e.closer.AddRunning() {
- return ErrClosed
+ return 0, ErrClosed
}
defer e.closer.Done()
key, err := metadata.key()
if err != nil {
- return err
+ return 0, err
}
key = e.prependNamespace(key)
getResp, err := e.client.Get(ctx, key)
if err != nil {
- return err
+ return 0, err
}
if getResp.Count > 1 {
- return errUnexpectedNumberOfEntities
+ return 0, errUnexpectedNumberOfEntities
}
val, err := proto.Marshal(metadata.Spec.(proto.Message))
if err != nil {
- return err
+ return 0, err
}
replace := getResp.Count > 0
- if replace {
- existingVal, innerErr := metadata.Kind.Unmarshal(getResp.Kvs[0])
- if innerErr != nil {
- return innerErr
- }
- // directly return if we have the same entity
- if metadata.equal(existingVal) {
- return nil
- }
+ if !replace {
+ return 0, ErrGRPCResourceNotFound
+ }
+ existingVal, innerErr := metadata.Kind.Unmarshal(getResp.Kvs[0])
+ if innerErr != nil {
+ return 0, innerErr
+ }
+ // directly return if we have the same entity
+ if metadata.equal(existingVal) {
+ return 0, nil
+ }
- modRevision := getResp.Kvs[0].ModRevision
- txnResp, txnErr := e.client.Txn(ctx).
- If(clientv3.Compare(clientv3.ModRevision(key), "=",
modRevision)).
- Then(clientv3.OpPut(key, string(val))).
- Commit()
- if txnErr != nil {
- return txnErr
- }
- if !txnResp.Succeeded {
- return errConcurrentModification
- }
- } else {
- return ErrGRPCResourceNotFound
+ modRevision := metadata.ModRevision
+ if modRevision == 0 {
+ modRevision = getResp.Kvs[0].ModRevision
}
- return nil
+ txnResp, txnErr := e.client.Txn(ctx).
+ If(clientv3.Compare(clientv3.ModRevision(key), "=",
modRevision)).
+ Then(clientv3.OpPut(key, string(val))).
+ Commit()
+ if txnErr != nil {
+ return 0, txnErr
+ }
+ if !txnResp.Succeeded {
+ return 0, errConcurrentModification
+ }
+
+ return txnResp.Responses[0].GetResponsePut().Header.Revision, nil
}
// create will first check existence of the entity with the metadata,
// and put the value if it does not exist.
// Otherwise, it will return ErrGRPCAlreadyExists.
-func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata)
error {
+func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata)
(int64, error) {
if !e.closer.AddRunning() {
- return ErrClosed
+ return 0, ErrClosed
}
defer e.closer.Done()
key, err := metadata.key()
if err != nil {
- return err
+ return 0, err
}
key = e.prependNamespace(key)
getResp, err := e.client.Get(ctx, key)
if err != nil {
- return err
+ return 0, err
}
if getResp.Count > 1 {
- return errUnexpectedNumberOfEntities
+ return 0, errUnexpectedNumberOfEntities
}
val, err := proto.Marshal(metadata.Spec.(proto.Message))
if err != nil {
- return err
+ return 0, err
}
replace := getResp.Count > 0
if replace {
- return errGRPCAlreadyExists
+ return 0, errGRPCAlreadyExists
}
- _, err = e.client.Put(ctx, key, string(val))
+ putResp, err := e.client.Put(ctx, key, string(val))
if err != nil {
- return err
+ return 0, err
}
- return nil
+
+ return putResp.Header.Revision, nil
}
func (e *etcdSchemaRegistry) listWithPrefix(ctx context.Context, prefix
string, kind Kind) ([]proto.Message, error) {
diff --git a/banyand/metadata/schema/etcd_test.go
b/banyand/metadata/schema/etcd_test.go
index fc98664f..e4fc3913 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -77,7 +77,7 @@ func preloadSchema(e Registry) error {
if err := protojson.Unmarshal([]byte(streamJSON), s); err != nil {
return err
}
- err := e.CreateStream(context.Background(), s)
+ _, err := e.CreateStream(context.Background(), s)
if err != nil {
return err
}
diff --git a/banyand/metadata/schema/group.go b/banyand/metadata/schema/group.go
index 5afa54d2..0ef85e59 100644
--- a/banyand/metadata/schema/group.go
+++ b/banyand/metadata/schema/group.go
@@ -76,23 +76,25 @@ func (e *etcdSchemaRegistry) CreateGroup(ctx
context.Context, group *commonv1.Gr
if group.UpdatedAt != nil {
group.UpdatedAt = timestamppb.Now()
}
- return e.create(ctx, Metadata{
+ _, err := e.create(ctx, Metadata{
TypeMeta: TypeMeta{
Kind: KindGroup,
Name: group.GetMetadata().GetName(),
},
Spec: group,
})
+ return err
}
func (e *etcdSchemaRegistry) UpdateGroup(ctx context.Context, group
*commonv1.Group) error {
- return e.update(ctx, Metadata{
+ _, err := e.update(ctx, Metadata{
TypeMeta: TypeMeta{
Kind: KindGroup,
Name: group.GetMetadata().GetName(),
},
Spec: group,
})
+ return err
}
func formatGroupKey(group string) string {
diff --git a/banyand/metadata/schema/index.go b/banyand/metadata/schema/index.go
index 41ff541b..41b1b284 100644
--- a/banyand/metadata/schema/index.go
+++ b/banyand/metadata/schema/index.go
@@ -59,7 +59,7 @@ func (e *etcdSchemaRegistry) CreateIndexRuleBinding(ctx
context.Context, indexRu
if indexRuleBinding.UpdatedAt != nil {
indexRuleBinding.UpdatedAt = timestamppb.Now()
}
- return e.create(ctx, Metadata{
+ _, err := e.create(ctx, Metadata{
TypeMeta: TypeMeta{
Kind: KindIndexRuleBinding,
Name: indexRuleBinding.GetMetadata().GetName(),
@@ -67,10 +67,11 @@ func (e *etcdSchemaRegistry) CreateIndexRuleBinding(ctx
context.Context, indexRu
},
Spec: indexRuleBinding,
})
+ return err
}
func (e *etcdSchemaRegistry) UpdateIndexRuleBinding(ctx context.Context,
indexRuleBinding *databasev1.IndexRuleBinding) error {
- return e.update(ctx, Metadata{
+ _, err := e.update(ctx, Metadata{
TypeMeta: TypeMeta{
Kind: KindIndexRuleBinding,
Name: indexRuleBinding.GetMetadata().GetName(),
@@ -78,6 +79,7 @@ func (e *etcdSchemaRegistry) UpdateIndexRuleBinding(ctx
context.Context, indexRu
},
Spec: indexRuleBinding,
})
+ return err
}
func (e *etcdSchemaRegistry) DeleteIndexRuleBinding(ctx context.Context,
metadata *commonv1.Metadata) (bool, error) {
@@ -129,7 +131,7 @@ func (e *etcdSchemaRegistry) CreateIndexRule(ctx
context.Context, indexRule *dat
buf = append(buf, indexRule.Metadata.Name...)
indexRule.Metadata.Id = crc32.ChecksumIEEE(buf)
}
- return e.create(ctx, Metadata{
+ _, err := e.create(ctx, Metadata{
TypeMeta: TypeMeta{
Kind: KindIndexRule,
Name: indexRule.GetMetadata().GetName(),
@@ -137,6 +139,7 @@ func (e *etcdSchemaRegistry) CreateIndexRule(ctx
context.Context, indexRule *dat
},
Spec: indexRule,
})
+ return err
}
func (e *etcdSchemaRegistry) UpdateIndexRule(ctx context.Context, indexRule
*databasev1.IndexRule) error {
@@ -147,7 +150,7 @@ func (e *etcdSchemaRegistry) UpdateIndexRule(ctx
context.Context, indexRule *dat
}
indexRule.Metadata.Id = existingIndexRule.Metadata.Id
}
- return e.update(ctx, Metadata{
+ _, err := e.update(ctx, Metadata{
TypeMeta: TypeMeta{
Kind: KindIndexRule,
Name: indexRule.GetMetadata().GetName(),
@@ -155,6 +158,7 @@ func (e *etcdSchemaRegistry) UpdateIndexRule(ctx
context.Context, indexRule *dat
},
Spec: indexRule,
})
+ return err
}
func (e *etcdSchemaRegistry) DeleteIndexRule(ctx context.Context, metadata
*commonv1.Metadata) (bool, error) {
diff --git a/banyand/metadata/schema/measure.go
b/banyand/metadata/schema/measure.go
index 5b0b6f50..a17f8fcd 100644
--- a/banyand/metadata/schema/measure.go
+++ b/banyand/metadata/schema/measure.go
@@ -56,13 +56,13 @@ func (e *etcdSchemaRegistry) ListMeasure(ctx
context.Context, opt ListOpt) ([]*d
return entities, nil
}
-func (e *etcdSchemaRegistry) CreateMeasure(ctx context.Context, measure
*databasev1.Measure) error {
+func (e *etcdSchemaRegistry) CreateMeasure(ctx context.Context, measure
*databasev1.Measure) (int64, error) {
if measure.UpdatedAt != nil {
measure.UpdatedAt = timestamppb.Now()
}
if measure.GetInterval() != "" {
if _, err := timestamp.ParseDuration(measure.GetInterval());
err != nil {
- return errors.Wrap(err, "interval is malformed")
+ return 0, errors.Wrap(err, "interval is malformed")
}
}
return e.create(ctx, Metadata{
@@ -75,17 +75,18 @@ func (e *etcdSchemaRegistry) CreateMeasure(ctx
context.Context, measure *databas
})
}
-func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure
*databasev1.Measure) error {
+func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure
*databasev1.Measure) (int64, error) {
if measure.GetInterval() != "" {
if _, err := timestamp.ParseDuration(measure.GetInterval());
err != nil {
- return errors.Wrap(err, "interval is malformed")
+ return 0, errors.Wrap(err, "interval is malformed")
}
}
return e.update(ctx, Metadata{
TypeMeta: TypeMeta{
- Kind: KindMeasure,
- Group: measure.GetMetadata().GetGroup(),
- Name: measure.GetMetadata().GetName(),
+ Kind: KindMeasure,
+ Group: measure.GetMetadata().GetGroup(),
+ Name: measure.GetMetadata().GetName(),
+ ModRevision: measure.GetMetadata().GetModRevision(),
},
Spec: measure,
})
diff --git a/banyand/metadata/schema/property.go
b/banyand/metadata/schema/property.go
index bb76f0a8..1456199a 100644
--- a/banyand/metadata/schema/property.go
+++ b/banyand/metadata/schema/property.go
@@ -101,7 +101,7 @@ func (e *etcdSchemaRegistry) ApplyProperty(ctx
context.Context, property *proper
Spec: property,
}
tagsNum := uint32(len(property.Tags))
- err := e.create(ctx, md)
+ _, err := e.create(ctx, md)
if err == nil {
return true, tagsNum, nil
}
@@ -125,7 +125,7 @@ func (e *etcdSchemaRegistry) ApplyProperty(ctx
context.Context, property *proper
existed.Tags = append(existed.Tags, property.Tags...)
md.Spec = existed
}
- if err = e.update(ctx, md); err != nil {
+ if _, err = e.update(ctx, md); err != nil {
return false, 0, err
}
return false, tagsNum, nil
diff --git a/banyand/metadata/schema/schema.go
b/banyand/metadata/schema/schema.go
index d5dc0185..6806b1a5 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -60,9 +60,10 @@ type Registry interface {
// TypeMeta defines the identity and type of an Event.
type TypeMeta struct {
- Name string
- Group string
- Kind Kind
+ Name string
+ Group string
+ ModRevision int64
+ Kind Kind
}
// Metadata wrap dedicated serialized resource and its TypeMeta.
@@ -136,8 +137,8 @@ func (m Metadata) equal(other Metadata) bool {
type Stream interface {
GetStream(ctx context.Context, metadata *commonv1.Metadata)
(*databasev1.Stream, error)
ListStream(ctx context.Context, opt ListOpt) ([]*databasev1.Stream,
error)
- CreateStream(ctx context.Context, stream *databasev1.Stream) error
- UpdateStream(ctx context.Context, stream *databasev1.Stream) error
+ CreateStream(ctx context.Context, stream *databasev1.Stream) (int64,
error)
+ UpdateStream(ctx context.Context, stream *databasev1.Stream) (int64,
error)
DeleteStream(ctx context.Context, metadata *commonv1.Metadata) (bool,
error)
}
@@ -163,8 +164,8 @@ type IndexRuleBinding interface {
type Measure interface {
GetMeasure(ctx context.Context, metadata *commonv1.Metadata)
(*databasev1.Measure, error)
ListMeasure(ctx context.Context, opt ListOpt) ([]*databasev1.Measure,
error)
- CreateMeasure(ctx context.Context, measure *databasev1.Measure) error
- UpdateMeasure(ctx context.Context, measure *databasev1.Measure) error
+ CreateMeasure(ctx context.Context, measure *databasev1.Measure) (int64,
error)
+ UpdateMeasure(ctx context.Context, measure *databasev1.Measure) (int64,
error)
DeleteMeasure(ctx context.Context, metadata *commonv1.Metadata) (bool,
error)
TopNAggregations(ctx context.Context, metadata *commonv1.Metadata)
([]*databasev1.TopNAggregation, error)
}
diff --git a/banyand/metadata/schema/shard.go b/banyand/metadata/schema/shard.go
index 8f1c7a42..c6499931 100644
--- a/banyand/metadata/schema/shard.go
+++ b/banyand/metadata/schema/shard.go
@@ -41,14 +41,15 @@ func (e *etcdSchemaRegistry) CreateOrUpdateShard(ctx
context.Context, shard *dat
},
Spec: shard,
}
- err := e.update(ctx, md)
+ _, err := e.update(ctx, md)
if err == nil {
return nil
}
if errors.Is(err, ErrGRPCResourceNotFound) {
shard.CreatedAt = shard.UpdatedAt
md.Spec = shard
- return e.create(ctx, md)
+ _, err = e.create(ctx, md)
+ return err
}
return err
}
diff --git a/banyand/metadata/schema/stream.go
b/banyand/metadata/schema/stream.go
index d2e2491f..645a635d 100644
--- a/banyand/metadata/schema/stream.go
+++ b/banyand/metadata/schema/stream.go
@@ -51,25 +51,26 @@ func (e *etcdSchemaRegistry) ListStream(ctx
context.Context, opt ListOpt) ([]*da
return entities, nil
}
-func (e *etcdSchemaRegistry) UpdateStream(ctx context.Context, stream
*databasev1.Stream) error {
+func (e *etcdSchemaRegistry) UpdateStream(ctx context.Context, stream
*databasev1.Stream) (int64, error) {
return e.update(ctx, Metadata{
TypeMeta: TypeMeta{
- Kind: KindStream,
- Group: stream.GetMetadata().GetGroup(),
- Name: stream.GetMetadata().GetName(),
+ Kind: KindStream,
+ Group: stream.GetMetadata().GetGroup(),
+ Name: stream.GetMetadata().GetName(),
+ ModRevision: stream.GetMetadata().GetModRevision(),
},
Spec: stream,
})
}
-func (e *etcdSchemaRegistry) CreateStream(ctx context.Context, stream
*databasev1.Stream) error {
+func (e *etcdSchemaRegistry) CreateStream(ctx context.Context, stream
*databasev1.Stream) (int64, error) {
if stream.UpdatedAt != nil {
stream.UpdatedAt = timestamppb.Now()
}
group := stream.Metadata.GetGroup()
_, err := e.GetGroup(ctx, group)
if err != nil {
- return err
+ return 0, err
}
return e.create(ctx, Metadata{
TypeMeta: TypeMeta{
diff --git a/banyand/metadata/schema/topn.go b/banyand/metadata/schema/topn.go
index 858fdf04..60597f1a 100644
--- a/banyand/metadata/schema/topn.go
+++ b/banyand/metadata/schema/topn.go
@@ -55,7 +55,7 @@ func (e *etcdSchemaRegistry) CreateTopNAggregation(ctx
context.Context, topNAggr
if topNAggregation.UpdatedAt != nil {
topNAggregation.UpdatedAt = timestamppb.Now()
}
- return e.create(ctx, Metadata{
+ _, err := e.create(ctx, Metadata{
TypeMeta: TypeMeta{
Kind: KindTopNAggregation,
Group: topNAggregation.GetMetadata().GetGroup(),
@@ -63,10 +63,11 @@ func (e *etcdSchemaRegistry) CreateTopNAggregation(ctx
context.Context, topNAggr
},
Spec: topNAggregation,
})
+ return err
}
func (e *etcdSchemaRegistry) UpdateTopNAggregation(ctx context.Context,
topNAggregation *databasev1.TopNAggregation) error {
- return e.update(ctx, Metadata{
+ _, err := e.update(ctx, Metadata{
TypeMeta: TypeMeta{
Kind: KindTopNAggregation,
Group: topNAggregation.GetMetadata().GetGroup(),
@@ -74,6 +75,7 @@ func (e *etcdSchemaRegistry) UpdateTopNAggregation(ctx
context.Context, topNAggr
},
Spec: topNAggregation,
})
+ return err
}
func (e *etcdSchemaRegistry) DeleteTopNAggregation(ctx context.Context,
metadata *commonv1.Metadata) (bool, error) {
diff --git a/banyand/metadata/schema/watcher_test.go
b/banyand/metadata/schema/watcher_test.go
index a796c4bb..cc5b2f45 100644
--- a/banyand/metadata/schema/watcher_test.go
+++ b/banyand/metadata/schema/watcher_test.go
@@ -118,13 +118,15 @@ var _ = ginkgo.Describe("Watcher", func() {
},
})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ var modRevision int64
for i := 0; i < 2; i++ {
- err = registry.CreateMeasure(context.Background(),
&databasev1.Measure{
+ modRevision, err =
registry.CreateMeasure(context.Background(), &databasev1.Measure{
Metadata: &commonv1.Metadata{
Name: fmt.Sprintf("testkey%d", i+1),
Group: "testgroup-measure",
},
})
+ gomega.Expect(modRevision).ShouldNot(gomega.BeZero())
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}
@@ -156,12 +158,14 @@ var _ = ginkgo.Describe("Watcher", func() {
},
})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
- err = registry.CreateStream(context.Background(),
&databasev1.Stream{
+ var modRevision int64
+ modRevision, err = registry.CreateStream(context.Background(),
&databasev1.Stream{
Metadata: &commonv1.Metadata{
Name: "testkey",
Group: "testgroup-stream",
},
})
+ gomega.Expect(modRevision).ShouldNot(gomega.BeZero())
gomega.Expect(err).NotTo(gomega.HaveOccurred())
ok, err := registry.DeleteStream(context.Background(),
&commonv1.Metadata{
Name: "testkey",
diff --git a/banyand/stream/metadata_test.go b/banyand/stream/metadata_test.go
index 8c4f3bfb..f0133ea5 100644
--- a/banyand/stream/metadata_test.go
+++ b/banyand/stream/metadata_test.go
@@ -122,7 +122,9 @@ var _ = Describe("Metadata", func() {
streamSchema.Entity.TagNames =
streamSchema.Entity.TagNames[1:]
entitySize := len(streamSchema.Entity.TagNames)
- Expect(svcs.metadataService.StreamRegistry().UpdateStream(context.TODO(), streamSchema)).Should(Succeed())
+ modRevision, err := svcs.metadataService.StreamRegistry().UpdateStream(context.TODO(), streamSchema)
+ Expect(modRevision).ShouldNot(BeZero())
+ Expect(err).ShouldNot(HaveOccurred())
Eventually(func() bool {
val, ok :=
svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 96379185..0ac80eac 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -168,6 +168,9 @@
- [TopNRequest](#banyandb-measure-v1-TopNRequest)
- [TopNResponse](#banyandb-measure-v1-TopNResponse)
+- [banyandb/model/v1/write.proto](#banyandb_model_v1_write-proto)
+ - [Status](#banyandb-model-v1-Status)
+
- [banyandb/measure/v1/write.proto](#banyandb_measure_v1_write-proto)
- [DataPointValue](#banyandb-measure-v1-DataPointValue)
- [InternalWriteRequest](#banyandb-measure-v1-InternalWriteRequest)
@@ -1671,6 +1674,11 @@ Type determine the index structure under the hood
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| mod_revision | [int64](#int64) | | |
+
+
@@ -1817,6 +1825,11 @@ Type determine the index structure under the hood
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| mod_revision | [int64](#int64) | | |
+
+
@@ -1842,6 +1855,11 @@ Type determine the index structure under the hood
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| mod_revision | [int64](#int64) | | |
+
+
@@ -1988,6 +2006,11 @@ Type determine the index structure under the hood
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| mod_revision | [int64](#int64) | | |
+
+
@@ -2498,6 +2521,38 @@ TopNResponse is the response for a query to the Query module.
+<a name="banyandb_model_v1_write-proto"></a>
+<p align="right"><a href="#top">Top</a></p>
+
+## banyandb/model/v1/write.proto
+
+
+
+
+
+<a name="banyandb-model-v1-Status"></a>
+
+### Status
+Status is the response status for write
+
+| Name | Number | Description |
+| ---- | ------ | ----------- |
+| STATUS_UNSPECIFIED | 0 | |
+| STATUS_SUCCEED | 1 | |
+| STATUS_INVALID_TIMESTAMP | 2 | |
+| STATUS_NOT_FOUND | 3 | |
+| STATUS_EXPIRED_SCHEMA | 4 | |
+| STATUS_INTERNAL_ERROR | 5 | |
+
+
+
+
+
+
+
+
+
+
<a name="banyandb_measure_v1_write-proto"></a>
<p align="right"><a href="#top">Top</a></p>
@@ -2550,6 +2605,7 @@ WriteRequest is the request contract for write
| ----- | ---- | ----- | ----------- |
| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | the metadata is required. |
| data_point | [DataPointValue](#banyandb-measure-v1-DataPointValue) | | the data_point is required. |
+| message_id | [uint64](#uint64) | | the message_id is required. |
@@ -2562,6 +2618,13 @@ WriteRequest is the request contract for write
WriteResponse is the response contract for write
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| message_id | [uint64](#uint64) | | the message_id from request. |
+| status | [banyandb.model.v1.Status](#banyandb-model-v1-Status) | | status indicates the request processing result |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | the metadata from request when request fails |
+
+
@@ -2942,8 +3005,9 @@ QueryResponse is the response for a query to the Query module.
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
-| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | the metadata is only required in the first write. |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | the metadata is required. |
| element | [ElementValue](#banyandb-stream-v1-ElementValue) | | the element is required. |
+| message_id | [uint64](#uint64) | | the message_id is required. |
@@ -2956,6 +3020,13 @@ QueryResponse is the response for a query to the Query module.
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| message_id | [uint64](#uint64) | | the message_id from request. |
+| status | [banyandb.model.v1.Status](#banyandb-model-v1-Status) | | status indicates the request processing result |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | the metadata from request when request fails |
+
+
diff --git a/pkg/partition/entity.go b/pkg/partition/entity.go
index f5a86100..22433fc6 100644
--- a/pkg/partition/entity.go
+++ b/pkg/partition/entity.go
@@ -33,7 +33,10 @@ import (
var ErrMalformedElement = errors.New("element is malformed")
// EntityLocator combines several TagLocators that help find the entity value.
-type EntityLocator []TagLocator
+type EntityLocator struct {
+ TagLocators []TagLocator
+ ModRevision int64
+}
// TagLocator contains offsets to retrieve a tag swiftly.
type TagLocator struct {
@@ -42,22 +45,22 @@ type TagLocator struct {
}
// NewEntityLocator return a EntityLocator based on tag family spec and entity
spec.
-func NewEntityLocator(families []*databasev1.TagFamilySpec, entity
*databasev1.Entity) EntityLocator {
- locator := make(EntityLocator, 0, len(entity.GetTagNames()))
+func NewEntityLocator(families []*databasev1.TagFamilySpec, entity
*databasev1.Entity, modRevision int64) EntityLocator {
+ locator := make([]TagLocator, 0, len(entity.GetTagNames()))
for _, tagInEntity := range entity.GetTagNames() {
fIndex, tIndex, tag := pbv1.FindTagByName(families, tagInEntity)
if tag != nil {
locator = append(locator, TagLocator{FamilyOffset:
fIndex, TagOffset: tIndex})
}
}
- return locator
+ return EntityLocator{TagLocators: locator, ModRevision: modRevision}
}
// Find the entity from a tag family, prepend a subject to the entity.
func (e EntityLocator) Find(subject string, value
[]*modelv1.TagFamilyForWrite) (tsdb.Entity, tsdb.EntityValues, error) {
- entityValues := make(tsdb.EntityValues, len(e)+1)
+ entityValues := make(tsdb.EntityValues, len(e.TagLocators)+1)
entityValues[0] = tsdb.StrValue(subject)
- for i, index := range e {
+ for i, index := range e.TagLocators {
tag, err := GetTagByOffset(value, index.FamilyOffset,
index.TagOffset)
if err != nil {
return nil, nil, err
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index 4297c296..9d197df4 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -39,7 +39,6 @@ var (
errUnsupportedConditionValue = errors.New("unsupported condition value
type")
errInvalidCriteriaType = errors.New("invalid criteria type")
errIndexNotDefined = errors.New("index is not define for the
tag")
- errInvalidData = errors.New("data is invalid")
nullTag = &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
)
@@ -60,22 +59,30 @@ func ProjectItem(ec executor.ExecutionContext, item
tsdb.Item, projectionFieldRe
if len(refs) == 0 {
continue
}
- tags := make([]*modelv1.Tag, len(refs))
familyName := refs[0].Tag.getFamilyName()
parsedTagFamily, err := ec.ParseTagFamily(familyName, item)
if err != nil {
return nil, errors.WithMessage(err, "parse projection")
}
- if len(refs) > len(parsedTagFamily.Tags) {
- return nil, errors.Wrapf(errInvalidData,
- "the number of tags %d in %s is less then
expected %d",
- len(parsedTagFamily.Tags), familyName,
len(refs))
+
+ parsedTagSize := len(parsedTagFamily.GetTags())
+ tagRefSize := len(refs)
+
+ // Determine maximum size for creating the tags slice
+ maxSize := tagRefSize
+ if parsedTagSize < tagRefSize {
+ maxSize = parsedTagSize
}
+
+ tags := make([]*modelv1.Tag, maxSize)
+
for j, ref := range refs {
- if len(parsedTagFamily.GetTags()) > ref.Spec.TagIdx {
+ if parsedTagSize > ref.Spec.TagIdx {
tags[j] =
parsedTagFamily.GetTags()[ref.Spec.TagIdx]
- } else {
+ } else if j < parsedTagSize {
tags[j] = &modelv1.Tag{Key: ref.Tag.name,
Value: nullTag}
+ } else {
+ break
}
}
diff --git a/pkg/test/measure/etcd.go b/pkg/test/measure/etcd.go
index 18e48846..e647cf9f 100644
--- a/pkg/test/measure/etcd.go
+++ b/pkg/test/measure/etcd.go
@@ -51,7 +51,8 @@ func PreloadSchema(ctx context.Context, e schema.Registry)
error {
return errors.WithStack(err)
}
if err := loadSchema(measureDir, &databasev1.Measure{}, func(measure
*databasev1.Measure) error {
- return e.CreateMeasure(ctx, measure)
+ _, innerErr := e.CreateMeasure(ctx, measure)
+ return innerErr
}); err != nil {
return errors.WithStack(err)
}
diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go
index e7557c31..b796ef01 100644
--- a/pkg/test/stream/etcd.go
+++ b/pkg/test/stream/etcd.go
@@ -57,7 +57,7 @@ func PreloadSchema(ctx context.Context, e schema.Registry)
error {
if unmarshalErr := protojson.Unmarshal([]byte(streamJSON), s);
unmarshalErr != nil {
return unmarshalErr
}
- if innerErr := e.CreateStream(ctx, s); innerErr != nil {
+ if _, innerErr := e.CreateStream(ctx, s); innerErr != nil {
return innerErr
}
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index 04316efe..8405152f 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -35,6 +35,7 @@ import (
"sigs.k8s.io/yaml"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
@@ -105,7 +106,7 @@ func loadData(md *commonv1.Metadata, measure
measurev1.MeasureService_WriteClien
dataPointValue := &measurev1.DataPointValue{}
gm.Expect(protojson.Unmarshal(rawDataPointValue,
dataPointValue)).ShouldNot(gm.HaveOccurred())
dataPointValue.Timestamp =
timestamppb.New(baseTime.Add(-time.Duration(len(templates)-i-1) * interval))
- gm.Expect(measure.Send(&measurev1.WriteRequest{Metadata: md,
DataPoint: dataPointValue})).
+ gm.Expect(measure.Send(&measurev1.WriteRequest{Metadata: md,
DataPoint: dataPointValue, MessageId: uint64(time.Now().UnixNano())})).
Should(gm.Succeed())
}
}
@@ -114,14 +115,21 @@ func loadData(md *commonv1.Metadata, measure
measurev1.MeasureService_WriteClien
func Write(conn *grpclib.ClientConn, name, group, dataFile string,
baseTime time.Time, interval time.Duration,
) {
+ metadata := &commonv1.Metadata{
+ Name: name,
+ Group: group,
+ }
+
+ schema := databasev1.NewMeasureRegistryServiceClient(conn)
+ resp, err := schema.Get(context.Background(),
&databasev1.MeasureRegistryServiceGetRequest{Metadata: metadata})
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ metadata = resp.GetMeasure().GetMetadata()
+
c := measurev1.NewMeasureServiceClient(conn)
ctx := context.Background()
writeClient, err := c.Write(ctx)
gm.Expect(err).NotTo(gm.HaveOccurred())
- loadData(&commonv1.Metadata{
- Name: name,
- Group: group,
- }, writeClient, dataFile, baseTime, interval)
+ loadData(metadata, writeClient, dataFile, baseTime, interval)
gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
gm.Eventually(func() error {
_, err := writeClient.Recv()
diff --git a/test/cases/stream/data/data.go b/test/cases/stream/data/data.go
index cb6cbdee..4e48d18c 100644
--- a/test/cases/stream/data/data.go
+++ b/test/cases/stream/data/data.go
@@ -37,6 +37,7 @@ import (
"sigs.k8s.io/yaml"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
@@ -97,12 +98,13 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext
helpers.SharedContext, args
})
}
-func loadData(stream streamv1.StreamService_WriteClient, dataFile string,
baseTime time.Time, interval time.Duration) {
+func loadData(stream streamv1.StreamService_WriteClient, metadata
*commonv1.Metadata, dataFile string, baseTime time.Time, interval
time.Duration) {
var templates []interface{}
content, err := dataFS.ReadFile("testdata/" + dataFile)
gm.Expect(err).ShouldNot(gm.HaveOccurred())
gm.Expect(json.Unmarshal(content,
&templates)).ShouldNot(gm.HaveOccurred())
bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
+
for i, template := range templates {
rawSearchTagFamily, errMarshal := json.Marshal(template)
gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
@@ -125,11 +127,9 @@ func loadData(stream streamv1.StreamService_WriteClient,
dataFile string, baseTi
}
e.TagFamilies = append(e.TagFamilies, searchTagFamily)
errInner := stream.Send(&streamv1.WriteRequest{
- Metadata: &commonv1.Metadata{
- Name: "sw",
- Group: "default",
- },
- Element: e,
+ Metadata: metadata,
+ Element: e,
+ MessageId: uint64(time.Now().UnixNano()),
})
gm.Expect(errInner).ShouldNot(gm.HaveOccurred())
}
@@ -137,11 +137,20 @@ func loadData(stream streamv1.StreamService_WriteClient,
dataFile string, baseTi
// Write data into the server.
func Write(conn *grpclib.ClientConn, dataFile string, baseTime time.Time,
interval time.Duration) {
+ metadata := &commonv1.Metadata{
+ Name: "sw",
+ Group: "default",
+ }
+ schema := databasev1.NewStreamRegistryServiceClient(conn)
+ resp, err := schema.Get(context.Background(),
&databasev1.StreamRegistryServiceGetRequest{Metadata: metadata})
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ metadata = resp.GetStream().GetMetadata()
+
c := streamv1.NewStreamServiceClient(conn)
ctx := context.Background()
writeClient, err := c.Write(ctx)
gm.Expect(err).NotTo(gm.HaveOccurred())
- loadData(writeClient, dataFile, baseTime, interval)
+ loadData(writeClient, metadata, dataFile, baseTime, interval)
gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
gm.Eventually(func() error {
_, err := writeClient.Recv()
diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml
index adcd5c08..98f0a428 100644
--- a/test/docker/base-compose.yml
+++ b/test/docker/base-compose.yml
@@ -33,7 +33,7 @@ services:
- sw_agent:/skywalking-java-agent
oap:
- image: "ghcr.io/apache/skywalking/oap:${SW_OAP_COMMIT}"
+ image: "ghcr.io/apache/skywalking-banyandb-java-client/oap:${SW_OAP_COMMIT}"
expose:
- 11800
- 12800
@@ -55,7 +55,7 @@ services:
retries: 120
ui:
- image: "ghcr.io/apache/skywalking/ui:${SW_OAP_COMMIT}"
+ image: "ghcr.io/apache/skywalking-banyandb-java-client/ui:${SW_OAP_COMMIT}"
expose:
- 8080
environment:
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 57dc048e..c1d3d0a0 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -25,5 +25,5 @@
SW_KUBERNETES_COMMIT_SHA=e2c61c6774cf377b23516fca6f8a1e119d3191c5
SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
SW_CTL_COMMIT=6b2eb0011e38b630db6af7203db215806bd141ed
-SW_OAP_COMMIT=5ad74eb2619b4cebe57a150ca328b2c1d052cb54
+SW_OAP_COMMIT=bb830ee6d3c992aca128eeaf2f8841d2776faf2e
SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9
diff --git a/test/stress/cases/istio/istio_suite_test.go
b/test/stress/cases/istio/istio_suite_test.go
index 88adad25..a2a2a5b6 100644
--- a/test/stress/cases/istio/istio_suite_test.go
+++ b/test/stress/cases/istio/istio_suite_test.go
@@ -150,6 +150,7 @@ func ReadAndWriteFromFile(filePath string, conn
*grpc.ClientConn) error {
return fmt.Errorf("failed to unmarshal JSON
message: %w", errUnmarshal)
}
+ req.MessageId = uint64(time.Now().UnixNano())
req.DataPoint.Timestamp =
timestamppb.New(adjustTime(req.DataPoint.Timestamp.AsTime()))
// Write the request to the measureService
if errSend := client.Send(&req); errSend != nil {
diff --git a/test/stress/cases/istio/repo.go b/test/stress/cases/istio/repo.go
index 6f5fe011..6865ba60 100644
--- a/test/stress/cases/istio/repo.go
+++ b/test/stress/cases/istio/repo.go
@@ -147,7 +147,8 @@ func (p *preloadService) PreRun(_ context.Context) error {
return errors.WithStack(err)
}
if err := loadSchema(measureDir, &databasev1.Measure{}, func(measure
*databasev1.Measure) error {
- return e.CreateMeasure(context.TODO(), measure)
+ _, innerErr := e.CreateMeasure(context.TODO(), measure)
+ return innerErr
}); err != nil {
return errors.WithStack(err)
}
diff --git a/test/stress/env b/test/stress/env
index 1ff35800..91076f83 100644
--- a/test/stress/env
+++ b/test/stress/env
@@ -26,7 +26,7 @@
SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772
SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b
-SW_OAP_COMMIT=1335a48f1c034abc1fe24f6197ee7acfa3118bf0
+SW_OAP_COMMIT=bb830ee6d3c992aca128eeaf2f8841d2776faf2e
SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a
TARGET=test
diff --git a/test/stress/env.dev b/test/stress/env.dev
index 604ded57..febe0637 100644
--- a/test/stress/env.dev
+++ b/test/stress/env.dev
@@ -25,7 +25,7 @@
SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772
SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b
-SW_OAP_COMMIT=1335a48f1c034abc1fe24f6197ee7acfa3118bf0
+SW_OAP_COMMIT=bb830ee6d3c992aca128eeaf2f8841d2776faf2e
SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a
TARGET=dev
diff --git a/ui/src/components/Editor/index.vue
b/ui/src/components/Editor/index.vue
index 5c4851a3..4f5a4daa 100644
--- a/ui/src/components/Editor/index.vue
+++ b/ui/src/components/Editor/index.vue
@@ -55,6 +55,7 @@ const data = reactive({
form: {
group: route.params.group,
name: route.params.group,
+ modRevision: route.params.modRevision,
interval: 1,
intervalUnit: 'ns'
}
@@ -98,6 +99,7 @@ const options = [
watch(() => route, () => {
data.form.group = route.params.group
data.form.name = route.params.name
+ data.form.modRevision = route.params.modRevision
data.type = route.params.type + ''
data.operator = route.params.operator
initData()
@@ -149,7 +151,8 @@ const submit = async (formEl: FormInstance | undefined) => {
const form = {
metadata: {
group: data.form.group,
- name: data.form.name
+ name: data.form.name,
+ modRevision: data.form.modRevision
},
tagFamilies: tagFamilies,
entity: {
@@ -262,6 +265,7 @@ function initData() {
data.form.intervalUnit = intervalUnit
fieldEditorRef.value.setFields(fields)
}
+ data.form.modRevision = res.data[data.type +
''].metadata.modRevision
}
})
.finally(() => {