This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 4af513a9 Apply integration query test cases to a cluster (#333)
4af513a9 is described below
commit 4af513a9d8ac5895b714787c29f412b7d614c1db
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Sep 19 20:33:07 2023 +0800
Apply integration query test cases to a cluster (#333)
* Use CMD to setup testing service
* Refactor node selecting process
* Apply query test cases to a cluster
---------
Signed-off-by: Gao Hongtao <[email protected]>
---
api/data/data.go | 68 ++++++++
api/data/doc.go | 19 --
banyand/dquery/dquery.go | 14 +-
banyand/dquery/measure.go | 2 +-
banyand/dquery/stream.go | 2 +-
banyand/dquery/topn.go | 53 +++---
banyand/liaison/grpc/discovery.go | 106 ++----------
banyand/liaison/grpc/measure.go | 28 ++-
banyand/liaison/grpc/node.go | 109 ++++++++++++
banyand/liaison/grpc/registry_test.go | 2 +-
banyand/liaison/grpc/server.go | 18 +-
banyand/liaison/grpc/stream.go | 20 ++-
banyand/measure/measure.go | 5 +-
banyand/measure/metadata.go | 13 ++
banyand/measure/tstable.go | 5 +-
banyand/metadata/allocator.go | 78 ---------
banyand/metadata/client.go | 23 +--
banyand/metadata/metadata.go | 1 -
banyand/metadata/schema/checker.go | 7 -
banyand/metadata/schema/kind.go | 9 +-
banyand/metadata/schema/schema.go | 12 --
banyand/metadata/schema/shard.go | 74 --------
banyand/query/processor.go | 4 +-
banyand/query/processor_topn.go | 25 +--
banyand/queue/local.go | 4 +
banyand/queue/pub/client.go | 6 +
banyand/queue/pub/pub.go | 45 +++--
banyand/queue/queue.go | 2 +
banyand/queue/sub/sub.go | 39 ++++-
banyand/stream/metadata.go | 12 ++
banyand/stream/stream.go | 3 +
banyand/stream/tstable.go | 6 +-
banyand/tsdb/buffer.go | 2 +-
bydbctl/internal/cmd/group_test.go | 6 +-
bydbctl/internal/cmd/index_rule_binding_test.go | 3 +-
bydbctl/internal/cmd/index_rule_test.go | 3 +-
bydbctl/internal/cmd/measure_test.go | 6 +-
bydbctl/internal/cmd/property_test.go | 3 +-
bydbctl/internal/cmd/stream_test.go | 6 +-
pkg/bus/bus.go | 5 -
pkg/cmdsetup/data.go | 8 -
pkg/cmdsetup/liaison.go | 7 +-
pkg/cmdsetup/standalone.go | 2 +-
.../logical/measure/measure_plan_distributed.go | 84 ++++++---
.../logical/stream/stream_plan_distributed.go | 54 +++++-
pkg/run/run.go | 3 +
pkg/schema/metadata.go | 52 +++---
pkg/schema/schema.go | 2 +-
pkg/test/setup/setup.go | 192 ++++++++++++++-------
.../query/query_suite_test.go => cases/init.go} | 76 +-------
test/cases/measure/data/input/order_tag_asc.yaml | 2 -
test/cases/measure/data/input/order_tag_desc.yaml | 2 -
test/cases/measure/data/want/order_tag_asc.yaml | 78 ++-------
test/cases/measure/data/want/order_tag_desc.yaml | 78 ++-------
.../query/query_suite_test.go | 84 +++++----
test/integration/load/load_suite_test.go | 2 +-
.../standalone/cold_query/query_suite_test.go | 25 +--
test/integration/standalone/other/measure_test.go | 4 +-
test/integration/standalone/other/property_test.go | 7 +-
test/integration/standalone/other/tls_test.go | 3 +-
.../standalone/query/query_suite_test.go | 26 +--
test/stress/cases/istio/istio_suite_test.go | 2 +-
test/stress/cases/istio/repo.go | 12 +-
63 files changed, 809 insertions(+), 844 deletions(-)
diff --git a/api/data/data.go b/api/data/data.go
new file mode 100644
index 00000000..5da103ef
--- /dev/null
+++ b/api/data/data.go
@@ -0,0 +1,68 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package data contains data transmission topics.
+package data
+
+import (
+ "google.golang.org/protobuf/proto"
+
+ measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+ streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+// TopicMap is the map of topic name to topic.
+var TopicMap = map[string]bus.Topic{
+ TopicStreamWrite.String(): TopicStreamWrite,
+ TopicStreamQuery.String(): TopicStreamQuery,
+ TopicMeasureWrite.String(): TopicMeasureWrite,
+ TopicMeasureQuery.String(): TopicMeasureQuery,
+ TopicTopNQuery.String(): TopicTopNQuery,
+}
+
+// TopicRequestMap is the map of topic name to request message.
+var TopicRequestMap = map[bus.Topic]func() proto.Message{
+ TopicStreamWrite: func() proto.Message {
+ return &streamv1.InternalWriteRequest{}
+ },
+ TopicStreamQuery: func() proto.Message {
+ return &streamv1.QueryRequest{}
+ },
+ TopicMeasureWrite: func() proto.Message {
+ return &measurev1.InternalWriteRequest{}
+ },
+ TopicMeasureQuery: func() proto.Message {
+ return &measurev1.QueryRequest{}
+ },
+ TopicTopNQuery: func() proto.Message {
+ return &measurev1.TopNRequest{}
+ },
+}
+
+// TopicResponseMap is the map of topic name to response message.
+var TopicResponseMap = map[bus.Topic]func() proto.Message{
+ TopicStreamQuery: func() proto.Message {
+ return &streamv1.QueryResponse{}
+ },
+ TopicMeasureQuery: func() proto.Message {
+ return &measurev1.QueryResponse{}
+ },
+ TopicTopNQuery: func() proto.Message {
+ return &measurev1.TopNResponse{}
+ },
+}
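The new TopicRequestMap and TopicResponseMap let the queue layer rebuild a typed protobuf message from the topic carried with each cluster call. A minimal sketch of the response-side lookup, assuming an anypb payload received from a data node (the decodeResponse helper is illustrative, not part of this patch):

package main

import (
	"fmt"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/anypb"

	"github.com/apache/skywalking-banyandb/api/data"
	"github.com/apache/skywalking-banyandb/pkg/bus"
)

// decodeResponse looks up the response type registered for a topic and
// unmarshals the Any payload of a cluster reply into it.
func decodeResponse(topic bus.Topic, body *anypb.Any) (proto.Message, error) {
	supplier, ok := data.TopicResponseMap[topic]
	if !ok {
		return nil, fmt.Errorf("no response type registered for topic %s", topic)
	}
	m := supplier() // e.g. *streamv1.QueryResponse for data.TopicStreamQuery
	if err := body.UnmarshalTo(m); err != nil {
		return nil, err
	}
	return m, nil
}

func main() {
	// pub.go's future.Get() follows the same pattern, using the topic it
	// recorded when the request was published.
	_, _ = decodeResponse(data.TopicStreamQuery, &anypb.Any{})
}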
diff --git a/api/data/doc.go b/api/data/doc.go
deleted file mode 100644
index 25918677..00000000
--- a/api/data/doc.go
+++ /dev/null
@@ -1,19 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Package data contains data transmission topics.
-package data
diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go
index 4d77b6fb..c7e1454d 100644
--- a/banyand/dquery/dquery.go
+++ b/banyand/dquery/dquery.go
@@ -21,9 +21,13 @@ package dquery
import (
"context"
+ "go.uber.org/multierr"
+
+ "github.com/apache/skywalking-banyandb/api/data"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -44,14 +48,16 @@ type queryService struct {
mqp *measureQueryProcessor
tqp *topNQueryProcessor
closer *run.Closer
+ pipeline queue.Server
}
// NewService return a new query service.
-func NewService(metaService metadata.Repo, broadcaster bus.Broadcaster,
+func NewService(metaService metadata.Repo, pipeline queue.Server, broadcaster bus.Broadcaster,
) (run.Unit, error) {
svc := &queryService{
metaService: metaService,
closer: run.NewCloser(1),
+ pipeline: pipeline,
}
svc.sqp = &streamQueryProcessor{
queryService: svc,
@@ -76,7 +82,11 @@ func (q *queryService) PreRun(_ context.Context) error {
q.log = logger.GetLogger(moduleName)
q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log)
q.mqp.measureService = measure.NewPortableRepository(q.metaService, q.log)
- return nil
+ return multierr.Combine(
+ q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
+ q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp),
+ q.pipeline.Subscribe(data.TopicTopNQuery, q.tqp),
+ )
}
func (q *queryService) GracefulStop() {
diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
index b145de94..9dc95d82 100644
--- a/banyand/dquery/measure.go
+++ b/banyand/dquery/measure.go
@@ -95,6 +95,6 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
if e := ml.Debug(); e.Enabled() {
e.RawJSON("ret", logger.Proto(&measurev1.QueryResponse{DataPoints: result})).Msg("got a measure")
}
- resp = bus.NewMessage(bus.MessageID(now), result)
+ resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{DataPoints: result})
return
}
diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go
index 62804d00..6fd129d0 100644
--- a/banyand/dquery/stream.go
+++ b/banyand/dquery/stream.go
@@ -78,7 +78,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
return
}
- resp = bus.NewMessage(bus.MessageID(now), entities)
+ resp = bus.NewMessage(bus.MessageID(now), &streamv1.QueryResponse{Elements: entities})
return
}
diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
index df34d337..a885f829 100644
--- a/banyand/dquery/topn.go
+++ b/banyand/dquery/topn.go
@@ -19,12 +19,13 @@ package dquery
import (
"go.uber.org/multierr"
- "google.golang.org/protobuf/types/known/timestamppb"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/query"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/iter/sort"
@@ -48,6 +49,8 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
if e := t.log.Debug(); e.Enabled() {
e.Stringer("req", request).Msg("received a topN query event")
}
+ agg := request.Agg
+ request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
now := bus.MessageID(request.TimeRange.Begin.Nanos)
ff, err := t.broadcaster.Broadcast(data.TopicTopNQuery, bus.NewMessage(now, request))
if err != nil {
@@ -55,35 +58,41 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
return
}
var allErr error
- var sii []sort.Iterator[*comparableTopNItem]
- var latestTimestamp *timestamppb.Timestamp
+ aggregator := query.CreateTopNPostAggregator(request.GetTopN(),
+ agg, request.GetFieldValueSort())
+ var tags []string
for _, f := range ff {
if m, getErr := f.Get(); getErr != nil {
allErr = multierr.Append(allErr, getErr)
} else {
- tl := m.Data().(*measurev1.TopNList)
- un := tl.Timestamp.AsTime().UnixNano()
- if un > latestTimestamp.AsTime().UnixNano() {
- latestTimestamp = tl.Timestamp
+ d := m.Data()
+ if d == nil {
+ continue
+ }
+ topNResp := d.(*measurev1.TopNResponse)
+ for _, l := range topNResp.Lists {
+ for _, tn := range l.Items {
+ if tags == nil {
+ tags = make([]string, 0, len(tn.Entity))
+ for _, e := range tn.Entity {
+ tags = append(tags, e.Key)
+ }
+ }
+ entityValues := make(tsdb.EntityValues, 0, len(tn.Entity))
+ for _, e := range tn.Entity {
+ entityValues = append(entityValues, e.Value)
+ }
+ _ = aggregator.Put(entityValues, tn.Value.GetInt().GetValue(), uint64(l.Timestamp.AsTime().UnixMilli()))
+ }
}
- sii = append(sii, &sortedTopNList{TopNList: tl})
}
}
- var desc bool
- if request.GetFieldValueSort() == modelv1.Sort_SORT_DESC {
- desc = true
- }
- iter := sort.NewItemIter[*comparableTopNItem](sii, desc)
- defer func() {
- _ = iter.Close()
- }()
- var items []*measurev1.TopNList_Item
- for iter.Next() && len(items) < int(request.TopN) {
- items = append(items, iter.Val().TopNList_Item)
+ if tags == nil {
+ resp = bus.NewMessage(now, &measurev1.TopNResponse{})
+ return
}
- resp = bus.NewMessage(now, &measurev1.TopNList{
- Items: items,
- Timestamp: latestTimestamp,
+ resp = bus.NewMessage(now, &measurev1.TopNResponse{
+ Lists: aggregator.Val(tags),
})
return
}
diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go
index 823183b1..57914fec 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -18,10 +18,8 @@
package grpc
import (
- "context"
"fmt"
"sync"
- "time"
"github.com/pkg/errors"
@@ -31,7 +29,6 @@ import (
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
- "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
@@ -40,97 +37,28 @@ import (
var errNotExist = errors.New("the object doesn't exist")
type discoveryService struct {
- pipeline queue.Client
metadataRepo metadata.Repo
+ nodeRegistry NodeRegistry
shardRepo *shardRepo
entityRepo *entityRepo
log *logger.Logger
kind schema.Kind
}
-func newDiscoveryService(pipeline queue.Client, kind schema.Kind, metadataRepo metadata.Repo) *discoveryService {
+func newDiscoveryService(kind schema.Kind, metadataRepo metadata.Repo, nodeRegistry NodeRegistry) *discoveryService {
sr := &shardRepo{shardEventsMap: make(map[identity]uint32)}
er := &entityRepo{entitiesMap: make(map[identity]partition.EntityLocator)}
return &discoveryService{
shardRepo: sr,
entityRepo: er,
- pipeline: pipeline,
kind: kind,
metadataRepo: metadataRepo,
+ nodeRegistry: nodeRegistry,
}
}
-func (ds *discoveryService) initialize(ctx context.Context) error {
- ctxLocal, cancel := context.WithTimeout(ctx, 5*time.Second)
- groups, err := ds.metadataRepo.GroupRegistry().ListGroup(ctxLocal)
- cancel()
- if err != nil {
- return err
- }
- for _, g := range groups {
- switch ds.kind {
- case schema.KindMeasure:
- case schema.KindStream:
- default:
- continue
- }
- ctxLocal, cancel := context.WithTimeout(ctx, 5*time.Second)
- shards, innerErr := ds.metadataRepo.ShardRegistry().ListShard(ctxLocal, schema.ListOpt{Group: g.Metadata.Name})
- cancel()
- if innerErr != nil {
- return innerErr
- }
- for _, s := range shards {
- ds.shardRepo.OnAddOrUpdate(schema.Metadata{
- TypeMeta: schema.TypeMeta{
- Kind: schema.KindShard,
- Name: s.Metadata.Name,
- Group: s.Metadata.Group,
- },
- Spec: s,
- })
- }
-
- switch ds.kind {
- case schema.KindMeasure:
- ctxLocal, cancel = context.WithTimeout(ctx, 5*time.Second)
- mm, innerErr := ds.metadataRepo.MeasureRegistry().ListMeasure(ctxLocal, schema.ListOpt{Group: g.Metadata.Name})
- cancel()
- if innerErr != nil {
- return innerErr
- }
- for _, m := range mm {
- ds.entityRepo.OnAddOrUpdate(schema.Metadata{
- TypeMeta: schema.TypeMeta{
- Kind: schema.KindMeasure,
- Name: m.Metadata.Name,
- Group: m.Metadata.Group,
- },
- Spec: m,
- })
- }
- case schema.KindStream:
- ctxLocal, cancel = context.WithTimeout(ctx, 5*time.Second)
- ss, innerErr := ds.metadataRepo.StreamRegistry().ListStream(ctxLocal, schema.ListOpt{Group: g.Metadata.Name})
- cancel()
- if innerErr != nil {
- return innerErr
- }
- for _, s := range ss {
- ds.entityRepo.OnAddOrUpdate(schema.Metadata{
- TypeMeta: schema.TypeMeta{
- Kind: schema.KindStream,
- Name: s.Metadata.Name,
- Group: s.Metadata.Group,
- },
- Spec: s,
- })
- }
- default:
- return fmt.Errorf("unsupported kind: %d", ds.kind)
- }
- }
- ds.metadataRepo.RegisterHandler("liaison", schema.KindShard, ds.shardRepo)
+func (ds *discoveryService) initialize() error {
+ ds.metadataRepo.RegisterHandler("liaison", schema.KindGroup, ds.shardRepo)
ds.metadataRepo.RegisterHandler("liaison", ds.kind, ds.entityRepo)
return nil
}
@@ -172,28 +100,32 @@ type shardRepo struct {
sync.RWMutex
}
-// OnAddOrUpdate implements schema.EventHandler.
func (s *shardRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
- if schemaMetadata.Kind != schema.KindShard {
+ if schemaMetadata.Kind != schema.KindGroup {
return
}
- shard := schemaMetadata.Spec.(*databasev1.Shard)
- idx := getID(shard.GetMetadata())
+ group := schemaMetadata.Spec.(*commonv1.Group)
+ if group.ResourceOpts == nil || group.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED {
+ return
+ }
+ idx := getID(group.GetMetadata())
if le := s.log.Debug(); le.Enabled() {
- le.Stringer("id", idx).Uint32("total", shard.Total).Msg("shard added or updated")
+ le.Stringer("id", idx).Uint32("total", group.ResourceOpts.ShardNum).Msg("shard added or updated")
}
s.RWMutex.Lock()
defer s.RWMutex.Unlock()
- s.shardEventsMap[idx] = shard.Total
+ s.shardEventsMap[idx] = group.ResourceOpts.ShardNum
}
-// OnDelete implements schema.EventHandler.
func (s *shardRepo) OnDelete(schemaMetadata schema.Metadata) {
- if schemaMetadata.Kind != schema.KindShard {
+ if schemaMetadata.Kind != schema.KindGroup {
+ return
+ }
+ group := schemaMetadata.Spec.(*commonv1.Group)
+ if group.ResourceOpts == nil || group.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED {
return
}
- shard := schemaMetadata.Spec.(*databasev1.Shard)
- idx := getID(shard.GetMetadata())
+ idx := getID(group.GetMetadata())
if le := s.log.Debug(); le.Enabled() {
le.Stringer("id", idx).Msg("shard deleted")
}
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 90cf7632..2a32d29d 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -31,6 +31,7 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/accesslog"
"github.com/apache/skywalking-banyandb/pkg/bus"
@@ -43,6 +44,8 @@ type measureService struct {
*discoveryService
sampled *logger.Logger
ingestionAccessLog accesslog.Log
+ pipeline queue.Client
+ broadcaster queue.Client
}
func (ms *measureService) setLogger(log *logger.Logger) {
@@ -113,11 +116,18 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
SeriesHash: tsdb.HashEntity(entity),
EntityValues: tagValues.Encode(),
}
- // TODO: set node id
- message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), "todo", iwr)
+ nodeID, errPickNode := ms.nodeRegistry.Locate(writeRequest.GetMetadata().GetGroup(), writeRequest.GetMetadata().GetName(), uint32(shardID))
+ if errPickNode != nil {
+ ms.sampled.Error().Err(errPickNode).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to pick an available node")
+ reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, ms.sampled)
+ continue
+ }
+ message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
_, errWritePub := publisher.Publish(data.TopicMeasureWrite, message)
if errWritePub != nil {
- ms.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to send a message")
+ ms.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Str("nodeID", nodeID).Msg("failed to send a message")
+ reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, ms.sampled)
+ continue
}
reply(nil, modelv1.Status_STATUS_SUCCEED, writeRequest.GetMessageId(), measure, ms.sampled)
}
@@ -130,7 +140,7 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest)
return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", req.GetTimeRange(), err)
}
message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), req)
- feat, errQuery := ms.pipeline.Publish(data.TopicMeasureQuery, message)
+ feat, errQuery := ms.broadcaster.Publish(data.TopicMeasureQuery, message)
if errQuery != nil {
return nil, errQuery
}
@@ -143,8 +153,8 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest)
}
data := msg.Data()
switch d := data.(type) {
- case []*measurev1.DataPoint:
- return &measurev1.QueryResponse{DataPoints: d}, nil
+ case *measurev1.QueryResponse:
+ return d, nil
case common.Error:
return nil, errors.WithMessage(errQueryMsg, d.Msg())
}
@@ -157,7 +167,7 @@ func (ms *measureService) TopN(_ context.Context, topNRequest *measurev1.TopNReq
}
message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), topNRequest)
- feat, errQuery := ms.pipeline.Publish(data.TopicTopNQuery, message)
+ feat, errQuery := ms.broadcaster.Publish(data.TopicTopNQuery, message)
if errQuery != nil {
return nil, errQuery
}
@@ -167,8 +177,8 @@ func (ms *measureService) TopN(_ context.Context, topNRequest *measurev1.TopNReq
}
data := msg.Data()
switch d := data.(type) {
- case []*measurev1.TopNList:
- return &measurev1.TopNResponse{Lists: d}, nil
+ case *measurev1.TopNResponse:
+ return d, nil
case common.Error:
return nil, errors.WithMessage(errQueryMsg, d.Msg())
}
diff --git a/banyand/liaison/grpc/node.go b/banyand/liaison/grpc/node.go
new file mode 100644
index 00000000..73529604
--- /dev/null
+++ b/banyand/liaison/grpc/node.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "sort"
+ "sync"
+
+ "github.com/pkg/errors"
+
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+)
+
+var (
+ _ schema.EventHandler = (*clusterNodeService)(nil)
+ _ NodeRegistry = (*clusterNodeService)(nil)
+)
+
+// NodeRegistry is for locating data node with group/name of the metadata
+// together with the shardID calculated from the incoming data.
+type NodeRegistry interface {
+ Locate(group, name string, shardID uint32) (string, error)
+}
+
+type clusterNodeService struct {
+ metaRepo queue.Client
+ nodes []string
+ nodeMutex sync.RWMutex
+ sync.Once
+}
+
+// NewClusterNodeRegistry creates a cluster node registry.
+func NewClusterNodeRegistry(metaRepo queue.Client) NodeRegistry {
+ nr := &clusterNodeService{
+ metaRepo: metaRepo,
+ }
+ metaRepo.Register(nr)
+ return nr
+}
+
+func (n *clusterNodeService) Locate(_, _ string, shardID uint32) (string, error) {
+ // Use round-robin to select the node.
+ n.nodeMutex.RLock()
+ defer n.nodeMutex.RUnlock()
+ if len(n.nodes) == 0 {
+ return "", errors.New("no node available")
+ }
+ return n.nodes[shardID%uint32(len(n.nodes))], nil
+}
+
+func (n *clusterNodeService) OnAddOrUpdate(metadata schema.Metadata) {
+ switch metadata.Kind {
+ case schema.KindNode:
+ n.nodeMutex.Lock()
+ defer n.nodeMutex.Unlock()
+ for _, node := range n.nodes {
+ if node == metadata.Spec.(*databasev1.Node).Metadata.Name {
+ return
+ }
+ }
+ n.nodes = append(n.nodes, metadata.Spec.(*databasev1.Node).Metadata.Name)
+ sort.Strings(n.nodes)
+ default:
+ }
+}
+
+func (n *clusterNodeService) OnDelete(metadata schema.Metadata) {
+ switch metadata.Kind {
+ case schema.KindNode:
+ n.nodeMutex.Lock()
+ defer n.nodeMutex.Unlock()
+ for i, node := range n.nodes {
+ if node == metadata.Spec.(*databasev1.Node).Metadata.Name {
+ n.nodes = append(n.nodes[:i], n.nodes[i+1:]...)
+ break
+ }
+ }
+ default:
+ }
+}
+
+type localNodeService struct{}
+
+// NewLocalNodeRegistry creates a local(fake) node registry.
+func NewLocalNodeRegistry() NodeRegistry {
+ return localNodeService{}
+}
+
+// Locate of localNodeService always returns local.
+func (localNodeService) Locate(_, _ string, _ uint32) (string, error) {
+ return "local", nil
+}
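The cluster registry above keeps a sorted slice of data-node names and answers Locate with nodes[shardID % len(nodes)], so a shard keeps hitting the same node while membership is stable. A small usage sketch, assuming the standalone (local) registry; the group and measure names are placeholders:

package main

import (
	"fmt"

	"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
)

func main() {
	// NewLocalNodeRegistry always answers "local"; the registry returned by
	// NewClusterNodeRegistry would answer one of the registered data nodes,
	// chosen by shardID modulo the node count.
	registry := grpc.NewLocalNodeRegistry()
	nodeID, err := registry.Locate("default", "service_cpm_minute", 3)
	if err != nil {
		// returned only when no data node has registered yet (cluster mode)
		panic(err)
	}
	fmt.Println(nodeID) // prints "local"
}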
diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go
index 6c86d780..d7118b40 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -178,7 +178,7 @@ func setupForRegistry() func() {
metaSvc, err := metadata.NewService(context.TODO())
Expect(err).NotTo(HaveOccurred())
- tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc)
+ tcp := grpc.NewServer(context.TODO(), pipeline, pipeline, metaSvc, grpc.NewLocalNodeRegistry())
preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc}
var flags []string
metaPath, metaDeferFunc, err := test.NewSpace()
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index cc7e0817..102bd170 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -64,8 +64,7 @@ type Server interface {
}
type server struct {
- pipeline queue.Client
- creds credentials.TransportCredentials
+ creds credentials.TransportCredentials
*streamRegistryServer
log *logger.Logger
*indexRuleBindingRegistryServer
@@ -91,15 +90,18 @@ type server struct {
}
// NewServer returns a new gRPC server.
-func NewServer(_ context.Context, pipeline queue.Client, schemaRegistry metadata.Repo) Server {
+func NewServer(_ context.Context, pipeline, broadcaster queue.Client, schemaRegistry metadata.Repo, nodeRegistry NodeRegistry) Server {
streamSVC := &streamService{
- discoveryService: newDiscoveryService(pipeline, schema.KindStream, schemaRegistry),
+ discoveryService: newDiscoveryService(schema.KindStream, schemaRegistry, nodeRegistry),
+ pipeline: pipeline,
+ broadcaster: broadcaster,
}
measureSVC := &measureService{
- discoveryService: newDiscoveryService(pipeline, schema.KindMeasure, schemaRegistry),
+ discoveryService: newDiscoveryService(schema.KindMeasure, schemaRegistry, nodeRegistry),
+ pipeline: pipeline,
+ broadcaster: broadcaster,
}
s := &server{
- pipeline: pipeline,
streamSVC: streamSVC,
measureSVC: measureSVC,
streamRegistryServer: &streamRegistryServer{
@@ -128,7 +130,7 @@ func NewServer(_ context.Context, pipeline queue.Client, schemaRegistry metadata
return s
}
-func (s *server) PreRun(ctx context.Context) error {
+func (s *server) PreRun(_ context.Context) error {
s.log = logger.GetLogger("liaison-grpc")
s.streamSVC.setLogger(s.log)
s.measureSVC.setLogger(s.log)
@@ -138,7 +140,7 @@ func (s *server) PreRun(ctx context.Context) error {
}
for _, c := range components {
c.SetLogger(s.log)
- if err := c.initialize(ctx); err != nil {
+ if err := c.initialize(); err != nil {
return err
}
}
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 0fbee9a4..aa3d49a4 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -31,6 +31,7 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/accesslog"
"github.com/apache/skywalking-banyandb/pkg/bus"
@@ -43,6 +44,8 @@ type streamService struct {
*discoveryService
sampled *logger.Logger
ingestionAccessLog accesslog.Log
+ pipeline queue.Client
+ broadcaster queue.Client
}
func (s *streamService) setLogger(log *logger.Logger) {
@@ -115,10 +118,17 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
if s.log.Debug().Enabled() {
iwr.EntityValues = tagValues.Encode()
}
- message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), "TODO", iwr)
+ nodeID, errPickNode := s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(), writeEntity.GetMetadata().GetName(), uint32(shardID))
+ if errPickNode != nil {
+ s.sampled.Error().Err(errPickNode).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to pick an available node")
+ reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.sampled)
+ continue
+ }
+ message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
_, errWritePub := publisher.Publish(data.TopicStreamWrite, message)
if errWritePub != nil {
- s.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to send a message")
+ s.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Str("nodeID", nodeID).Msg("failed to send a message")
+ reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.sampled)
}
reply(nil, modelv1.Status_STATUS_SUCCEED, writeEntity.GetMessageId(), stream, s.sampled)
}
@@ -135,7 +145,7 @@ func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (*s
return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", req.GetTimeRange(), err)
}
message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), req)
- feat, errQuery := s.pipeline.Publish(data.TopicStreamQuery, message)
+ feat, errQuery := s.broadcaster.Publish(data.TopicStreamQuery, message)
if errQuery != nil {
if errors.Is(errQuery, io.EOF) {
return emptyStreamQueryResponse, nil
@@ -148,8 +158,8 @@ func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (*s
}
data := msg.Data()
switch d := data.(type) {
- case []*streamv1.Element:
- return &streamv1.QueryResponse{Elements: d}, nil
+ case *streamv1.QueryResponse:
+ return d, nil
case common.Error:
return nil, errors.WithMessage(errQueryMsg, d.Msg())
}
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index b8a55140..e5f28956 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -114,9 +114,12 @@ func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.
if err := m.parseSpec(); err != nil {
return nil, err
}
- ctx := context.WithValue(context.Background(), logger.ContextKey, l)
+ if db == nil {
+ return m, nil
+ }
m.databaseSupplier = db
+ ctx := context.WithValue(context.Background(), logger.ContextKey, l)
m.indexWriter = index.NewWriter(ctx, index.WriterOptions{
DB: db,
ShardNum: shardNum,
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 053abee3..27dfa090 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -405,3 +405,16 @@ func (s *portableSupplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema
defer cancel()
return s.metadata.MeasureRegistry().GetMeasure(ctx, md)
}
+
+func (*portableSupplier) OpenDB(_ *commonv1.Group) (tsdb.Database, error) {
+ panic("do not support open db")
+}
+
+func (s *portableSupplier) OpenResource(shardNum uint32, _ tsdb.Supplier, spec resourceSchema.Resource) (io.Closer, error) {
+ measureSchema := spec.Schema().(*databasev1.Measure)
+ return openMeasure(shardNum, nil, measureSpec{
+ schema: measureSchema,
+ indexRules: spec.IndexRules(),
+ topNAggregations: spec.TopN(),
+ }, s.l, nil)
+}
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 462d5066..a52b2a78 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -60,7 +60,7 @@ type tsTable struct {
path string
bufferSize int64
encoderBufferSize int64
- lock sync.Mutex
+ lock sync.RWMutex
}
func (t *tsTable) SizeOnDisk() int64 {
@@ -154,9 +154,12 @@ func (t *tsTable) Get(key []byte, ts time.Time) ([]byte, error) {
}
func (t *tsTable) Put(key []byte, val []byte, ts time.Time) error {
+ t.lock.RLock()
if t.encoderBuffer != nil {
+ defer t.lock.RUnlock()
return t.writeToBuffer(key, val, ts)
}
+ t.lock.RUnlock()
if err := t.openBuffer(); err != nil {
return err
}
diff --git a/banyand/metadata/allocator.go b/banyand/metadata/allocator.go
deleted file mode 100644
index b85470d3..00000000
--- a/banyand/metadata/allocator.go
+++ /dev/null
@@ -1,78 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package metadata
-
-import (
- "context"
- "time"
-
- commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
- databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
- "github.com/apache/skywalking-banyandb/pkg/logger"
-)
-
-var _ schema.EventHandler = (*allocator)(nil)
-
-type allocator struct {
- schemaRegistry schema.Registry
- l *logger.Logger
-}
-
-func newAllocator(schemaRegistry schema.Registry, logger *logger.Logger) *allocator {
- return &allocator{
- schemaRegistry: schemaRegistry,
- l: logger,
- }
-}
-
-// OnAddOrUpdate implements EventHandler.
-func (a *allocator) OnAddOrUpdate(metadata schema.Metadata) {
- switch metadata.Kind {
- case schema.KindGroup:
- groupSchema := metadata.Spec.(*commonv1.Group)
- if groupSchema.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED {
- return
- }
- shardNum := groupSchema.GetResourceOpts().GetShardNum()
- syncShard := func(id uint64) error {
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- return a.schemaRegistry.CreateOrUpdateShard(ctx, &databasev1.Shard{
- Id: id,
- Total: shardNum,
- Metadata: &commonv1.Metadata{
- Name: groupSchema.GetMetadata().GetName(),
- },
- Node: "TODO",
- })
- }
- for i := 0; i < int(shardNum); i++ {
- _ = syncShard(uint64(i))
- }
- case schema.KindNode:
- // TODO: handle node
- default:
- return
- }
-}
-
-// OnDelete implements EventHandler.
-func (*allocator) OnDelete(schema.Metadata) {
- // TODO: handle delete
-}
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index d1271f20..9a166d38 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -29,11 +29,14 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
- "github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
-const flagEtcdEndpointsName = "etcd-endpoints"
+const (
+ // DefaultNamespace is the default namespace of the metadata stored in etcd.
+ DefaultNamespace = "banyandb"
+ flagEtcdEndpointsName = "etcd-endpoints"
+)
// NewClient returns a new metadata client.
func NewClient(_ context.Context) (Service, error) {
@@ -43,7 +46,6 @@ func NewClient(_ context.Context) (Service, error) {
type clientService struct {
namespace string
schemaRegistry schema.Registry
- alc *allocator
closer *run.Closer
endpoints []string
}
@@ -54,7 +56,7 @@ func (s *clientService) SchemaRegistry() schema.Registry {
func (s *clientService) FlagSet() *run.FlagSet {
fs := run.NewFlagSet("metadata")
- fs.StringVar(&s.namespace, "namespace", "banyandb", "The namespace of the metadata stored in etcd")
+ fs.StringVar(&s.namespace, "namespace", DefaultNamespace, "The namespace of the metadata stored in etcd")
fs.StringArrayVar(&s.endpoints, flagEtcdEndpointsName, []string{"http://localhost:2379"}, "A comma-delimited list of etcd endpoints")
return fs
}
@@ -87,7 +89,7 @@ func (s *clientService) PreRun(ctx context.Context) error {
nodeRoles := val.([]databasev1.Role)
ctxRegister, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
- if err = s.schemaRegistry.RegisterNode(ctxRegister, &databasev1.Node{
+ return s.schemaRegistry.RegisterNode(ctxRegister, &databasev1.Node{
Metadata: &commonv1.Metadata{
Name: node.NodeID,
},
@@ -95,12 +97,7 @@ func (s *clientService) PreRun(ctx context.Context) error {
HttpAddress: node.HTTPAddress,
Roles: nodeRoles,
CreatedAt: timestamppb.Now(),
- }); err != nil {
- return err
- }
- s.alc = newAllocator(s.schemaRegistry, logger.GetLogger(s.Name()).Named("allocator"))
- s.schemaRegistry.RegisterHandler("shard-allocator", schema.KindGroup|schema.KindNode, s.alc)
- return nil
+ })
}
func (s *clientService) Serve() run.StopNotify {
@@ -145,10 +142,6 @@ func (s *clientService) PropertyRegistry() schema.Property {
return s.schemaRegistry
}
-func (s *clientService) ShardRegistry() schema.Shard {
- return s.schemaRegistry
-}
-
func (s *clientService) Name() string {
return "metadata"
}
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 58d10318..a5c76de5 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -46,7 +46,6 @@ type Repo interface {
GroupRegistry() schema.Group
TopNAggregationRegistry() schema.TopNAggregation
PropertyRegistry() schema.Property
- ShardRegistry() schema.Shard
RegisterHandler(string, schema.Kind, schema.EventHandler)
}
diff --git a/banyand/metadata/schema/checker.go b/banyand/metadata/schema/checker.go
index cdedc54b..4a336dc4 100644
--- a/banyand/metadata/schema/checker.go
+++ b/banyand/metadata/schema/checker.go
@@ -88,13 +88,6 @@ var checkerMap = map[Kind]equalityChecker{
protocmp.IgnoreFields(&databasev1.Node{}, "created_at"),
protocmp.Transform())
},
- KindShard: func(a, b proto.Message) bool {
- return cmp.Equal(a, b,
- protocmp.IgnoreUnknown(),
- protocmp.IgnoreFields(&databasev1.Shard{}, "updated_at"),
- protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"),
- protocmp.Transform())
- },
KindMask: func(a, b proto.Message) bool {
return false
},
diff --git a/banyand/metadata/schema/kind.go b/banyand/metadata/schema/kind.go
index 4f406f86..1a0573bb 100644
--- a/banyand/metadata/schema/kind.go
+++ b/banyand/metadata/schema/kind.go
@@ -42,10 +42,9 @@ const (
KindTopNAggregation
KindProperty
KindNode
- KindShard
KindMask = KindGroup | KindStream | KindMeasure | KindIndexRuleBinding | KindIndexRule |
- KindTopNAggregation | KindProperty | KindNode | KindShard
+ KindTopNAggregation | KindProperty | KindNode
KindSize = 9
)
@@ -67,8 +66,6 @@ func (k Kind) key() string {
return propertyKeyPrefix
case KindNode:
return nodeKeyPrefix
- case KindShard:
- return shardKeyPrefix
default:
return "unknown"
}
@@ -97,8 +94,6 @@ func (k Kind) Unmarshal(kv *mvccpb.KeyValue) (Metadata, error) {
m = &databasev1.TopNAggregation{}
case KindNode:
m = &databasev1.Node{}
- case KindShard:
- m = &databasev1.Shard{}
default:
return Metadata{}, errUnsupportedEntityType
}
@@ -143,8 +138,6 @@ func (k Kind) String() string {
return "property"
case KindNode:
return "node"
- case KindShard:
- return "shard"
default:
return "unknown"
}
diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go
index 0c7095af..dd4570ca 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -54,7 +54,6 @@ type Registry interface {
TopNAggregation
Property
Node
- Shard
RegisterHandler(string, Kind, EventHandler)
}
@@ -111,11 +110,6 @@ func (m Metadata) key() (string, error) {
}), nil
case KindNode:
return formatNodeKey(m.Name), nil
- case KindShard:
- return formatShardKey(&commonv1.Metadata{
- Group: m.Group,
- Name: m.Name,
- }), nil
default:
return "", errUnsupportedEntityType
}
@@ -203,9 +197,3 @@ type Node interface {
ListNode(ctx context.Context, role databasev1.Role) ([]*databasev1.Node, error)
RegisterNode(ctx context.Context, node *databasev1.Node) error
}
-
-// Shard allows CRUD shard schemas in a group.
-type Shard interface {
- CreateOrUpdateShard(ctx context.Context, shard *databasev1.Shard) error
- ListShard(ctx context.Context, opt ListOpt) ([]*databasev1.Shard, error)
-}
diff --git a/banyand/metadata/schema/shard.go b/banyand/metadata/schema/shard.go
deleted file mode 100644
index c6499931..00000000
--- a/banyand/metadata/schema/shard.go
+++ /dev/null
@@ -1,74 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package schema
-
-import (
- "context"
-
- "github.com/pkg/errors"
- "google.golang.org/protobuf/types/known/timestamppb"
-
- commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
- databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-)
-
-var shardKeyPrefix = "/shards/"
-
-func (e *etcdSchemaRegistry) CreateOrUpdateShard(ctx context.Context, shard *databasev1.Shard) error {
- if shard.UpdatedAt != nil {
- shard.UpdatedAt = timestamppb.Now()
- }
- md := Metadata{
- TypeMeta: TypeMeta{
- Kind: KindShard,
- Group: shard.GetMetadata().GetGroup(),
- Name: shard.GetMetadata().GetName(),
- },
- Spec: shard,
- }
- _, err := e.update(ctx, md)
- if err == nil {
- return nil
- }
- if errors.Is(err, ErrGRPCResourceNotFound) {
- shard.CreatedAt = shard.UpdatedAt
- md.Spec = shard
- _, err = e.create(ctx, md)
- return err
- }
- return err
-}
-
-func (e *etcdSchemaRegistry) ListShard(ctx context.Context, opt ListOpt) ([]*databasev1.Shard, error) {
- if opt.Group == "" {
- return nil, BadRequest("group", "group should not be empty")
- }
- messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, shardKeyPrefix), KindShard)
- if err != nil {
- return nil, err
- }
- entities := make([]*databasev1.Shard, 0, len(messages))
- for _, message := range messages {
- entities = append(entities, message.(*databasev1.Shard))
- }
- return entities, nil
-}
-
-func formatShardKey(metadata *commonv1.Metadata) string {
- return formatKey(shardKeyPrefix, metadata)
-}
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index a47e6df8..70a89d8e 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -104,7 +104,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
return
}
- resp = bus.NewMessage(bus.MessageID(now), entities)
+ resp = bus.NewMessage(bus.MessageID(now), &streamv1.QueryResponse{Elements: entities})
return
}
@@ -170,7 +170,7 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
if e := ml.Debug(); e.Enabled() {
e.RawJSON("ret", logger.Proto(&measurev1.QueryResponse{DataPoints: result})).Msg("got a measure")
}
- resp = bus.NewMessage(bus.MessageID(now), result)
+ resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{DataPoints: result})
return
}
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index 55939991..1f266c7e 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -88,7 +88,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
Msg("fail to list shards")
return
}
- aggregator := createTopNPostAggregator(request.GetTopN(),
+ aggregator := CreateTopNPostAggregator(request.GetTopN(),
request.GetAgg(), request.GetFieldValueSort())
entity, err := locateEntity(topNSchema, request.GetFieldValueSort(),
request.GetConditions())
if err != nil {
@@ -130,7 +130,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
Msg("fail to parse topN family")
return
}
- _ = aggregator.put(tuple.V1.([]*modelv1.TagValue), tuple.V2.(int64), iter.Val().Time())
+ _ = aggregator.Put(tuple.V1.([]*modelv1.TagValue), tuple.V2.(int64), iter.Val().Time())
}
_ = iter.Close()
}
@@ -138,7 +138,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
}
now := time.Now().UnixNano()
- resp = bus.NewMessage(bus.MessageID(now), aggregator.val(sourceMeasure.GetSchema().GetEntity().GetTagNames()))
+ resp = bus.NewMessage(bus.MessageID(now), &measurev1.TopNResponse{Lists: aggregator.Val(sourceMeasure.GetSchema().GetEntity().GetTagNames())})
return
}
@@ -254,13 +254,14 @@ func (n *aggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
return tags
}
-// postProcessor defines necessary methods for Top-N post processor with or without aggregation.
-type postProcessor interface {
- put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error
- val([]string) []*measurev1.TopNList
+// PostProcessor defines necessary methods for Top-N post processor with or without aggregation.
+type PostProcessor interface {
+ Put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error
+ Val([]string) []*measurev1.TopNList
}
-func createTopNPostAggregator(topN int32, aggrFunc modelv1.AggregationFunction, sort modelv1.Sort) postProcessor {
+// CreateTopNPostAggregator creates a Top-N post processor with or without aggregation.
+func CreateTopNPostAggregator(topN int32, aggrFunc modelv1.AggregationFunction, sort modelv1.Sort) PostProcessor {
if aggrFunc == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
// if aggregation is not specified, we have to keep all timelines
return &postNonAggregationProcessor{
@@ -327,7 +328,7 @@ func (aggr *postAggregationProcessor) Pop() any {
return item
}
-func (aggr *postAggregationProcessor) put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
+func (aggr *postAggregationProcessor) Put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
// update latest ts
if aggr.latestTimestamp < timestampMillis {
aggr.latestTimestamp = timestampMillis
@@ -374,7 +375,7 @@ func (aggr *postAggregationProcessor) tryEnqueue(key string, item *aggregatorIte
}
}
-func (aggr *postAggregationProcessor) val(tagNames []string) []*measurev1.TopNList {
+func (aggr *postAggregationProcessor) Val(tagNames []string) []*measurev1.TopNList {
topNItems := make([]*measurev1.TopNList_Item, aggr.Len())
for aggr.Len() > 0 {
@@ -430,7 +431,7 @@ type postNonAggregationProcessor struct {
sort modelv1.Sort
}
-func (naggr *postNonAggregationProcessor) val(tagNames []string) []*measurev1.TopNList {
+func (naggr *postNonAggregationProcessor) Val(tagNames []string) []*measurev1.TopNList {
topNLists := make([]*measurev1.TopNList, 0, len(naggr.timelines))
for ts, timeline := range naggr.timelines {
items := make([]*measurev1.TopNList_Item, timeline.Len())
@@ -462,7 +463,7 @@ func (naggr *postNonAggregationProcessor) val(tagNames []string) []*measurev1.To
return topNLists
}
-func (naggr *postNonAggregationProcessor) put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
+func (naggr *postNonAggregationProcessor) Put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
key := entityValues.String()
if timeline, ok := naggr.timelines[timestampMillis]; ok {
if timeline.Len() < int(naggr.topN) {
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index c5b8dc42..a6872881 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -19,6 +19,7 @@
package queue
import (
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -84,6 +85,9 @@ func (*local) GetPort() *uint32 {
return nil
}
+func (*local) Register(schema.EventHandler) {
+}
+
type localBatchPublisher struct {
local *bus.Bus
}
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index c5c68139..aa9dbbfe 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -88,6 +88,9 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
}
c := clusterv1.NewServiceClient(conn)
p.clients[name] = &client{conn: conn, client: c}
+ if p.handler != nil {
+ p.handler.OnAddOrUpdate(md)
+ }
}
func (p *pub) OnDelete(md schema.Metadata) {
@@ -112,5 +115,8 @@ func (p *pub) OnDelete(md schema.Metadata) {
client.conn.Close() // Close the client connection
}
delete(p.clients, name)
+ if p.handler != nil {
+ p.handler.OnDelete(md)
+ }
}
}
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index b72e9a07..51d56967 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -30,6 +30,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
+ "github.com/apache/skywalking-banyandb/api/data"
clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
@@ -46,13 +47,17 @@ var (
type pub struct {
metadata metadata.Repo
+ handler schema.EventHandler
log *logger.Logger
clients map[string]*client
closer *run.Closer
mu sync.RWMutex
}
-// GracefulStop implements run.Service.
+func (p *pub) Register(handler schema.EventHandler) {
+ p.handler = handler
+}
+
func (p *pub) GracefulStop() {
p.closer.Done()
p.closer.CloseThenWait()
@@ -101,9 +106,7 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err
if !ok {
return multierr.Append(err, fmt.Errorf("failed to get client for node %s", node))
}
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- stream, errCreateStream := client.client.Send(ctx)
+ stream, errCreateStream := client.client.Send(context.Background())
if err != nil {
return multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream))
}
@@ -112,6 +115,7 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err
return multierr.Append(err, fmt.Errorf("failed to send message to node %s: %w", node, errSend))
}
f.clients = append(f.clients, stream)
+ f.topics = append(f.topics, topic)
return err
}
for _, m := range messages {
@@ -129,16 +133,15 @@ func (p *pub) NewBatchPublisher() queue.BatchPublisher {
func New(metadata metadata.Repo) queue.Client {
return &pub{
metadata: metadata,
+ clients: make(map[string]*client),
closer: run.NewCloser(1),
}
}
-// Name implements run.PreRunner.
func (*pub) Name() string {
return "queue-client"
}
-// PreRun implements run.PreRunner.
func (p *pub) PreRun(context.Context) error {
p.log = logger.GetLogger("server-queue")
p.metadata.RegisterHandler("queue-client", schema.KindNode, p)
@@ -172,7 +175,7 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
}
node := m.Node()
sendData := func() (success bool) {
- if stream, ok := bp.streams[node]; !ok {
+ if stream, ok := bp.streams[node]; ok {
defer func() {
if !success {
delete(bp.streams, node)
@@ -223,9 +226,6 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
}
func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, error) {
- if !m.IsRemote() {
- return nil, fmt.Errorf("message %d is not remote", m.ID())
- }
r := &clusterv1.SendRequest{
Topic: topic.String(),
MessageId: uint64(m.ID()),
@@ -244,6 +244,7 @@ func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, e
type future struct {
clients []clusterv1.Service_SendClient
+ topics []bus.Topic
}
func (l *future) Get() (bus.Message, error) {
@@ -251,17 +252,33 @@ func (l *future) Get() (bus.Message, error) {
return bus.Message{}, io.EOF
}
c := l.clients[0]
+ t := l.topics[0]
defer func() {
l.clients = l.clients[1:]
+ l.topics = l.topics[1:]
}()
resp, err := c.Recv()
if err != nil {
return bus.Message{}, err
}
- return bus.NewMessage(
- bus.MessageID(resp.MessageId),
- resp.Body,
- ), nil
+ if resp.Error != "" {
+ return bus.Message{}, errors.New(resp.Error)
+ }
+ if resp.Body == nil {
+ return bus.NewMessage(bus.MessageID(resp.MessageId), nil), nil
+ }
+ if messageSupplier, ok := data.TopicResponseMap[t]; ok {
+ m := messageSupplier()
+ err = resp.Body.UnmarshalTo(m)
+ if err != nil {
+ return bus.Message{}, err
+ }
+ return bus.NewMessage(
+ bus.MessageID(resp.MessageId),
+ m,
+ ), nil
+ }
+ return bus.Message{}, fmt.Errorf("invalid topic %s", t)
}
func (l *future) GetAll() ([]bus.Message, error) {
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 7bb8c531..d518d331 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -20,6 +20,7 @@ package queue
import (
"io"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -39,6 +40,7 @@ type Client interface {
bus.Publisher
bus.Broadcaster
NewBatchPublisher() BatchPublisher
+ Register(schema.EventHandler)
}
// Server is the interface for receiving data from the queue.
diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go
index c689eef0..021ec877 100644
--- a/banyand/queue/sub/sub.go
+++ b/banyand/queue/sub/sub.go
@@ -22,9 +22,12 @@ import (
"io"
"github.com/pkg/errors"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
+ "github.com/apache/skywalking-banyandb/api/data"
clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
"github.com/apache/skywalking-banyandb/pkg/bus"
)
@@ -52,11 +55,18 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error {
return nil
}
if err != nil {
- reply(writeEntity, err, "failed to receive message")
- continue
+ if status.Code(err) == codes.Canceled {
+ return nil
+ }
+ s.log.Error().Err(err).Msg("failed to receive message")
+ return err
}
if writeEntity.Topic != "" && topic == nil {
- t := bus.UniTopic(writeEntity.Topic)
+ t, ok := data.TopicMap[writeEntity.Topic]
+ if !ok {
+ reply(writeEntity, err, "invalid topic")
+ continue
+ }
topic = &t
}
if topic == nil {
@@ -68,9 +78,25 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error {
reply(writeEntity, err, "no listener found")
continue
}
- m := listener.Rev(bus.NewMessage(bus.MessageID(writeEntity.MessageId), writeEntity.Body))
+ var m bus.Message
+ if reqSupplier, ok := data.TopicRequestMap[*topic]; ok {
+ req := reqSupplier()
+ if errUnmarshal := writeEntity.Body.UnmarshalTo(req); errUnmarshal != nil {
+ reply(writeEntity, errUnmarshal, "failed to unmarshal message")
+ continue
+ }
+ m = listener.Rev(bus.NewMessage(bus.MessageID(writeEntity.MessageId), req))
+ } else {
+ reply(writeEntity, err, "unknown topic")
+ continue
+ }
+
if m.Data() == nil {
- reply(writeEntity, err, "no response")
+ if errSend := stream.Send(&clusterv1.SendResponse{
+ MessageId: writeEntity.MessageId,
+ }); errSend != nil {
+ s.log.Error().Stringer("written", writeEntity).Err(errSend).Msg("failed to send response")
+ }
continue
}
message, ok := m.Data().(proto.Message)
@@ -87,8 +113,7 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error {
MessageId: writeEntity.MessageId,
Body: anyMessage,
}); err != nil {
- reply(writeEntity, err, "failed to send response")
- continue
+ s.log.Error().Stringer("written", writeEntity).Err(err).Msg("failed to send response")
}
}
}
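On the data node, Send now resolves the topic through data.TopicMap, builds the matching request with data.TopicRequestMap, and acknowledges empty results with a body-less SendResponse instead of an error. A condensed sketch of that request-side dispatch (the dispatch helper is illustrative, not part of this patch):

package main

import (
	"fmt"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/anypb"

	"github.com/apache/skywalking-banyandb/api/data"
)

// dispatch resolves a topic by its wire name, instantiates the registered
// request type, and unmarshals the Any payload into it.
func dispatch(topicName string, body *anypb.Any) (proto.Message, error) {
	topic, ok := data.TopicMap[topicName]
	if !ok {
		return nil, fmt.Errorf("invalid topic %s", topicName)
	}
	supplier, ok := data.TopicRequestMap[topic]
	if !ok {
		return nil, fmt.Errorf("unknown topic %s", topicName)
	}
	req := supplier() // e.g. *measurev1.QueryRequest for data.TopicMeasureQuery
	if err := body.UnmarshalTo(req); err != nil {
		return nil, err
	}
	return req, nil
}

func main() {
	_, _ = dispatch(data.TopicMeasureQuery.String(), &anypb.Any{})
}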
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 0ce3e6ce..18d5c3a1 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -271,6 +271,18 @@ type portableSupplier struct {
l *logger.Logger
}
+func (*portableSupplier) OpenDB(_ *commonv1.Group) (tsdb.Database, error) {
+ panic("do not support open db")
+}
+
+func (s *portableSupplier) OpenResource(shardNum uint32, _ tsdb.Supplier, resource resourceSchema.Resource) (io.Closer, error) {
+ streamSchema := resource.Schema().(*databasev1.Stream)
+ return openStream(shardNum, nil, streamSpec{
+ schema: streamSchema,
+ indexRules: resource.IndexRules(),
+ }, s.l), nil
+}
+
func newPortableSupplier(metadata metadata.Repo, l *logger.Logger) *portableSupplier {
return &portableSupplier{
metadata: metadata,
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index a989ebb2..4d5c8f8a 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -73,6 +73,9 @@ func openStream(shardNum uint32, db tsdb.Supplier, spec streamSpec, l *logger.Lo
sm.parseSpec()
ctx := context.WithValue(context.Background(), logger.ContextKey, l)
+ if db == nil {
+ return sm
+ }
sm.db = db
sm.indexWriter = index.NewWriter(ctx, index.WriterOptions{
DB: db,
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 6af8bf2f..6dca480c 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -50,7 +50,7 @@ type tsTable struct {
closeBufferTimer *time.Timer
position common.Position
bufferSize int64
- lock sync.Mutex
+ lock sync.RWMutex
}
func (t *tsTable) SizeOnDisk() int64 {
@@ -112,10 +112,12 @@ func (t *tsTable) Get(key []byte, ts time.Time) ([]byte, error) {
}
func (t *tsTable) Put(key []byte, val []byte, ts time.Time) error {
+ t.lock.RLock()
if t.buffer != nil {
+ defer t.lock.RUnlock()
return t.buffer.Write(key, val, ts)
}
-
+ t.lock.RUnlock()
if err := t.openBuffer(); err != nil {
return err
}
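
The Put hot path above now holds only the read lock when the buffer already exists, so concurrent writers no longer serialize on a single mutex; the exclusive path is taken only when the buffer has to be (re)created in openBuffer. The same double-checked pattern in isolation, with a simplified buffer standing in for the real one (a sketch, not the commit's code):

package example // illustrative sketch only

import "sync"

type buffer struct {
	mu   sync.Mutex
	data map[string][]byte
}

func (b *buffer) write(key string, val []byte) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.data[key] = val
}

type table struct {
	mu  sync.RWMutex // guards the buf pointer, not its contents
	buf *buffer
}

// put mirrors tsTable.Put above: shared lock on the fast path, release it,
// then take the exclusive lock and re-check before creating the buffer.
func (t *table) put(key string, val []byte) {
	t.mu.RLock()
	if t.buf != nil {
		defer t.mu.RUnlock()
		t.buf.write(key, val) // the buffer synchronizes its own writes
		return
	}
	t.mu.RUnlock()

	t.mu.Lock()
	if t.buf == nil { // another goroutine may have opened it meanwhile
		t.buf = &buffer{data: make(map[string][]byte)}
	}
	t.mu.Unlock()
	t.put(key, val) // retry via the fast path
}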
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
index 1fc94283..2d8764fd 100644
--- a/banyand/tsdb/buffer.go
+++ b/banyand/tsdb/buffer.go
@@ -385,7 +385,6 @@ func (bsb *bufferShardBucket) recoveryWal() error {
func (bsb *bufferShardBucket) recoveryWorkSegment(segment wal.Segment) {
var wg sync.WaitGroup
- wg.Add(len(segment.GetLogEntries()))
for _, logEntry := range segment.GetLogEntries() {
timestamps := logEntry.GetTimestamps()
values := logEntry.GetValues()
@@ -404,6 +403,7 @@ func (bsb *bufferShardBucket) recoveryWorkSegment(segment wal.Segment) {
}
},
}
+ wg.Add(1)
elementIndex++
}
}
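
Moving wg.Add(1) next to each recovered element keeps the WaitGroup counter in step with the Done calls, instead of pre-adding the number of log entries, which no longer matched the units of work being dispatched. The corrected pairing in a minimal form (types here are placeholders, not the commit's code):

package example // illustrative sketch only

import "sync"

// runAll increments the WaitGroup once per unit of work, immediately before
// dispatching it, so Add and Done always balance.
func runAll(tasks []func()) {
	var wg sync.WaitGroup
	for _, task := range tasks {
		wg.Add(1)
		go func(fn func()) {
			defer wg.Done()
			fn()
		}(task)
	}
	wg.Wait()
}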
diff --git a/bydbctl/internal/cmd/group_test.go b/bydbctl/internal/cmd/group_test.go
index 6c18a8fd..c600a165 100644
--- a/bydbctl/internal/cmd/group_test.go
+++ b/bydbctl/internal/cmd/group_test.go
@@ -27,7 +27,6 @@ import (
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
- "github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
)
@@ -37,8 +36,7 @@ var _ = Describe("Group", func() {
var deferFunc func()
var rootCmd *cobra.Command
BeforeEach(func() {
- _, addr, deferFunc = setup.Common()
- Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed())
+ _, addr, deferFunc = setup.EmptyStandalone()
addr = "http://" + addr
// extracting the operation of creating group
rootCmd = &cobra.Command{Use: "root"}
@@ -141,7 +139,7 @@ resource_opts:
})
resp := new(databasev1.GroupRegistryServiceListResponse)
helpers.UnmarshalYAML([]byte(out), resp)
- Expect(resp.Group).To(HaveLen(4))
+ Expect(resp.Group).To(HaveLen(2))
})
AfterEach(func() {
diff --git a/bydbctl/internal/cmd/index_rule_binding_test.go b/bydbctl/internal/cmd/index_rule_binding_test.go
index 1b8e4509..8d681475 100644
--- a/bydbctl/internal/cmd/index_rule_binding_test.go
+++ b/bydbctl/internal/cmd/index_rule_binding_test.go
@@ -37,8 +37,7 @@ var _ = Describe("IndexRuleBindingSchema Operation", func() {
var deferFunc func()
var rootCmd *cobra.Command
BeforeEach(func() {
- _, addr, deferFunc = setup.Common()
- Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed())
+ _, addr, deferFunc = setup.EmptyStandalone()
addr = "http://" + addr
// extracting the operation of creating indexRuleBinding schema
rootCmd = &cobra.Command{Use: "root"}
diff --git a/bydbctl/internal/cmd/index_rule_test.go b/bydbctl/internal/cmd/index_rule_test.go
index 166f46fd..2aae1aef 100644
--- a/bydbctl/internal/cmd/index_rule_test.go
+++ b/bydbctl/internal/cmd/index_rule_test.go
@@ -37,8 +37,7 @@ var _ = Describe("IndexRuleSchema Operation", func() {
var deferFunc func()
var rootCmd *cobra.Command
BeforeEach(func() {
- _, addr, deferFunc = setup.Common()
- Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed())
+ _, addr, deferFunc = setup.EmptyStandalone()
addr = "http://" + addr
// extracting the operation of creating indexRule schema
rootCmd = &cobra.Command{Use: "root"}
diff --git a/bydbctl/internal/cmd/measure_test.go b/bydbctl/internal/cmd/measure_test.go
index d7fa44c9..8df93f2b 100644
--- a/bydbctl/internal/cmd/measure_test.go
+++ b/bydbctl/internal/cmd/measure_test.go
@@ -44,8 +44,7 @@ var _ = Describe("Measure Schema Operation", func() {
var deferFunc func()
var rootCmd *cobra.Command
BeforeEach(func() {
- _, addr, deferFunc = setup.Common()
- Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed())
+ _, addr, deferFunc = setup.EmptyStandalone()
addr = "http://" + addr
// extracting the operation of creating measure schema
rootCmd = &cobra.Command{Use: "root"}
@@ -201,8 +200,7 @@ var _ = Describe("Measure Data Query", func() {
startStr = now.Add(-20 * time.Minute).Format(time.RFC3339)
interval = 1 * time.Millisecond
endStr = now.Add(5 * time.Minute).Format(time.RFC3339)
- grpcAddr, addr, deferFunc = setup.Common()
- Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed())
+ grpcAddr, addr, deferFunc = setup.Standalone()
addr = "http://" + addr
rootCmd = &cobra.Command{Use: "root"}
cmd.RootCmdFlags(rootCmd)
diff --git a/bydbctl/internal/cmd/property_test.go b/bydbctl/internal/cmd/property_test.go
index e16a2750..ee2eb81d 100644
--- a/bydbctl/internal/cmd/property_test.go
+++ b/bydbctl/internal/cmd/property_test.go
@@ -100,8 +100,7 @@ ttl: 30m
p2Proto := new(propertyv1.Property)
helpers.UnmarshalYAML([]byte(p2YAML), p2Proto)
BeforeEach(func() {
- _, addr, deferFunc = setup.Common()
- Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed())
+ _, addr, deferFunc = setup.EmptyStandalone()
addr = "http://" + addr
// extracting the operation of creating property schema
rootCmd = &cobra.Command{Use: "root"}
diff --git a/bydbctl/internal/cmd/stream_test.go b/bydbctl/internal/cmd/stream_test.go
index 36f8330f..3618b9fb 100644
--- a/bydbctl/internal/cmd/stream_test.go
+++ b/bydbctl/internal/cmd/stream_test.go
@@ -44,8 +44,7 @@ var _ = Describe("Stream Schema Operation", func() {
var deferFunc func()
var rootCmd *cobra.Command
BeforeEach(func() {
- _, addr, deferFunc = setup.Common()
- Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed())
+ _, addr, deferFunc = setup.EmptyStandalone()
addr = "http://" + addr
// extracting the operation of creating stream schema
rootCmd = &cobra.Command{Use: "root"}
@@ -202,8 +201,7 @@ var _ = Describe("Stream Data Query", func() {
nowStr = now.Format(time.RFC3339)
interval = 500 * time.Millisecond
endStr = now.Add(1 * time.Hour).Format(time.RFC3339)
- grpcAddr, addr, deferFunc = setup.Common()
- Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed())
+ grpcAddr, addr, deferFunc = setup.Standalone()
addr = "http://" + addr
rootCmd = &cobra.Command{Use: "root"}
cmd.RootCmdFlags(rootCmd)
diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go
index 6ad65ada..5df38130 100644
--- a/pkg/bus/bus.go
+++ b/pkg/bus/bus.go
@@ -64,11 +64,6 @@ func (m Message) Node() string {
return m.node
}
-// IsRemote returns true if the Message is sent from a remote node.
-func (m Message) IsRemote() bool {
- return m.node != "local"
-}
-
// NewMessage returns a new Message with a MessageID and embed data.
func NewMessage(id MessageID, data interface{}) Message {
return Message{id: id, node: "local", payload: data}
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index 7a421b12..a6ec86a6 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -30,7 +30,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/query"
"github.com/apache/skywalking-banyandb/banyand/queue/sub"
"github.com/apache/skywalking-banyandb/banyand/stream"
- "github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/version"
@@ -75,17 +74,10 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
}
dataGroup := run.NewGroup("data")
dataGroup.Register(units...)
- logging := logger.Logging{}
dataCmd := &cobra.Command{
Use: "data",
Version: version.Build(),
Short: "Run as the data server",
- PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) {
- if err = config.Load("logging", cmd.Flags()); err != nil {
- return err
- }
- return logger.Init(logging)
- },
RunE: func(cmd *cobra.Command, args []string) (err error) {
node, err := common.GenerateNode(pipeline.GetPort(), nil)
if err != nil {
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 3ef2eb44..126f4b89 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/liaison/http"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/queue/pub"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
@@ -43,11 +44,12 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
l.Fatal().Err(err).Msg("failed to initiate metadata service")
}
pipeline := pub.New(metaSvc)
- grpcServer := grpc.NewServer(ctx, pipeline, metaSvc)
+ localPipeline := queue.Local()
+ grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, grpc.NewClusterNodeRegistry(pipeline))
profSvc := observability.NewProfService()
metricSvc := observability.NewMetricService()
httpServer := http.NewServer()
- dQuery, err := dquery.NewService(metaSvc, pipeline)
+ dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate distributed query
service")
}
@@ -55,6 +57,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
units = append(units, runners...)
units = append(units,
metaSvc,
+ localPipeline,
pipeline,
dQuery,
grpcServer,
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index f0dca2f2..c2638b8e 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -57,7 +57,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate query processor")
}
- grpcServer := grpc.NewServer(ctx, pipeline, metaSvc)
+ grpcServer := grpc.NewServer(ctx, pipeline, pipeline, metaSvc, grpc.NewLocalNodeRegistry())
profSvc := observability.NewProfService()
metricSvc := observability.NewMetricService()
httpServer := http.NewServer()
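
Taken together, the liaison and standalone hunks above show the new grpc.NewServer contract: it receives a broadcast pipeline, a local pipeline, the metadata repo, and a node registry. A side-by-side restatement of the two wirings, with comments added and nothing beyond what the hunks already show (fragments, not a compilable unit):

// liaison (pkg/cmdsetup/liaison.go): fan out over the cluster publisher,
// deliver in-process over a local queue, and select data nodes through a
// registry backed by that same publisher.
pipeline := pub.New(metaSvc)
localPipeline := queue.Local()
grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, grpc.NewClusterNodeRegistry(pipeline))

// standalone (pkg/cmdsetup/standalone.go): the single embedded pipeline
// plays both roles and node resolution stays local.
grpcServer := grpc.NewServer(ctx, pipeline, pipeline, metaSvc, grpc.NewLocalNodeRegistry())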
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go
index 45ee4c66..bbe4d0d4 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -38,9 +38,7 @@ import (
var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil)
type unresolvedDistributed struct {
- originalQuery *measurev1.QueryRequest
- order *logical.OrderBy
- maxDataPointsSize int
+ originalQuery *measurev1.QueryRequest
}
func newUnresolvedDistributed(query *measurev1.QueryRequest) logical.UnresolvedPlan {
@@ -49,39 +47,61 @@ func newUnresolvedDistributed(query *measurev1.QueryRequest) logical.UnresolvedP
}
}
-func (ud *unresolvedDistributed) Limit(max int) {
- ud.maxDataPointsSize = max
-}
-
-func (ud *unresolvedDistributed) Sort(order *logical.OrderBy) {
- ud.order = order
-}
-
func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) {
if ud.originalQuery.TagProjection == nil {
return nil, fmt.Errorf("tag projection is required")
}
- if ud.originalQuery.FieldProjection == nil {
- return nil, fmt.Errorf("filed projection is required")
+ projectionTags := logical.ToTags(ud.originalQuery.GetTagProjection())
+ if len(projectionTags) > 0 {
+ var err error
+ projTagsRefs, err := s.CreateTagRef(projectionTags...)
+ if err != nil {
+ return nil, err
+ }
+ s = s.ProjTags(projTagsRefs...)
+ }
+ projectionFields := make([]*logical.Field, len(ud.originalQuery.GetFieldProjection().GetNames()))
+ for i, fieldNameProj := range ud.originalQuery.GetFieldProjection().GetNames() {
+ projectionFields[i] = logical.NewField(fieldNameProj)
+ }
+ if len(projectionFields) > 0 {
+ var err error
+ projFieldRefs, err := s.CreateFieldRef(projectionFields...)
+ if err != nil {
+ return nil, err
+ }
+ s = s.ProjFields(projFieldRefs...)
+ }
+ limit := ud.originalQuery.GetLimit()
+ if limit == 0 {
+ limit = defaultLimit
}
temp := &measurev1.QueryRequest{
TagProjection: ud.originalQuery.TagProjection,
FieldProjection: ud.originalQuery.FieldProjection,
Metadata: ud.originalQuery.Metadata,
Criteria: ud.originalQuery.Criteria,
- Limit: uint32(ud.maxDataPointsSize),
- OrderBy: &modelv1.QueryOrder{
- IndexRuleName: ud.order.Index.Metadata.Name,
- Sort: ud.order.Sort,
- },
+ Limit: limit,
+ OrderBy: ud.originalQuery.OrderBy,
}
- if ud.order == nil {
+ if ud.originalQuery.OrderBy == nil {
return &distributedPlan{
queryTemplate: temp,
s: s,
sortByTime: true,
}, nil
}
+ if ud.originalQuery.OrderBy.IndexRuleName == "" {
+ result := &distributedPlan{
+ queryTemplate: temp,
+ s: s,
+ sortByTime: true,
+ }
+ if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
+ result.desc = true
+ }
+ return result, nil
+ }
ok, indexRule := s.IndexRuleDefined(ud.originalQuery.OrderBy.IndexRuleName)
if !ok {
return nil, fmt.Errorf("index rule %s not found", ud.originalQuery.OrderBy.IndexRuleName)
@@ -106,17 +126,21 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
}
type distributedPlan struct {
- s logical.Schema
- queryTemplate *measurev1.QueryRequest
- sortTagSpec logical.TagSpec
- sortByTime bool
- desc bool
+ s logical.Schema
+ queryTemplate *measurev1.QueryRequest
+ sortTagSpec logical.TagSpec
+ sortByTime bool
+ desc bool
+ maxDataPointsSize uint32
}
func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, error) {
dctx := executor.FromDistributedExecutionContext(ctx)
query := proto.Clone(t.queryTemplate).(*measurev1.QueryRequest)
query.TimeRange = dctx.TimeRange()
+ if t.maxDataPointsSize > 0 {
+ query.Limit = t.maxDataPointsSize
+ }
ff, err := dctx.Broadcast(data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
if err != nil {
return nil, err
@@ -127,8 +151,12 @@ func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, erro
if m, getErr := f.Get(); getErr != nil {
allErr = multierr.Append(allErr, getErr)
} else {
+ d := m.Data()
+ if d == nil {
+ continue
+ }
see = append(see,
- newSortableElements(m.Data().(*measurev1.QueryResponse).DataPoints,
+ newSortableElements(d.(*measurev1.QueryResponse).DataPoints,
t.sortByTime, t.sortTagSpec))
}
}
@@ -149,6 +177,10 @@ func (t *distributedPlan) Schema() logical.Schema {
return t.s
}
+func (t *distributedPlan) Limit(max int) {
+ t.maxDataPointsSize = uint32(max)
+}
+
var _ sort.Comparable = (*comparableDataPoint)(nil)
type comparableDataPoint struct {
@@ -220,7 +252,7 @@ func (s *sortableElements) iter(fn func(*measurev1.DataPoint) (*comparableDataPo
return s.iter(fn)
}
s.cur = cur
- return s.index < len(s.dataPoints)
+ return s.index <= len(s.dataPoints)
}
var _ executor.MIterator = (*sortedMIterator)(nil)
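
The ordering logic in Analyze above now distinguishes three cases instead of assuming a resolved order. A condensed sketch of the decision, using only the fields that appear in the hunk (not the verbatim implementation):

// q is the original measurev1.QueryRequest, plan the distributedPlan being built.
switch {
case q.OrderBy == nil:
	// no explicit order: merge data-node results purely by timestamp
	plan.sortByTime = true
case q.OrderBy.IndexRuleName == "":
	// order by time, honouring the requested direction
	plan.sortByTime = true
	plan.desc = q.OrderBy.Sort == modelv1.Sort_SORT_DESC
default:
	// order by an indexed tag: the rule must exist in the schema
	ok, indexRule := s.IndexRuleDefined(q.OrderBy.IndexRuleName)
	if !ok {
		return nil, fmt.Errorf("index rule %s not found", q.OrderBy.IndexRuleName)
	}
	_ = indexRule // its tag becomes plan.sortTagSpec for the merge sort
}

The limit now travels the other way: the planner calls Limit(max) on the resolved plan, and Execute copies that value onto the cloned per-node query before broadcasting, overriding the default limit set in Analyze.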
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go b/pkg/query/logical/stream/stream_plan_distributed.go
index 7fce94cc..4e111ad8 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -51,6 +51,15 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
if ud.originalQuery.Projection == nil {
return nil, fmt.Errorf("projection is required")
}
+ projectionTags := logical.ToTags(ud.originalQuery.GetProjection())
+ if len(projectionTags) > 0 {
+ var err error
+ projTagsRefs, err := s.CreateTagRef(projectionTags...)
+ if err != nil {
+ return nil, err
+ }
+ s = s.ProjTags(projTagsRefs...)
+ }
limit := ud.originalQuery.GetLimit()
if limit == 0 {
limit = defaultLimit
@@ -69,6 +78,17 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
sortByTime: true,
}, nil
}
+ if ud.originalQuery.OrderBy.IndexRuleName == "" {
+ result := &distributedPlan{
+ queryTemplate: temp,
+ s: s,
+ sortByTime: true,
+ }
+ if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
+ result.desc = true
+ }
+ return result, nil
+ }
ok, indexRule := s.IndexRuleDefined(ud.originalQuery.OrderBy.IndexRuleName)
if !ok {
return nil, fmt.Errorf("index rule %s not found", ud.originalQuery.OrderBy.IndexRuleName)
@@ -93,18 +113,22 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
}
type distributedPlan struct {
- s logical.Schema
- queryTemplate *streamv1.QueryRequest
- sortTagSpec logical.TagSpec
- sortByTime bool
- desc bool
+ s logical.Schema
+ queryTemplate *streamv1.QueryRequest
+ sortTagSpec logical.TagSpec
+ sortByTime bool
+ desc bool
+ maxElementSize uint32
}
func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, error) {
dctx := executor.FromDistributedExecutionContext(ctx)
query := proto.Clone(t.queryTemplate).(*streamv1.QueryRequest)
query.TimeRange = dctx.TimeRange()
- ff, err := dctx.Broadcast(data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+ if t.maxElementSize > 0 {
+ query.Limit = t.maxElementSize
+ }
+ ff, err := dctx.Broadcast(data.TopicStreamQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
if err != nil {
return nil, err
}
@@ -114,9 +138,17 @@ func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, err
if m, getErr := f.Get(); getErr != nil {
allErr = multierr.Append(allErr, getErr)
} else {
+ d := m.Data()
+ if d == nil {
+ continue
+ }
+ resp := d.(*streamv1.QueryResponse)
+ if err != nil {
+ allErr = multierr.Append(allErr, err)
+ continue
+ }
see = append(see,
- newSortableElements(m.Data().(*streamv1.QueryResponse).Elements,
- t.sortByTime, t.sortTagSpec))
+ newSortableElements(resp.Elements, t.sortByTime, t.sortTagSpec))
}
}
iter := sort.NewItemIter[*comparableElement](see, t.desc)
@@ -139,6 +171,10 @@ func (t *distributedPlan) Schema() logical.Schema {
return t.s
}
+func (t *distributedPlan) Limit(max int) {
+ t.maxElementSize = uint32(max)
+}
+
var _ sort.Comparable = (*comparableElement)(nil)
type comparableElement struct {
@@ -210,5 +246,5 @@ func (s *sortableElements) iter(fn func(*streamv1.Element) (*comparableElement,
return s.iter(fn)
}
s.cur = cur
- return s.index < len(s.elements)
+ return s.index <= len(s.elements)
}
diff --git a/pkg/run/run.go b/pkg/run/run.go
index 8e9a14c2..cd4d4f6d 100644
--- a/pkg/run/run.go
+++ b/pkg/run/run.go
@@ -268,6 +268,9 @@ func (g *Group) RegisterFlags() *FlagSet {
func (g *Group) RunConfig() (interrupted bool, err error) {
g.log = logger.GetLogger(g.name)
g.configured = true
+ if g.f == nil {
+ return false, nil
+ }
if g.name == "" {
// use the binary name if custom name has not been provided
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index ce96cf39..049f5d29 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -253,7 +253,7 @@ func (sr *schemaRepo) createGroup(name string) (g *group) {
if sr.resourceSupplier != nil {
g = newGroup(sr.metadata, sr.l, sr.resourceSupplier)
} else {
- g = newPortableGroup(sr.metadata, sr.l)
+ g = newPortableGroup(sr.metadata, sr.l, sr.resourceSchemaSupplier)
}
sr.data[name] = g
return
@@ -354,13 +354,14 @@ func (sr *schemaRepo) Close() {
var _ Group = (*group)(nil)
type group struct {
- resourceSupplier ResourceSupplier
- metadata metadata.Repo
- db atomic.Value
- groupSchema atomic.Pointer[commonv1.Group]
- l *logger.Logger
- schemaMap map[string]*resourceSpec
- mapMutex sync.RWMutex
+ resourceSupplier ResourceSupplier
+ resourceSchemaSupplier ResourceSchemaSupplier
+ metadata metadata.Repo
+ db atomic.Value
+ groupSchema atomic.Pointer[commonv1.Group]
+ l *logger.Logger
+ schemaMap map[string]*resourceSpec
+ mapMutex sync.RWMutex
}
func newGroup(
@@ -369,11 +370,12 @@ func newGroup(
resourceSupplier ResourceSupplier,
) *group {
g := &group{
- groupSchema: atomic.Pointer[commonv1.Group]{},
- metadata: metadata,
- l: l,
- schemaMap: make(map[string]*resourceSpec),
- resourceSupplier: resourceSupplier,
+ groupSchema: atomic.Pointer[commonv1.Group]{},
+ metadata: metadata,
+ l: l,
+ schemaMap: make(map[string]*resourceSpec),
+ resourceSupplier: resourceSupplier,
+ resourceSchemaSupplier: resourceSupplier,
}
return g
}
@@ -381,12 +383,14 @@ func newGroup(
func newPortableGroup(
metadata metadata.Repo,
l *logger.Logger,
+ resourceSchemaSupplier ResourceSchemaSupplier,
) *group {
g := &group{
- groupSchema: atomic.Pointer[commonv1.Group]{},
- metadata: metadata,
- l: l,
- schemaMap: make(map[string]*resourceSpec),
+ groupSchema: atomic.Pointer[commonv1.Group]{},
+ metadata: metadata,
+ l: l,
+ schemaMap: make(map[string]*resourceSpec),
+ resourceSchemaSupplier: resourceSchemaSupplier,
}
return g
}
@@ -454,13 +458,15 @@ func (g *group) storeResource(ctx context.Context, resourceSchema ResourceSchema
if preResource != nil && preResource.isNewThan(resource) {
return preResource, nil
}
+ var dbSupplier tsdb.Supplier
if !g.isPortable() {
- sm, errTS := g.resourceSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, g, resource)
- if errTS != nil {
- return nil, errTS
- }
- resource.delegated = sm
+ dbSupplier = g
+ }
+ sm, errTS := g.resourceSchemaSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, dbSupplier, resource)
+ if errTS != nil {
+ return nil, errTS
}
+ resource.delegated = sm
g.schemaMap[key] = resource
if preResource != nil {
_ = preResource.Close()
@@ -501,7 +507,7 @@ func (g *group) close() (err error) {
err = multierr.Append(err, s.Close())
}
g.mapMutex.RUnlock()
- if !g.isInit() {
+ if !g.isInit() || g.isPortable() {
return nil
}
return multierr.Append(err, g.SupplyTSDB().Close())
diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go
index f19a4230..65880102 100644
--- a/pkg/schema/schema.go
+++ b/pkg/schema/schema.go
@@ -75,12 +75,12 @@ type Resource interface {
// ResourceSchemaSupplier allows get a ResourceSchema from the metadata.
type ResourceSchemaSupplier interface {
ResourceSchema(metadata *commonv1.Metadata) (ResourceSchema, error)
+ OpenResource(shardNum uint32, db tsdb.Supplier, spec Resource) (io.Closer, error)
}
// ResourceSupplier allows open a resource and its embedded tsdb.
type ResourceSupplier interface {
ResourceSchemaSupplier
- OpenResource(shardNum uint32, db tsdb.Supplier, spec Resource) (io.Closer, error)
OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error)
}
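
Moving OpenResource from ResourceSupplier into ResourceSchemaSupplier is what lets a liaison-side (portable) group open stream and measure resources without owning a tsdb: storeResource passes a nil tsdb.Supplier when the group is portable, and the portable suppliers (see banyand/stream/metadata.go above) build a schema-only resource while refusing OpenDB. A restatement of that flow with comments, drawn from the hunks above rather than new behaviour:

// every supplier can open a resource; only a full ResourceSupplier can also
// open the backing tsdb.
var dbSupplier tsdb.Supplier
if !g.isPortable() {
	dbSupplier = g // data/standalone nodes: the group supplies the tsdb
}
// on a portable (liaison) group, dbSupplier stays nil and OpenResource
// builds a query-only view of the schema.
closer, err := g.resourceSchemaSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, dbSupplier, resource)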
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 622f897b..b3159f90 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -22,43 +22,61 @@ import (
"context"
"fmt"
"sync"
+ "time"
"github.com/onsi/gomega"
+ grpclib "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/credentials/insecure"
- "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
- "github.com/apache/skywalking-banyandb/banyand/liaison/http"
- "github.com/apache/skywalking-banyandb/banyand/measure"
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
- "github.com/apache/skywalking-banyandb/banyand/query"
- "github.com/apache/skywalking-banyandb/banyand/queue"
- "github.com/apache/skywalking-banyandb/banyand/stream"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/cmdsetup"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/test"
+ testflags "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
)
-const host = "127.0.0.1"
+const host = "localhost"
-// Common wires common modules to build a testing ready runtime.
-func Common(flags ...string) (string, string, func()) {
- return CommonWithSchemaLoaders([]SchemaLoader{
+// Standalone wires standalone modules to build a testing ready runtime.
+func Standalone(flags ...string) (string, string, func()) {
+ return StandaloneWithSchemaLoaders([]SchemaLoader{
&preloadService{name: "stream"},
&preloadService{name: "measure"},
- }, flags...)
+ }, "", "", flags...)
}
-// CommonWithSchemaLoaders wires common modules to build a testing ready runtime. It also allows to preload schema.
-func CommonWithSchemaLoaders(schemaLoaders []SchemaLoader, flags ...string) (string, string, func()) {
- path, _, err := test.NewSpace()
+// StandaloneWithTLS wires standalone modules to build a testing ready runtime with TLS enabled.
+func StandaloneWithTLS(certFile, keyFile string, flags ...string) (string, string, func()) {
+ return StandaloneWithSchemaLoaders([]SchemaLoader{
+ &preloadService{name: "stream"},
+ &preloadService{name: "measure"},
+ }, certFile, keyFile, flags...)
+}
+
+// EmptyStandalone wires standalone modules to build a testing ready runtime.
+func EmptyStandalone(flags ...string) (string, string, func()) {
+ return StandaloneWithSchemaLoaders(nil, "", "", flags...)
+}
+
+// StandaloneWithSchemaLoaders wires standalone modules to build a testing ready runtime. It also allows to preload schema.
+func StandaloneWithSchemaLoaders(schemaLoaders []SchemaLoader, certFile, keyFile string, flags ...string) (string, string, func()) {
+ path, deferFn, err := test.NewSpace()
gomega.Expect(err).NotTo(gomega.HaveOccurred())
var ports []int
ports, err = test.AllocateFreePorts(4)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
addr := fmt.Sprintf("%s:%d", host, ports[0])
httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
+ endpoint := fmt.Sprintf("http://%s:%d", host, ports[2])
ff := []string{
+ "--logging-env=dev",
+ "--logging-level=error",
"--grpc-host=" + host,
fmt.Sprintf("--grpc-port=%d", ports[0]),
"--http-host=" + host,
@@ -67,66 +85,63 @@ func CommonWithSchemaLoaders(schemaLoaders []SchemaLoader, flags ...string) (str
"--stream-root-path=" + path,
"--measure-root-path=" + path,
"--metadata-root-path=" + path,
- fmt.Sprintf("--etcd-listen-client-url=http://%s:%d", host,
ports[2]), fmt.Sprintf("--etcd-listen-peer-url=http://%s:%d", host, ports[3]),
+ fmt.Sprintf("--etcd-listen-client-url=%s", endpoint),
fmt.Sprintf("--etcd-listen-peer-url=http://%s:%d", host, ports[3]),
+ }
+ tlsEnabled := false
+ if certFile != "" && keyFile != "" {
+ ff = append(ff, "--tls=true", "--cert-file="+certFile,
"--key-file="+keyFile, "--http-grpc-cert-file="+certFile)
+ tlsEnabled = true
}
if len(flags) > 0 {
ff = append(ff, flags...)
}
- gracefulStop := modules(schemaLoaders, ff)
- return addr, httpAddr, func() {
- gracefulStop()
- // deferFn()
+ cmdFlags := []string{"standalone"}
+ cmdFlags = append(cmdFlags, ff...)
+ closeFn := CMD(cmdFlags...)
+ if tlsEnabled {
+ creds, err := credentials.NewClientTLSFromFile(certFile, "localhost")
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ gomega.Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpclib.WithTransportCredentials(creds)), testflags.EventuallyTimeout).
+ Should(gomega.Succeed())
+ } else {
+ gomega.Eventually(
+ helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpclib.WithTransportCredentials(insecure.NewCredentials())),
+ testflags.EventuallyTimeout).Should(gomega.Succeed())
}
-}
-
-func modules(schemaLoaders []SchemaLoader, flags []string) func() {
- // Init `Queue` module
- pipeline := queue.Local()
- // Init `Metadata` module
- metaSvc, err := metadata.NewService(context.TODO())
- gomega.Expect(err).NotTo(gomega.HaveOccurred())
- // Init `Stream` module
- streamSvc, err := stream.NewService(context.TODO(), metaSvc, pipeline)
- gomega.Expect(err).NotTo(gomega.HaveOccurred())
- // Init `Measure` module
- measureSvc, err := measure.NewService(context.TODO(), metaSvc, pipeline)
- gomega.Expect(err).NotTo(gomega.HaveOccurred())
- // Init `Query` module
- q, err := query.NewService(context.TODO(), streamSvc, measureSvc, metaSvc, pipeline)
- gomega.Expect(err).NotTo(gomega.HaveOccurred())
- tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc)
- httpServer := http.NewServer()
-
- units := []run.Unit{
- pipeline,
- metaSvc,
+ gomega.Eventually(helpers.HTTPHealthCheck(httpAddr), testflags.EventuallyTimeout).Should(gomega.Succeed())
+
+ if schemaLoaders != nil {
+ schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+ schema.Namespace(metadata.DefaultNamespace),
+ schema.ConfigureServerEndpoints([]string{endpoint}),
+ )
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ defer schemaRegistry.Close()
+ var units []run.Unit
+ for _, sl := range schemaLoaders {
+ sl.SetRegistry(schemaRegistry)
+ units = append(units, sl)
+ }
+ preloadGroup := run.NewGroup("preload")
+ preloadGroup.Register(units...)
+ err = preloadGroup.Run(context.Background())
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
}
- for _, sl := range schemaLoaders {
- sl.SetMeta(metaSvc)
- units = append(units, sl)
+ return addr, httpAddr, func() {
+ closeFn()
+ deferFn()
}
- units = append(units,
- streamSvc,
- measureSvc,
- q,
- tcp,
- httpServer)
-
- return test.SetupModules(
- flags,
- units...,
- )
}
// SchemaLoader is a service that can preload schema.
type SchemaLoader interface {
run.Unit
- SetMeta(meta metadata.Service)
+ SetRegistry(registry schema.Registry)
}
type preloadService struct {
- metaSvc metadata.Service
- name string
+ registry schema.Registry
+ name string
}
func (p *preloadService) Name() string {
@@ -135,13 +150,13 @@ func (p *preloadService) Name() string {
func (p *preloadService) PreRun(ctx context.Context) error {
if p.name == "stream" {
- return test_stream.PreloadSchema(ctx, p.metaSvc.SchemaRegistry())
+ return test_stream.PreloadSchema(ctx, p.registry)
}
- return test_measure.PreloadSchema(ctx, p.metaSvc.SchemaRegistry())
+ return test_measure.PreloadSchema(ctx, p.registry)
}
-func (p *preloadService) SetMeta(meta metadata.Service) {
- p.metaSvc = meta
+func (p *preloadService) SetRegistry(registry schema.Registry) {
+ p.registry = registry
}
// CMD runs the command with given flags.
@@ -162,3 +177,54 @@ func CMD(flags ...string) func() {
wg.Wait()
}
}
+
+// DataNode runs a data node.
+func DataNode(etcdEndpoint string) func() {
+ path, deferFn, err := test.NewSpace()
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ ports, err := test.AllocateFreePorts(1)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ addr := fmt.Sprintf("%s:%d", host, ports[0])
+ nodeHost := "127.0.0.1"
+ closeFn := CMD("data",
+ "--grpc-host="+host,
+ fmt.Sprintf("--grpc-port=%d", ports[0]),
+ "--stream-root-path="+path,
+ "--measure-root-path="+path,
+ "--etcd-endpoints", etcdEndpoint,
+ "--node-host-provider", "flag",
+ "--node-host", nodeHost)
+ gomega.Eventually(
+ helpers.HealthCheck(addr, 10*time.Second, 10*time.Second,
grpclib.WithTransportCredentials(insecure.NewCredentials())),
+ testflags.EventuallyTimeout).Should(gomega.Succeed())
+ gomega.Eventually(func() (map[string]*databasev1.Node, error) {
+ return helpers.ListKeys(etcdEndpoint,
fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[0]))
+ }, testflags.EventuallyTimeout).Should(gomega.HaveLen(1))
+ return func() {
+ closeFn()
+ deferFn()
+ }
+}
+
+// LiaisonNode runs a liaison node.
+func LiaisonNode(etcdEndpoint string) (string, func()) {
+ ports, err := test.AllocateFreePorts(2)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ addr := fmt.Sprintf("%s:%d", host, ports[0])
+ httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
+ nodeHost := "127.0.0.1"
+ closeFn := CMD("liaison",
+ "--grpc-host="+host,
+ fmt.Sprintf("--grpc-port=%d", ports[0]),
+ "--http-host="+host,
+ fmt.Sprintf("--http-port=%d", ports[1]),
+ "--http-grpc-addr="+addr,
+ "--etcd-endpoints", etcdEndpoint,
+ "--node-host-provider", "flag",
+ "--node-host", nodeHost)
+ gomega.Eventually(helpers.HTTPHealthCheck(httpAddr), testflags.EventuallyTimeout).Should(gomega.Succeed())
+ gomega.Eventually(func() (map[string]*databasev1.Node, error) {
+ return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[0]))
+ }, testflags.EventuallyTimeout).Should(gomega.HaveLen(1))
+ return addr, closeFn
+}
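
The reworked helpers make a test cluster a handful of calls: given an etcd endpoint, DataNode and LiaisonNode spawn real server processes through CMD and block until their health checks and node registrations succeed. An illustrative use, mirroring the distributed query suite further below (etcdEndpoint and now are assumed to be prepared by the test):

// start two data nodes and one liaison against a running etcd endpoint
closeDataNode0 := setup.DataNode(etcdEndpoint)
closeDataNode1 := setup.DataNode(etcdEndpoint)
liaisonAddr, closeLiaison := setup.LiaisonNode(etcdEndpoint)

// load the shared stream/measure test data through the liaison
test_cases.Initialize(liaisonAddr, now)

defer func() {
	closeLiaison()
	closeDataNode1()
	closeDataNode0()
}()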
diff --git a/test/integration/standalone/query/query_suite_test.go b/test/cases/init.go
similarity index 54%
copy from test/integration/standalone/query/query_suite_test.go
copy to test/cases/init.go
index b9852cbb..72f66315 100644
--- a/test/integration/standalone/query/query_suite_test.go
+++ b/test/cases/init.go
@@ -15,58 +15,26 @@
// specific language governing permissions and limitations
// under the License.
-package integration_query_test
+// Package cases provides some tools to access test data.
+package cases
import (
- "testing"
"time"
- . "github.com/onsi/ginkgo/v2"
- . "github.com/onsi/gomega"
- "github.com/onsi/gomega/gleak"
+ "github.com/onsi/gomega"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
- "github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/test/flags"
- "github.com/apache/skywalking-banyandb/pkg/test/helpers"
- "github.com/apache/skywalking-banyandb/pkg/test/setup"
- "github.com/apache/skywalking-banyandb/pkg/timestamp"
- casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
- casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data"
- casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
- integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone"
)
-func TestIntegrationQuery(t *testing.T) {
- RegisterFailHandler(Fail)
- RunSpecs(t, "Integration Query Suite",
Label(integration_standalone.Labels...))
-}
-
-var (
- connection *grpc.ClientConn
- now time.Time
- deferFunc func()
- goods []gleak.Goroutine
-)
-
-var _ = SynchronizedBeforeSuite(func() []byte {
- goods = gleak.Goroutines()
- Expect(logger.Init(logger.Logging{
- Env: "dev",
- Level: flags.LogLevel,
- })).To(Succeed())
- var addr string
- addr, _, deferFunc = setup.Common()
- Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),
- flags.EventuallyTimeout).Should(Succeed())
+// Initialize test data.
+func Initialize(addr string, now time.Time) {
conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
- Expect(err).NotTo(HaveOccurred())
- ns := timestamp.NowMilli().UnixNano()
- now = time.Unix(0, ns-ns%int64(time.Minute))
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ defer conn.Close()
interval := 500 * time.Millisecond
// stream
casesstreamdata.Write(conn, "data.json", now, interval)
@@ -82,32 +50,4 @@ var _ = SynchronizedBeforeSuite(func() []byte {
casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute",
"sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval)
casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute",
"sw_metric", "service_instance_endpoint_cpm_minute_data1.json",
now.Add(10*time.Second), interval)
casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute",
"sw_metric", "service_instance_endpoint_cpm_minute_data2.json",
now.Add(10*time.Minute), interval)
- Expect(conn.Close()).To(Succeed())
- return []byte(addr)
-}, func(address []byte) {
- var err error
- connection, err = grpchelper.Conn(string(address), 10*time.Second,
- grpc.WithTransportCredentials(insecure.NewCredentials()))
- casesstream.SharedContext = helpers.SharedContext{
- Connection: connection,
- BaseTime: now,
- }
- casesmeasure.SharedContext = helpers.SharedContext{
- Connection: connection,
- BaseTime: now,
- }
- casestopn.SharedContext = helpers.SharedContext{
- Connection: connection,
- BaseTime: now,
- }
- Expect(err).NotTo(HaveOccurred())
-})
-
-var _ = SynchronizedAfterSuite(func() {
- if connection != nil {
- Expect(connection.Close()).To(Succeed())
- }
-}, func() {
- deferFunc()
- Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
-})
+}
diff --git a/test/cases/measure/data/input/order_tag_asc.yaml b/test/cases/measure/data/input/order_tag_asc.yaml
index c5b1f1bd..a752c72b 100644
--- a/test/cases/measure/data/input/order_tag_asc.yaml
+++ b/test/cases/measure/data/input/order_tag_asc.yaml
@@ -22,8 +22,6 @@ tagProjection:
tagFamilies:
- name: "default"
tags: ["id"]
-fieldProjection:
- names: ["total", "value"]
orderBy:
sort: "SORT_ASC"
indexRuleName: "id"
diff --git a/test/cases/measure/data/input/order_tag_desc.yaml b/test/cases/measure/data/input/order_tag_desc.yaml
index 6031cdb3..c5007d75 100644
--- a/test/cases/measure/data/input/order_tag_desc.yaml
+++ b/test/cases/measure/data/input/order_tag_desc.yaml
@@ -22,8 +22,6 @@ tagProjection:
tagFamilies:
- name: "default"
tags: ["id"]
-fieldProjection:
- names: ["total", "value"]
orderBy:
sort: "SORT_DESC"
indexRuleName: "id"
diff --git a/test/cases/measure/data/want/order_tag_asc.yaml b/test/cases/measure/data/want/order_tag_asc.yaml
index b6a59101..123e2bda 100644
--- a/test/cases/measure/data/want/order_tag_asc.yaml
+++ b/test/cases/measure/data/want/order_tag_asc.yaml
@@ -16,105 +16,51 @@
# under the License.
dataPoints:
-- fields:
- - name: total
- value:
- int:
- value: "100"
- - name: value
- value:
- int:
- value: "1"
- tagFamilies:
+- tagFamilies:
- name: default
tags:
- key: id
value:
str:
value: svc1
- timestamp: "2023-06-26T12:41:00Z"
-- fields:
- - name: total
- value:
- int:
- value: "100"
- - name: value
- value:
- int:
- value: "2"
- tagFamilies:
+ timestamp: "2023-09-19T08:35:00Z"
+- tagFamilies:
- name: default
tags:
- key: id
value:
str:
value: svc1
- timestamp: "2023-06-26T12:42:00Z"
-- fields:
- - name: total
- value:
- int:
- value: "100"
- - name: value
- value:
- int:
- value: "3"
- tagFamilies:
+ timestamp: "2023-09-19T08:37:00Z"
+- tagFamilies:
- name: default
tags:
- key: id
value:
str:
value: svc1
- timestamp: "2023-06-26T12:43:00Z"
-- fields:
- - name: total
- value:
- int:
- value: "100"
- - name: value
- value:
- int:
- value: "5"
- tagFamilies:
+ timestamp: "2023-09-19T08:36:00Z"
+- tagFamilies:
- name: default
tags:
- key: id
value:
str:
value: svc2
- timestamp: "2023-06-26T12:44:00Z"
-- fields:
- - name: total
- value:
- int:
- value: "50"
- - name: value
- value:
- int:
- value: "4"
- tagFamilies:
+ timestamp: "2023-09-19T08:38:00Z"
+- tagFamilies:
- name: default
tags:
- key: id
value:
str:
value: svc2
- timestamp: "2023-06-26T12:45:00Z"
-- fields:
- - name: total
- value:
- int:
- value: "300"
- - name: value
- value:
- int:
- value: "6"
- tagFamilies:
+ timestamp: "2023-09-19T08:39:00Z"
+- tagFamilies:
- name: default
tags:
- key: id
value:
str:
value: svc3
- timestamp: "2023-06-26T12:46:00Z"
+ timestamp: "2023-09-19T08:40:00Z"
diff --git a/test/cases/measure/data/want/order_tag_desc.yaml b/test/cases/measure/data/want/order_tag_desc.yaml
index db711266..6e455083 100644
--- a/test/cases/measure/data/want/order_tag_desc.yaml
+++ b/test/cases/measure/data/want/order_tag_desc.yaml
@@ -16,105 +16,51 @@
# under the License.
dataPoints:
-- fields:
- - name: total
- value:
- int:
- value: "300"
- - name: value
- value:
- int:
- value: "6"
- tagFamilies:
+- tagFamilies:
- name: default
tags:
- key: id
value:
str:
value: svc3
- timestamp: "2023-06-26T12:47:00Z"
-- fields:
- - name: total
- value:
- int:
- value: "50"
- - name: value
- value:
- int:
- value: "4"
- tagFamilies:
+ timestamp: "2023-09-19T08:42:00Z"
+- tagFamilies:
- name: default
tags:
- key: id
value:
str:
value: svc2
- timestamp: "2023-06-26T12:46:00Z"
-- fields:
- - name: total
- value:
- int:
- value: "100"
- - name: value
- value:
- int:
- value: "5"
- tagFamilies:
+ timestamp: "2023-09-19T08:41:00Z"
+- tagFamilies:
- name: default
tags:
- key: id
value:
str:
value: svc2
- timestamp: "2023-06-26T12:45:00Z"
-- fields:
- - name: total
- value:
- int:
- value: "100"
- - name: value
- value:
- int:
- value: "3"
- tagFamilies:
+ timestamp: "2023-09-19T08:40:00Z"
+- tagFamilies:
- name: default
tags:
- key: id
value:
str:
value: svc1
- timestamp: "2023-06-26T12:44:00Z"
-- fields:
- - name: total
- value:
- int:
- value: "100"
- - name: value
- value:
- int:
- value: "2"
- tagFamilies:
+ timestamp: "2023-09-19T08:38:00Z"
+- tagFamilies:
- name: default
tags:
- key: id
value:
str:
value: svc1
- timestamp: "2023-06-26T12:43:00Z"
-- fields:
- - name: total
- value:
- int:
- value: "100"
- - name: value
- value:
- int:
- value: "1"
- tagFamilies:
+ timestamp: "2023-09-19T08:39:00Z"
+- tagFamilies:
- name: default
tags:
- key: id
value:
str:
value: svc1
- timestamp: "2023-06-26T12:42:00Z"
+ timestamp: "2023-09-19T08:37:00Z"
diff --git a/test/integration/standalone/query/query_suite_test.go b/test/integration/distributed/query/query_suite_test.go
similarity index 54%
copy from test/integration/standalone/query/query_suite_test.go
copy to test/integration/distributed/query/query_suite_test.go
index b9852cbb..0c78a14f 100644
--- a/test/integration/standalone/query/query_suite_test.go
+++ b/test/integration/distributed/query/query_suite_test.go
@@ -15,9 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-package integration_query_test
+// Package integration_setup_test is a integration test suite.
+package integration_setup_test
import (
+ "context"
+ "fmt"
"testing"
"time"
@@ -27,63 +30,82 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
+ test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
+ test_cases "github.com/apache/skywalking-banyandb/test/cases"
casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
- casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
- casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data"
casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
- integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone"
)
-func TestIntegrationQuery(t *testing.T) {
+func TestQuery(t *testing.T) {
RegisterFailHandler(Fail)
- RunSpecs(t, "Integration Query Suite",
Label(integration_standalone.Labels...))
+ RunSpecs(t, "Distributed Query Suite")
}
var (
- connection *grpc.ClientConn
- now time.Time
deferFunc func()
goods []gleak.Goroutine
+ now time.Time
+ connection *grpc.ClientConn
)
var _ = SynchronizedBeforeSuite(func() []byte {
- goods = gleak.Goroutines()
Expect(logger.Init(logger.Logging{
Env: "dev",
Level: flags.LogLevel,
})).To(Succeed())
- var addr string
- addr, _, deferFunc = setup.Common()
- Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),
- flags.EventuallyTimeout).Should(Succeed())
- conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ goods = gleak.Goroutines()
+ By("Starting etcd server")
+ ports, err := test.AllocateFreePorts(2)
+ Expect(err).NotTo(HaveOccurred())
+ dir, spaceDef, err := test.NewSpace()
Expect(err).NotTo(HaveOccurred())
+ ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+ server, err := embeddedetcd.NewServer(
+ embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+ embeddedetcd.RootDir(dir))
+ Expect(err).ShouldNot(HaveOccurred())
+ <-server.ReadyNotify()
+ By("Loading schema")
+ schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+ schema.Namespace(metadata.DefaultNamespace),
+ schema.ConfigureServerEndpoints([]string{ep}),
+ )
+ Expect(err).NotTo(HaveOccurred())
+ defer schemaRegistry.Close()
+ ctx := context.Background()
+ test_stream.PreloadSchema(ctx, schemaRegistry)
+ test_measure.PreloadSchema(ctx, schemaRegistry)
+ By("Starting data node 0")
+ closeDataNode0 := setup.DataNode(ep)
+ By("Starting data node 1")
+ closeDataNode1 := setup.DataNode(ep)
+ By("Starting liaison node")
+ liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep)
+ By("Initializing test cases")
ns := timestamp.NowMilli().UnixNano()
now = time.Unix(0, ns-ns%int64(time.Minute))
- interval := 500 * time.Millisecond
- // stream
- casesstreamdata.Write(conn, "data.json", now, interval)
- // measure
- interval = time.Minute
- casesmeasuredata.Write(conn, "service_traffic", "sw_metric",
"service_traffic_data.json", now, interval)
- casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric",
"service_instance_traffic_data.json", now, interval)
- casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric",
"service_cpm_minute_data.json", now, interval)
- casesmeasuredata.Write(conn, "instance_clr_cpu_minute", "sw_metric",
"instance_clr_cpu_minute_data.json", now, interval)
- casesmeasuredata.Write(conn, "service_instance_cpm_minute",
"sw_metric", "service_instance_cpm_minute_data.json", now, interval)
- casesmeasuredata.Write(conn, "service_instance_cpm_minute",
"sw_metric", "service_instance_cpm_minute_data1.json", now.Add(10*time.Second),
interval)
- casesmeasuredata.Write(conn, "service_instance_cpm_minute",
"sw_metric", "service_instance_cpm_minute_data2.json", now.Add(10*time.Minute),
interval)
- casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute",
"sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval)
- casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute",
"sw_metric", "service_instance_endpoint_cpm_minute_data1.json",
now.Add(10*time.Second), interval)
- casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute",
"sw_metric", "service_instance_endpoint_cpm_minute_data2.json",
now.Add(10*time.Minute), interval)
- Expect(conn.Close()).To(Succeed())
- return []byte(addr)
+ test_cases.Initialize(liaisonAddr, now)
+ deferFunc = func() {
+ closerLiaisonNode()
+ closeDataNode0()
+ closeDataNode1()
+ _ = server.Close()
+ <-server.StopNotify()
+ spaceDef()
+ }
+ return []byte(liaisonAddr)
}, func(address []byte) {
var err error
connection, err = grpchelper.Conn(string(address), 10*time.Second,
diff --git a/test/integration/load/load_suite_test.go b/test/integration/load/load_suite_test.go
index f2dde1a0..7071bc5e 100644
--- a/test/integration/load/load_suite_test.go
+++ b/test/integration/load/load_suite_test.go
@@ -55,7 +55,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
Level: flags.LogLevel,
})).To(Succeed())
var addr string
- addr, _, deferFunc = setup.Common()
+ addr, _, deferFunc = setup.Standalone()
Eventually(
helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),
flags.EventuallyTimeout).Should(Succeed())
diff --git a/test/integration/standalone/cold_query/query_suite_test.go b/test/integration/standalone/cold_query/query_suite_test.go
index e595e1a5..18e832ad 100644
--- a/test/integration/standalone/cold_query/query_suite_test.go
+++ b/test/integration/standalone/cold_query/query_suite_test.go
@@ -33,10 +33,9 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
+ test_cases "github.com/apache/skywalking-banyandb/test/cases"
casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
- casesmeasureData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
- casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data"
casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone"
)
@@ -60,28 +59,10 @@ var _ = SynchronizedBeforeSuite(func() []byte {
Level: flags.LogLevel,
})).To(Succeed())
var addr string
- addr, _, deferFunc = setup.Common()
- Eventually(
- helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),
- flags.EventuallyTimeout).Should(Succeed())
- conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
- Expect(err).NotTo(HaveOccurred())
+ addr, _, deferFunc = setup.Standalone()
ns := timestamp.NowMilli().UnixNano()
now = time.Unix(0, ns-ns%int64(time.Minute)).Add(-time.Hour * 24)
- interval := 500 * time.Millisecond
- casesstreamdata.Write(conn, "data.json", now, interval)
- interval = time.Minute
- casesmeasureData.Write(conn, "service_traffic", "sw_metric",
"service_traffic_data.json", now, interval)
- casesmeasureData.Write(conn, "service_instance_traffic", "sw_metric",
"service_instance_traffic_data.json", now, interval)
- casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric",
"service_cpm_minute_data.json", now, interval)
- casesmeasureData.Write(conn, "instance_clr_cpu_minute", "sw_metric",
"instance_clr_cpu_minute_data.json", now, interval)
- casesmeasureData.Write(conn, "service_instance_cpm_minute",
"sw_metric", "service_instance_cpm_minute_data.json", now, interval)
- casesmeasureData.Write(conn, "service_instance_cpm_minute",
"sw_metric", "service_instance_cpm_minute_data1.json", now.Add(10*time.Second),
interval)
- casesmeasureData.Write(conn, "service_instance_cpm_minute",
"sw_metric", "service_instance_cpm_minute_data2.json", now.Add(10*time.Minute),
interval)
- casesmeasureData.Write(conn, "service_instance_endpoint_cpm_minute",
"sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval)
- casesmeasureData.Write(conn, "service_instance_endpoint_cpm_minute",
"sw_metric", "service_instance_endpoint_cpm_minute_data1.json",
now.Add(10*time.Second), interval)
- casesmeasureData.Write(conn, "service_instance_endpoint_cpm_minute",
"sw_metric", "service_instance_endpoint_cpm_minute_data2.json",
now.Add(10*time.Minute), interval)
- Expect(conn.Close()).To(Succeed())
+ test_cases.Initialize(addr, now)
return []byte(addr)
}, func(address []byte) {
var err error
diff --git a/test/integration/standalone/other/measure_test.go b/test/integration/standalone/other/measure_test.go
index 24800b19..a2ab6e81 100644
--- a/test/integration/standalone/other/measure_test.go
+++ b/test/integration/standalone/other/measure_test.go
@@ -43,9 +43,7 @@ var _ = g.Describe("Query service_cpm_minute", func() {
g.BeforeEach(func() {
var addr string
- addr, _, deferFn = setup.Common()
- gm.Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),
- flags.EventuallyTimeout).Should(gm.Succeed())
+ addr, _, deferFn = setup.Standalone()
var err error
conn, err = grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
gm.Expect(err).NotTo(gm.HaveOccurred())
diff --git a/test/integration/standalone/other/property_test.go b/test/integration/standalone/other/property_test.go
index 9b5ad68c..6ef21a4a 100644
--- a/test/integration/standalone/other/property_test.go
+++ b/test/integration/standalone/other/property_test.go
@@ -33,7 +33,6 @@ import (
propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
- "github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
)
@@ -45,9 +44,7 @@ var _ = Describe("Property application", func() {
BeforeEach(func() {
var addr string
- addr, _, deferFn = setup.Common()
- Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),
- flags.EventuallyTimeout).Should(Succeed())
+ addr, _, deferFn = setup.Standalone()
var err error
conn, err = grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
Expect(err).NotTo(HaveOccurred())
@@ -179,7 +176,7 @@ var _ = Describe("Property application", func() {
BeforeEach(func() {
var addr string
- addr, _, deferFn = setup.Common()
+ addr, _, deferFn = setup.Standalone()
var err error
conn, err = grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
Expect(err).NotTo(HaveOccurred())
diff --git a/test/integration/standalone/other/tls_test.go b/test/integration/standalone/other/tls_test.go
index acda8f30..8d4d5df1 100644
--- a/test/integration/standalone/other/tls_test.go
+++ b/test/integration/standalone/other/tls_test.go
@@ -49,11 +49,10 @@ var _ = g.Describe("Query service_cpm_minute", func() {
certFile := filepath.Join(basePath, "testdata/server_cert.pem")
keyFile := filepath.Join(basePath, "testdata/server_key.pem")
var addr string
- addr, _, deferFn = setup.Common("--tls=true",
"--cert-file="+certFile, "--key-file="+keyFile)
+ addr, _, deferFn = setup.StandaloneWithTLS(certFile, keyFile)
var err error
creds, err := credentials.NewClientTLSFromFile(certFile, "localhost")
gm.Expect(err).NotTo(gm.HaveOccurred())
- gm.Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpclib.WithTransportCredentials(creds)), flags.EventuallyTimeout).Should(gm.Succeed())
conn, err = grpchelper.Conn(addr, 10*time.Second, grpclib.WithTransportCredentials(creds))
gm.Expect(err).NotTo(gm.HaveOccurred())
ns := timestamp.NowMilli().UnixNano()
diff --git a/test/integration/standalone/query/query_suite_test.go b/test/integration/standalone/query/query_suite_test.go
index b9852cbb..a1e684b2 100644
--- a/test/integration/standalone/query/query_suite_test.go
+++ b/test/integration/standalone/query/query_suite_test.go
@@ -33,10 +33,9 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
+ test_cases "github.com/apache/skywalking-banyandb/test/cases"
casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
- casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
- casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data"
casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone"
)
@@ -60,29 +59,10 @@ var _ = SynchronizedBeforeSuite(func() []byte {
Level: flags.LogLevel,
})).To(Succeed())
var addr string
- addr, _, deferFunc = setup.Common()
- Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),
- flags.EventuallyTimeout).Should(Succeed())
- conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
- Expect(err).NotTo(HaveOccurred())
+ addr, _, deferFunc = setup.Standalone()
ns := timestamp.NowMilli().UnixNano()
now = time.Unix(0, ns-ns%int64(time.Minute))
- interval := 500 * time.Millisecond
- // stream
- casesstreamdata.Write(conn, "data.json", now, interval)
- // measure
- interval = time.Minute
- casesmeasuredata.Write(conn, "service_traffic", "sw_metric",
"service_traffic_data.json", now, interval)
- casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric",
"service_instance_traffic_data.json", now, interval)
- casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric",
"service_cpm_minute_data.json", now, interval)
- casesmeasuredata.Write(conn, "instance_clr_cpu_minute", "sw_metric",
"instance_clr_cpu_minute_data.json", now, interval)
- casesmeasuredata.Write(conn, "service_instance_cpm_minute",
"sw_metric", "service_instance_cpm_minute_data.json", now, interval)
- casesmeasuredata.Write(conn, "service_instance_cpm_minute",
"sw_metric", "service_instance_cpm_minute_data1.json", now.Add(10*time.Second),
interval)
- casesmeasuredata.Write(conn, "service_instance_cpm_minute",
"sw_metric", "service_instance_cpm_minute_data2.json", now.Add(10*time.Minute),
interval)
- casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute",
"sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval)
- casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute",
"sw_metric", "service_instance_endpoint_cpm_minute_data1.json",
now.Add(10*time.Second), interval)
- casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute",
"sw_metric", "service_instance_endpoint_cpm_minute_data2.json",
now.Add(10*time.Minute), interval)
- Expect(conn.Close()).To(Succeed())
+ test_cases.Initialize(addr, now)
return []byte(addr)
}, func(address []byte) {
var err error
diff --git a/test/stress/cases/istio/istio_suite_test.go b/test/stress/cases/istio/istio_suite_test.go
index a2a2a5b6..bab730bc 100644
--- a/test/stress/cases/istio/istio_suite_test.go
+++ b/test/stress/cases/istio/istio_suite_test.go
@@ -57,7 +57,7 @@ var _ = Describe("Istio", func() {
})).To(Succeed())
})
It("should pass", func() {
- addr, _, deferFunc := setup.CommonWithSchemaLoaders([]setup.SchemaLoader{&preloadService{name: "oap"}})
+ addr, _, deferFunc := setup.StandaloneWithSchemaLoaders([]setup.SchemaLoader{&preloadService{name: "oap"}}, "", "")
DeferCleanup(deferFunc)
Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),
flags.EventuallyTimeout).Should(Succeed())
diff --git a/test/stress/cases/istio/repo.go b/test/stress/cases/istio/repo.go
index 6865ba60..89ac54aa 100644
--- a/test/stress/cases/istio/repo.go
+++ b/test/stress/cases/istio/repo.go
@@ -39,7 +39,7 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
)
//go:embed testdata/*
@@ -131,8 +131,8 @@ func extractTarGz(src []byte, dest string) (string, error) {
}
type preloadService struct {
- metaSvc metadata.Service
- name string
+ registry schema.Registry
+ name string
}
func (p *preloadService) Name() string {
@@ -140,7 +140,7 @@ func (p *preloadService) Name() string {
}
func (p *preloadService) PreRun(_ context.Context) error {
- e := p.metaSvc.SchemaRegistry()
+ e := p.registry
if err := loadSchema(groupDir, &commonv1.Group{}, func(group *commonv1.Group) error {
return e.CreateGroup(context.TODO(), group)
}); err != nil {
@@ -165,8 +165,8 @@ func (p *preloadService) PreRun(_ context.Context) error {
return nil
}
-func (p *preloadService) SetMeta(meta metadata.Service) {
- p.metaSvc = meta
+func (p *preloadService) SetRegistry(registry schema.Registry) {
+ p.registry = registry
}
const (