This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch test in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 0fef03b0c924347c52caef7d4459f3014219bdd0 Author: Gao Hongtao <[email protected]> AuthorDate: Tue Sep 19 11:35:03 2023 +0000 Apply query test cases to a cluster Signed-off-by: Gao Hongtao <[email protected]> --- api/data/data.go | 68 ++++++++++++++++++ api/data/doc.go | 19 ----- banyand/dquery/dquery.go | 14 +++- banyand/dquery/measure.go | 2 +- banyand/dquery/stream.go | 2 +- banyand/dquery/topn.go | 53 ++++++++------ banyand/liaison/grpc/discovery.go | 5 +- banyand/liaison/grpc/measure.go | 15 ++-- banyand/liaison/grpc/registry_test.go | 2 +- banyand/liaison/grpc/server.go | 14 ++-- banyand/liaison/grpc/stream.go | 9 ++- banyand/measure/measure.go | 5 +- banyand/measure/metadata.go | 13 ++++ banyand/query/processor.go | 4 +- banyand/query/processor_topn.go | 25 +++---- banyand/queue/pub/pub.go | 37 +++++++--- banyand/queue/sub/sub.go | 39 ++++++++-- banyand/stream/metadata.go | 12 ++++ banyand/stream/stream.go | 3 + banyand/tsdb/buffer.go | 2 +- pkg/bus/bus.go | 5 -- pkg/cmdsetup/data.go | 8 --- pkg/cmdsetup/liaison.go | 7 +- pkg/cmdsetup/standalone.go | 2 +- .../logical/measure/measure_plan_distributed.go | 84 +++++++++++++++------- .../logical/stream/stream_plan_distributed.go | 54 +++++++++++--- pkg/schema/metadata.go | 52 ++++++++------ pkg/schema/schema.go | 2 +- pkg/test/setup/setup.go | 52 ++++++++++++++ .../query/query_suite_test.go => cases/init.go} | 74 +++---------------- test/cases/measure/data/input/order_tag_asc.yaml | 2 - test/cases/measure/data/input/order_tag_desc.yaml | 2 - test/cases/measure/data/want/order_tag_asc.yaml | 78 ++++---------------- test/cases/measure/data/want/order_tag_desc.yaml | 78 ++++---------------- .../query/query_suite_test.go | 82 +++++++++++++-------- .../standalone/cold_query/query_suite_test.go | 20 +----- .../standalone/query/query_suite_test.go | 22 +----- 37 files changed, 524 insertions(+), 443 deletions(-) diff --git a/api/data/data.go b/api/data/data.go new file mode 100644 index 00000000..5da103ef --- /dev/null +++ b/api/data/data.go @@ -0,0 +1,68 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package data contains data transmission topics. +package data + +import ( + "google.golang.org/protobuf/proto" + + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" + "github.com/apache/skywalking-banyandb/pkg/bus" +) + +// TopicMap is the map of topic name to topic. +var TopicMap = map[string]bus.Topic{ + TopicStreamWrite.String(): TopicStreamWrite, + TopicStreamQuery.String(): TopicStreamQuery, + TopicMeasureWrite.String(): TopicMeasureWrite, + TopicMeasureQuery.String(): TopicMeasureQuery, + TopicTopNQuery.String(): TopicTopNQuery, +} + +// TopicRequestMap is the map of topic name to request message. +var TopicRequestMap = map[bus.Topic]func() proto.Message{ + TopicStreamWrite: func() proto.Message { + return &streamv1.InternalWriteRequest{} + }, + TopicStreamQuery: func() proto.Message { + return &streamv1.QueryRequest{} + }, + TopicMeasureWrite: func() proto.Message { + return &measurev1.InternalWriteRequest{} + }, + TopicMeasureQuery: func() proto.Message { + return &measurev1.QueryRequest{} + }, + TopicTopNQuery: func() proto.Message { + return &measurev1.TopNRequest{} + }, +} + +// TopicResponseMap is the map of topic name to response message. +var TopicResponseMap = map[bus.Topic]func() proto.Message{ + TopicStreamQuery: func() proto.Message { + return &streamv1.QueryResponse{} + }, + TopicMeasureQuery: func() proto.Message { + return &measurev1.QueryResponse{} + }, + TopicTopNQuery: func() proto.Message { + return &measurev1.TopNResponse{} + }, +} diff --git a/api/data/doc.go b/api/data/doc.go deleted file mode 100644 index 25918677..00000000 --- a/api/data/doc.go +++ /dev/null @@ -1,19 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Package data contains data transmission topics. -package data diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go index 4d77b6fb..c7e1454d 100644 --- a/banyand/dquery/dquery.go +++ b/banyand/dquery/dquery.go @@ -21,9 +21,13 @@ package dquery import ( "context" + "go.uber.org/multierr" + + "github.com/apache/skywalking-banyandb/api/data" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/stream" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -44,14 +48,16 @@ type queryService struct { mqp *measureQueryProcessor tqp *topNQueryProcessor closer *run.Closer + pipeline queue.Server } // NewService return a new query service. -func NewService(metaService metadata.Repo, broadcaster bus.Broadcaster, +func NewService(metaService metadata.Repo, pipeline queue.Server, broadcaster bus.Broadcaster, ) (run.Unit, error) { svc := &queryService{ metaService: metaService, closer: run.NewCloser(1), + pipeline: pipeline, } svc.sqp = &streamQueryProcessor{ queryService: svc, @@ -76,7 +82,11 @@ func (q *queryService) PreRun(_ context.Context) error { q.log = logger.GetLogger(moduleName) q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log) q.mqp.measureService = measure.NewPortableRepository(q.metaService, q.log) - return nil + return multierr.Combine( + q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp), + q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp), + q.pipeline.Subscribe(data.TopicTopNQuery, q.tqp), + ) } func (q *queryService) GracefulStop() { diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go index b145de94..9dc95d82 100644 --- a/banyand/dquery/measure.go +++ b/banyand/dquery/measure.go @@ -95,6 +95,6 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { if e := ml.Debug(); e.Enabled() { e.RawJSON("ret", logger.Proto(&measurev1.QueryResponse{DataPoints: result})).Msg("got a measure") } - resp = bus.NewMessage(bus.MessageID(now), result) + resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{DataPoints: result}) return } diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go index 62804d00..6fd129d0 100644 --- a/banyand/dquery/stream.go +++ b/banyand/dquery/stream.go @@ -78,7 +78,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) { return } - resp = bus.NewMessage(bus.MessageID(now), entities) + resp = bus.NewMessage(bus.MessageID(now), &streamv1.QueryResponse{Elements: entities}) return } diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go index df34d337..a885f829 100644 --- a/banyand/dquery/topn.go +++ b/banyand/dquery/topn.go @@ -19,12 +19,13 @@ package dquery import ( "go.uber.org/multierr" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/query" + "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/iter/sort" @@ -48,6 +49,8 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) { if e := t.log.Debug(); e.Enabled() { e.Stringer("req", request).Msg("received a topN query event") } + agg := request.Agg + request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED now := bus.MessageID(request.TimeRange.Begin.Nanos) ff, err := t.broadcaster.Broadcast(data.TopicTopNQuery, bus.NewMessage(now, request)) if err != nil { @@ -55,35 +58,41 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) { return } var allErr error - var sii []sort.Iterator[*comparableTopNItem] - var latestTimestamp *timestamppb.Timestamp + aggregator := query.CreateTopNPostAggregator(request.GetTopN(), + agg, request.GetFieldValueSort()) + var tags []string for _, f := range ff { if m, getErr := f.Get(); getErr != nil { allErr = multierr.Append(allErr, getErr) } else { - tl := m.Data().(*measurev1.TopNList) - un := tl.Timestamp.AsTime().UnixNano() - if un > latestTimestamp.AsTime().UnixNano() { - latestTimestamp = tl.Timestamp + d := m.Data() + if d == nil { + continue + } + topNResp := d.(*measurev1.TopNResponse) + for _, l := range topNResp.Lists { + for _, tn := range l.Items { + if tags == nil { + tags = make([]string, 0, len(tn.Entity)) + for _, e := range tn.Entity { + tags = append(tags, e.Key) + } + } + entityValues := make(tsdb.EntityValues, 0, len(tn.Entity)) + for _, e := range tn.Entity { + entityValues = append(entityValues, e.Value) + } + _ = aggregator.Put(entityValues, tn.Value.GetInt().GetValue(), uint64(l.Timestamp.AsTime().UnixMilli())) + } } - sii = append(sii, &sortedTopNList{TopNList: tl}) } } - var desc bool - if request.GetFieldValueSort() == modelv1.Sort_SORT_DESC { - desc = true - } - iter := sort.NewItemIter[*comparableTopNItem](sii, desc) - defer func() { - _ = iter.Close() - }() - var items []*measurev1.TopNList_Item - for iter.Next() && len(items) < int(request.TopN) { - items = append(items, iter.Val().TopNList_Item) + if tags == nil { + resp = bus.NewMessage(now, &measurev1.TopNResponse{}) + return } - resp = bus.NewMessage(now, &measurev1.TopNList{ - Items: items, - Timestamp: latestTimestamp, + resp = bus.NewMessage(now, &measurev1.TopNResponse{ + Lists: aggregator.Val(tags), }) return } diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go index b133d21d..57914fec 100644 --- a/banyand/liaison/grpc/discovery.go +++ b/banyand/liaison/grpc/discovery.go @@ -29,7 +29,6 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" - "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/partition" @@ -38,7 +37,6 @@ import ( var errNotExist = errors.New("the object doesn't exist") type discoveryService struct { - pipeline queue.Client metadataRepo metadata.Repo nodeRegistry NodeRegistry shardRepo *shardRepo @@ -47,13 +45,12 @@ type discoveryService struct { kind schema.Kind } -func newDiscoveryService(pipeline queue.Client, kind schema.Kind, metadataRepo metadata.Repo, nodeRegistry NodeRegistry) *discoveryService { +func newDiscoveryService(kind schema.Kind, metadataRepo metadata.Repo, nodeRegistry NodeRegistry) *discoveryService { sr := &shardRepo{shardEventsMap: make(map[identity]uint32)} er := &entityRepo{entitiesMap: make(map[identity]partition.EntityLocator)} return &discoveryService{ shardRepo: sr, entityRepo: er, - pipeline: pipeline, kind: kind, metadataRepo: metadataRepo, nodeRegistry: nodeRegistry, diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index 4b3f7da9..2a32d29d 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -31,6 +31,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/accesslog" "github.com/apache/skywalking-banyandb/pkg/bus" @@ -43,6 +44,8 @@ type measureService struct { *discoveryService sampled *logger.Logger ingestionAccessLog accesslog.Log + pipeline queue.Client + broadcaster queue.Client } func (ms *measureService) setLogger(log *logger.Logger) { @@ -137,7 +140,7 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest) return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", req.GetTimeRange(), err) } message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), req) - feat, errQuery := ms.pipeline.Publish(data.TopicMeasureQuery, message) + feat, errQuery := ms.broadcaster.Publish(data.TopicMeasureQuery, message) if errQuery != nil { return nil, errQuery } @@ -150,8 +153,8 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest) } data := msg.Data() switch d := data.(type) { - case []*measurev1.DataPoint: - return &measurev1.QueryResponse{DataPoints: d}, nil + case *measurev1.QueryResponse: + return d, nil case common.Error: return nil, errors.WithMessage(errQueryMsg, d.Msg()) } @@ -164,7 +167,7 @@ func (ms *measureService) TopN(_ context.Context, topNRequest *measurev1.TopNReq } message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), topNRequest) - feat, errQuery := ms.pipeline.Publish(data.TopicTopNQuery, message) + feat, errQuery := ms.broadcaster.Publish(data.TopicTopNQuery, message) if errQuery != nil { return nil, errQuery } @@ -174,8 +177,8 @@ func (ms *measureService) TopN(_ context.Context, topNRequest *measurev1.TopNReq } data := msg.Data() switch d := data.(type) { - case []*measurev1.TopNList: - return &measurev1.TopNResponse{Lists: d}, nil + case *measurev1.TopNResponse: + return d, nil case common.Error: return nil, errors.WithMessage(errQueryMsg, d.Msg()) } diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go index 6dff21fa..d7118b40 100644 --- a/banyand/liaison/grpc/registry_test.go +++ b/banyand/liaison/grpc/registry_test.go @@ -178,7 +178,7 @@ func setupForRegistry() func() { metaSvc, err := metadata.NewService(context.TODO()) Expect(err).NotTo(HaveOccurred()) - tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc, grpc.NewLocalNodeRegistry()) + tcp := grpc.NewServer(context.TODO(), pipeline, pipeline, metaSvc, grpc.NewLocalNodeRegistry()) preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc} var flags []string metaPath, metaDeferFunc, err := test.NewSpace() diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index a89b7004..102bd170 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -64,8 +64,7 @@ type Server interface { } type server struct { - pipeline queue.Client - creds credentials.TransportCredentials + creds credentials.TransportCredentials *streamRegistryServer log *logger.Logger *indexRuleBindingRegistryServer @@ -91,15 +90,18 @@ type server struct { } // NewServer returns a new gRPC server. -func NewServer(_ context.Context, pipeline queue.Client, schemaRegistry metadata.Repo, nodeRegistry NodeRegistry) Server { +func NewServer(_ context.Context, pipeline, broadcaster queue.Client, schemaRegistry metadata.Repo, nodeRegistry NodeRegistry) Server { streamSVC := &streamService{ - discoveryService: newDiscoveryService(pipeline, schema.KindStream, schemaRegistry, nodeRegistry), + discoveryService: newDiscoveryService(schema.KindStream, schemaRegistry, nodeRegistry), + pipeline: pipeline, + broadcaster: broadcaster, } measureSVC := &measureService{ - discoveryService: newDiscoveryService(pipeline, schema.KindMeasure, schemaRegistry, nodeRegistry), + discoveryService: newDiscoveryService(schema.KindMeasure, schemaRegistry, nodeRegistry), + pipeline: pipeline, + broadcaster: broadcaster, } s := &server{ - pipeline: pipeline, streamSVC: streamSVC, measureSVC: measureSVC, streamRegistryServer: &streamRegistryServer{ diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index 0ae2b30d..aa3d49a4 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -31,6 +31,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" + "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/accesslog" "github.com/apache/skywalking-banyandb/pkg/bus" @@ -43,6 +44,8 @@ type streamService struct { *discoveryService sampled *logger.Logger ingestionAccessLog accesslog.Log + pipeline queue.Client + broadcaster queue.Client } func (s *streamService) setLogger(log *logger.Logger) { @@ -142,7 +145,7 @@ func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (*s return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", req.GetTimeRange(), err) } message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), req) - feat, errQuery := s.pipeline.Publish(data.TopicStreamQuery, message) + feat, errQuery := s.broadcaster.Publish(data.TopicStreamQuery, message) if errQuery != nil { if errors.Is(errQuery, io.EOF) { return emptyStreamQueryResponse, nil @@ -155,8 +158,8 @@ func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (*s } data := msg.Data() switch d := data.(type) { - case []*streamv1.Element: - return &streamv1.QueryResponse{Elements: d}, nil + case *streamv1.QueryResponse: + return d, nil case common.Error: return nil, errors.WithMessage(errQueryMsg, d.Msg()) } diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go index b8a55140..e5f28956 100644 --- a/banyand/measure/measure.go +++ b/banyand/measure/measure.go @@ -114,9 +114,12 @@ func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger. if err := m.parseSpec(); err != nil { return nil, err } - ctx := context.WithValue(context.Background(), logger.ContextKey, l) + if db == nil { + return m, nil + } m.databaseSupplier = db + ctx := context.WithValue(context.Background(), logger.ContextKey, l) m.indexWriter = index.NewWriter(ctx, index.WriterOptions{ DB: db, ShardNum: shardNum, diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index 053abee3..27dfa090 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -405,3 +405,16 @@ func (s *portableSupplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema defer cancel() return s.metadata.MeasureRegistry().GetMeasure(ctx, md) } + +func (*portableSupplier) OpenDB(_ *commonv1.Group) (tsdb.Database, error) { + panic("do not support open db") +} + +func (s *portableSupplier) OpenResource(shardNum uint32, _ tsdb.Supplier, spec resourceSchema.Resource) (io.Closer, error) { + measureSchema := spec.Schema().(*databasev1.Measure) + return openMeasure(shardNum, nil, measureSpec{ + schema: measureSchema, + indexRules: spec.IndexRules(), + topNAggregations: spec.TopN(), + }, s.l, nil) +} diff --git a/banyand/query/processor.go b/banyand/query/processor.go index a47e6df8..70a89d8e 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -104,7 +104,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) { return } - resp = bus.NewMessage(bus.MessageID(now), entities) + resp = bus.NewMessage(bus.MessageID(now), &streamv1.QueryResponse{Elements: entities}) return } @@ -170,7 +170,7 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { if e := ml.Debug(); e.Enabled() { e.RawJSON("ret", logger.Proto(&measurev1.QueryResponse{DataPoints: result})).Msg("got a measure") } - resp = bus.NewMessage(bus.MessageID(now), result) + resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{DataPoints: result}) return } diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go index 55939991..1f266c7e 100644 --- a/banyand/query/processor_topn.go +++ b/banyand/query/processor_topn.go @@ -88,7 +88,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) { Msg("fail to list shards") return } - aggregator := createTopNPostAggregator(request.GetTopN(), + aggregator := CreateTopNPostAggregator(request.GetTopN(), request.GetAgg(), request.GetFieldValueSort()) entity, err := locateEntity(topNSchema, request.GetFieldValueSort(), request.GetConditions()) if err != nil { @@ -130,7 +130,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) { Msg("fail to parse topN family") return } - _ = aggregator.put(tuple.V1.([]*modelv1.TagValue), tuple.V2.(int64), iter.Val().Time()) + _ = aggregator.Put(tuple.V1.([]*modelv1.TagValue), tuple.V2.(int64), iter.Val().Time()) } _ = iter.Close() } @@ -138,7 +138,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) { } now := time.Now().UnixNano() - resp = bus.NewMessage(bus.MessageID(now), aggregator.val(sourceMeasure.GetSchema().GetEntity().GetTagNames())) + resp = bus.NewMessage(bus.MessageID(now), &measurev1.TopNResponse{Lists: aggregator.Val(sourceMeasure.GetSchema().GetEntity().GetTagNames())}) return } @@ -254,13 +254,14 @@ func (n *aggregatorItem) GetTags(tagNames []string) []*modelv1.Tag { return tags } -// postProcessor defines necessary methods for Top-N post processor with or without aggregation. -type postProcessor interface { - put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error - val([]string) []*measurev1.TopNList +// PostProcessor defines necessary methods for Top-N post processor with or without aggregation. +type PostProcessor interface { + Put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error + Val([]string) []*measurev1.TopNList } -func createTopNPostAggregator(topN int32, aggrFunc modelv1.AggregationFunction, sort modelv1.Sort) postProcessor { +// CreateTopNPostAggregator creates a Top-N post processor with or without aggregation. +func CreateTopNPostAggregator(topN int32, aggrFunc modelv1.AggregationFunction, sort modelv1.Sort) PostProcessor { if aggrFunc == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED { // if aggregation is not specified, we have to keep all timelines return &postNonAggregationProcessor{ @@ -327,7 +328,7 @@ func (aggr *postAggregationProcessor) Pop() any { return item } -func (aggr *postAggregationProcessor) put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error { +func (aggr *postAggregationProcessor) Put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error { // update latest ts if aggr.latestTimestamp < timestampMillis { aggr.latestTimestamp = timestampMillis @@ -374,7 +375,7 @@ func (aggr *postAggregationProcessor) tryEnqueue(key string, item *aggregatorIte } } -func (aggr *postAggregationProcessor) val(tagNames []string) []*measurev1.TopNList { +func (aggr *postAggregationProcessor) Val(tagNames []string) []*measurev1.TopNList { topNItems := make([]*measurev1.TopNList_Item, aggr.Len()) for aggr.Len() > 0 { @@ -430,7 +431,7 @@ type postNonAggregationProcessor struct { sort modelv1.Sort } -func (naggr *postNonAggregationProcessor) val(tagNames []string) []*measurev1.TopNList { +func (naggr *postNonAggregationProcessor) Val(tagNames []string) []*measurev1.TopNList { topNLists := make([]*measurev1.TopNList, 0, len(naggr.timelines)) for ts, timeline := range naggr.timelines { items := make([]*measurev1.TopNList_Item, timeline.Len()) @@ -462,7 +463,7 @@ func (naggr *postNonAggregationProcessor) val(tagNames []string) []*measurev1.To return topNLists } -func (naggr *postNonAggregationProcessor) put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error { +func (naggr *postNonAggregationProcessor) Put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error { key := entityValues.String() if timeline, ok := naggr.timelines[timestampMillis]; ok { if timeline.Len() < int(naggr.topN) { diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go index 9e00b13a..51d56967 100644 --- a/banyand/queue/pub/pub.go +++ b/banyand/queue/pub/pub.go @@ -30,6 +30,7 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" + "github.com/apache/skywalking-banyandb/api/data" clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" @@ -105,9 +106,7 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err if !ok { return multierr.Append(err, fmt.Errorf("failed to get client for node %s", node)) } - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - stream, errCreateStream := client.client.Send(ctx) + stream, errCreateStream := client.client.Send(context.Background()) if err != nil { return multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream)) } @@ -116,6 +115,7 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err return multierr.Append(err, fmt.Errorf("failed to send message to node %s: %w", node, errSend)) } f.clients = append(f.clients, stream) + f.topics = append(f.topics, topic) return err } for _, m := range messages { @@ -133,6 +133,7 @@ func (p *pub) NewBatchPublisher() queue.BatchPublisher { func New(metadata metadata.Repo) queue.Client { return &pub{ metadata: metadata, + clients: make(map[string]*client), closer: run.NewCloser(1), } } @@ -174,7 +175,7 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus } node := m.Node() sendData := func() (success bool) { - if stream, ok := bp.streams[node]; !ok { + if stream, ok := bp.streams[node]; ok { defer func() { if !success { delete(bp.streams, node) @@ -225,9 +226,6 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus } func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, error) { - if !m.IsRemote() { - return nil, fmt.Errorf("message %d is not remote", m.ID()) - } r := &clusterv1.SendRequest{ Topic: topic.String(), MessageId: uint64(m.ID()), @@ -246,6 +244,7 @@ func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, e type future struct { clients []clusterv1.Service_SendClient + topics []bus.Topic } func (l *future) Get() (bus.Message, error) { @@ -253,17 +252,33 @@ func (l *future) Get() (bus.Message, error) { return bus.Message{}, io.EOF } c := l.clients[0] + t := l.topics[0] defer func() { l.clients = l.clients[1:] + l.topics = l.topics[1:] }() resp, err := c.Recv() if err != nil { return bus.Message{}, err } - return bus.NewMessage( - bus.MessageID(resp.MessageId), - resp.Body, - ), nil + if resp.Error != "" { + return bus.Message{}, errors.New(resp.Error) + } + if resp.Body == nil { + return bus.NewMessage(bus.MessageID(resp.MessageId), nil), nil + } + if messageSupplier, ok := data.TopicResponseMap[t]; ok { + m := messageSupplier() + err = resp.Body.UnmarshalTo(m) + if err != nil { + return bus.Message{}, err + } + return bus.NewMessage( + bus.MessageID(resp.MessageId), + m, + ), nil + } + return bus.Message{}, fmt.Errorf("invalid topic %s", t) } func (l *future) GetAll() ([]bus.Message, error) { diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go index c689eef0..021ec877 100644 --- a/banyand/queue/sub/sub.go +++ b/banyand/queue/sub/sub.go @@ -22,9 +22,12 @@ import ( "io" "github.com/pkg/errors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" + "github.com/apache/skywalking-banyandb/api/data" clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" "github.com/apache/skywalking-banyandb/pkg/bus" ) @@ -52,11 +55,18 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error { return nil } if err != nil { - reply(writeEntity, err, "failed to receive message") - continue + if status.Code(err) == codes.Canceled { + return nil + } + s.log.Error().Err(err).Msg("failed to receive message") + return err } if writeEntity.Topic != "" && topic == nil { - t := bus.UniTopic(writeEntity.Topic) + t, ok := data.TopicMap[writeEntity.Topic] + if !ok { + reply(writeEntity, err, "invalid topic") + continue + } topic = &t } if topic == nil { @@ -68,9 +78,25 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error { reply(writeEntity, err, "no listener found") continue } - m := listener.Rev(bus.NewMessage(bus.MessageID(writeEntity.MessageId), writeEntity.Body)) + var m bus.Message + if reqSupplier, ok := data.TopicRequestMap[*topic]; ok { + req := reqSupplier() + if errUnmarshal := writeEntity.Body.UnmarshalTo(req); errUnmarshal != nil { + reply(writeEntity, errUnmarshal, "failed to unmarshal message") + continue + } + m = listener.Rev(bus.NewMessage(bus.MessageID(writeEntity.MessageId), req)) + } else { + reply(writeEntity, err, "unknown topic") + continue + } + if m.Data() == nil { - reply(writeEntity, err, "no response") + if errSend := stream.Send(&clusterv1.SendResponse{ + MessageId: writeEntity.MessageId, + }); errSend != nil { + s.log.Error().Stringer("written", writeEntity).Err(errSend).Msg("failed to send response") + } continue } message, ok := m.Data().(proto.Message) @@ -87,8 +113,7 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error { MessageId: writeEntity.MessageId, Body: anyMessage, }); err != nil { - reply(writeEntity, err, "failed to send response") - continue + s.log.Error().Stringer("written", writeEntity).Err(err).Msg("failed to send response") } } } diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go index 0ce3e6ce..18d5c3a1 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -271,6 +271,18 @@ type portableSupplier struct { l *logger.Logger } +func (*portableSupplier) OpenDB(_ *commonv1.Group) (tsdb.Database, error) { + panic("do not support open db") +} + +func (s *portableSupplier) OpenResource(shardNum uint32, _ tsdb.Supplier, resource resourceSchema.Resource) (io.Closer, error) { + streamSchema := resource.Schema().(*databasev1.Stream) + return openStream(shardNum, nil, streamSpec{ + schema: streamSchema, + indexRules: resource.IndexRules(), + }, s.l), nil +} + func newPortableSupplier(metadata metadata.Repo, l *logger.Logger) *portableSupplier { return &portableSupplier{ metadata: metadata, diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index a989ebb2..4d5c8f8a 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -73,6 +73,9 @@ func openStream(shardNum uint32, db tsdb.Supplier, spec streamSpec, l *logger.Lo sm.parseSpec() ctx := context.WithValue(context.Background(), logger.ContextKey, l) + if db == nil { + return sm + } sm.db = db sm.indexWriter = index.NewWriter(ctx, index.WriterOptions{ DB: db, diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go index 1fc94283..2d8764fd 100644 --- a/banyand/tsdb/buffer.go +++ b/banyand/tsdb/buffer.go @@ -385,7 +385,6 @@ func (bsb *bufferShardBucket) recoveryWal() error { func (bsb *bufferShardBucket) recoveryWorkSegment(segment wal.Segment) { var wg sync.WaitGroup - wg.Add(len(segment.GetLogEntries())) for _, logEntry := range segment.GetLogEntries() { timestamps := logEntry.GetTimestamps() values := logEntry.GetValues() @@ -404,6 +403,7 @@ func (bsb *bufferShardBucket) recoveryWorkSegment(segment wal.Segment) { } }, } + wg.Add(1) elementIndex++ } } diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 6ad65ada..5df38130 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -64,11 +64,6 @@ func (m Message) Node() string { return m.node } -// IsRemote returns true if the Message is sent from a remote node. -func (m Message) IsRemote() bool { - return m.node != "local" -} - // NewMessage returns a new Message with a MessageID and embed data. func NewMessage(id MessageID, data interface{}) Message { return Message{id: id, node: "local", payload: data} diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go index 7a421b12..a6ec86a6 100644 --- a/pkg/cmdsetup/data.go +++ b/pkg/cmdsetup/data.go @@ -30,7 +30,6 @@ import ( "github.com/apache/skywalking-banyandb/banyand/query" "github.com/apache/skywalking-banyandb/banyand/queue/sub" "github.com/apache/skywalking-banyandb/banyand/stream" - "github.com/apache/skywalking-banyandb/pkg/config" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/version" @@ -75,17 +74,10 @@ func newDataCmd(runners ...run.Unit) *cobra.Command { } dataGroup := run.NewGroup("data") dataGroup.Register(units...) - logging := logger.Logging{} dataCmd := &cobra.Command{ Use: "data", Version: version.Build(), Short: "Run as the data server", - PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { - if err = config.Load("logging", cmd.Flags()); err != nil { - return err - } - return logger.Init(logging) - }, RunE: func(cmd *cobra.Command, args []string) (err error) { node, err := common.GenerateNode(pipeline.GetPort(), nil) if err != nil { diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go index e1178ef3..126f4b89 100644 --- a/pkg/cmdsetup/liaison.go +++ b/pkg/cmdsetup/liaison.go @@ -29,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/liaison/http" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/queue/pub" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" @@ -43,11 +44,12 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command { l.Fatal().Err(err).Msg("failed to initiate metadata service") } pipeline := pub.New(metaSvc) - grpcServer := grpc.NewServer(ctx, pipeline, metaSvc, grpc.NewClusterNodeRegistry(pipeline)) + localPipeline := queue.Local() + grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, grpc.NewClusterNodeRegistry(pipeline)) profSvc := observability.NewProfService() metricSvc := observability.NewMetricService() httpServer := http.NewServer() - dQuery, err := dquery.NewService(metaSvc, pipeline) + dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline) if err != nil { l.Fatal().Err(err).Msg("failed to initiate distributed query service") } @@ -55,6 +57,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command { units = append(units, runners...) units = append(units, metaSvc, + localPipeline, pipeline, dQuery, grpcServer, diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go index 3c659b09..c2638b8e 100644 --- a/pkg/cmdsetup/standalone.go +++ b/pkg/cmdsetup/standalone.go @@ -57,7 +57,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate query processor") } - grpcServer := grpc.NewServer(ctx, pipeline, metaSvc, grpc.NewLocalNodeRegistry()) + grpcServer := grpc.NewServer(ctx, pipeline, pipeline, metaSvc, grpc.NewLocalNodeRegistry()) profSvc := observability.NewProfService() metricSvc := observability.NewMetricService() httpServer := http.NewServer() diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go index 45ee4c66..bbe4d0d4 100644 --- a/pkg/query/logical/measure/measure_plan_distributed.go +++ b/pkg/query/logical/measure/measure_plan_distributed.go @@ -38,9 +38,7 @@ import ( var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil) type unresolvedDistributed struct { - originalQuery *measurev1.QueryRequest - order *logical.OrderBy - maxDataPointsSize int + originalQuery *measurev1.QueryRequest } func newUnresolvedDistributed(query *measurev1.QueryRequest) logical.UnresolvedPlan { @@ -49,39 +47,61 @@ func newUnresolvedDistributed(query *measurev1.QueryRequest) logical.UnresolvedP } } -func (ud *unresolvedDistributed) Limit(max int) { - ud.maxDataPointsSize = max -} - -func (ud *unresolvedDistributed) Sort(order *logical.OrderBy) { - ud.order = order -} - func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) { if ud.originalQuery.TagProjection == nil { return nil, fmt.Errorf("tag projection is required") } - if ud.originalQuery.FieldProjection == nil { - return nil, fmt.Errorf("filed projection is required") + projectionTags := logical.ToTags(ud.originalQuery.GetTagProjection()) + if len(projectionTags) > 0 { + var err error + projTagsRefs, err := s.CreateTagRef(projectionTags...) + if err != nil { + return nil, err + } + s = s.ProjTags(projTagsRefs...) + } + projectionFields := make([]*logical.Field, len(ud.originalQuery.GetFieldProjection().GetNames())) + for i, fieldNameProj := range ud.originalQuery.GetFieldProjection().GetNames() { + projectionFields[i] = logical.NewField(fieldNameProj) + } + if len(projectionFields) > 0 { + var err error + projFieldRefs, err := s.CreateFieldRef(projectionFields...) + if err != nil { + return nil, err + } + s = s.ProjFields(projFieldRefs...) + } + limit := ud.originalQuery.GetLimit() + if limit == 0 { + limit = defaultLimit } temp := &measurev1.QueryRequest{ TagProjection: ud.originalQuery.TagProjection, FieldProjection: ud.originalQuery.FieldProjection, Metadata: ud.originalQuery.Metadata, Criteria: ud.originalQuery.Criteria, - Limit: uint32(ud.maxDataPointsSize), - OrderBy: &modelv1.QueryOrder{ - IndexRuleName: ud.order.Index.Metadata.Name, - Sort: ud.order.Sort, - }, + Limit: limit, + OrderBy: ud.originalQuery.OrderBy, } - if ud.order == nil { + if ud.originalQuery.OrderBy == nil { return &distributedPlan{ queryTemplate: temp, s: s, sortByTime: true, }, nil } + if ud.originalQuery.OrderBy.IndexRuleName == "" { + result := &distributedPlan{ + queryTemplate: temp, + s: s, + sortByTime: true, + } + if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC { + result.desc = true + } + return result, nil + } ok, indexRule := s.IndexRuleDefined(ud.originalQuery.OrderBy.IndexRuleName) if !ok { return nil, fmt.Errorf("index rule %s not found", ud.originalQuery.OrderBy.IndexRuleName) @@ -106,17 +126,21 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) } type distributedPlan struct { - s logical.Schema - queryTemplate *measurev1.QueryRequest - sortTagSpec logical.TagSpec - sortByTime bool - desc bool + s logical.Schema + queryTemplate *measurev1.QueryRequest + sortTagSpec logical.TagSpec + sortByTime bool + desc bool + maxDataPointsSize uint32 } func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, error) { dctx := executor.FromDistributedExecutionContext(ctx) query := proto.Clone(t.queryTemplate).(*measurev1.QueryRequest) query.TimeRange = dctx.TimeRange() + if t.maxDataPointsSize > 0 { + query.Limit = t.maxDataPointsSize + } ff, err := dctx.Broadcast(data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query)) if err != nil { return nil, err @@ -127,8 +151,12 @@ func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, erro if m, getErr := f.Get(); getErr != nil { allErr = multierr.Append(allErr, getErr) } else { + d := m.Data() + if d == nil { + continue + } see = append(see, - newSortableElements(m.Data().(*measurev1.QueryResponse).DataPoints, + newSortableElements(d.(*measurev1.QueryResponse).DataPoints, t.sortByTime, t.sortTagSpec)) } } @@ -149,6 +177,10 @@ func (t *distributedPlan) Schema() logical.Schema { return t.s } +func (t *distributedPlan) Limit(max int) { + t.maxDataPointsSize = uint32(max) +} + var _ sort.Comparable = (*comparableDataPoint)(nil) type comparableDataPoint struct { @@ -220,7 +252,7 @@ func (s *sortableElements) iter(fn func(*measurev1.DataPoint) (*comparableDataPo return s.iter(fn) } s.cur = cur - return s.index < len(s.dataPoints) + return s.index <= len(s.dataPoints) } var _ executor.MIterator = (*sortedMIterator)(nil) diff --git a/pkg/query/logical/stream/stream_plan_distributed.go b/pkg/query/logical/stream/stream_plan_distributed.go index 7fce94cc..4e111ad8 100644 --- a/pkg/query/logical/stream/stream_plan_distributed.go +++ b/pkg/query/logical/stream/stream_plan_distributed.go @@ -51,6 +51,15 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) if ud.originalQuery.Projection == nil { return nil, fmt.Errorf("projection is required") } + projectionTags := logical.ToTags(ud.originalQuery.GetProjection()) + if len(projectionTags) > 0 { + var err error + projTagsRefs, err := s.CreateTagRef(projectionTags...) + if err != nil { + return nil, err + } + s = s.ProjTags(projTagsRefs...) + } limit := ud.originalQuery.GetLimit() if limit == 0 { limit = defaultLimit @@ -69,6 +78,17 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) sortByTime: true, }, nil } + if ud.originalQuery.OrderBy.IndexRuleName == "" { + result := &distributedPlan{ + queryTemplate: temp, + s: s, + sortByTime: true, + } + if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC { + result.desc = true + } + return result, nil + } ok, indexRule := s.IndexRuleDefined(ud.originalQuery.OrderBy.IndexRuleName) if !ok { return nil, fmt.Errorf("index rule %s not found", ud.originalQuery.OrderBy.IndexRuleName) @@ -93,18 +113,22 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) } type distributedPlan struct { - s logical.Schema - queryTemplate *streamv1.QueryRequest - sortTagSpec logical.TagSpec - sortByTime bool - desc bool + s logical.Schema + queryTemplate *streamv1.QueryRequest + sortTagSpec logical.TagSpec + sortByTime bool + desc bool + maxElementSize uint32 } func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, error) { dctx := executor.FromDistributedExecutionContext(ctx) query := proto.Clone(t.queryTemplate).(*streamv1.QueryRequest) query.TimeRange = dctx.TimeRange() - ff, err := dctx.Broadcast(data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query)) + if t.maxElementSize > 0 { + query.Limit = t.maxElementSize + } + ff, err := dctx.Broadcast(data.TopicStreamQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query)) if err != nil { return nil, err } @@ -114,9 +138,17 @@ func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, err if m, getErr := f.Get(); getErr != nil { allErr = multierr.Append(allErr, getErr) } else { + d := m.Data() + if d == nil { + continue + } + resp := d.(*streamv1.QueryResponse) + if err != nil { + allErr = multierr.Append(allErr, err) + continue + } see = append(see, - newSortableElements(m.Data().(*streamv1.QueryResponse).Elements, - t.sortByTime, t.sortTagSpec)) + newSortableElements(resp.Elements, t.sortByTime, t.sortTagSpec)) } } iter := sort.NewItemIter[*comparableElement](see, t.desc) @@ -139,6 +171,10 @@ func (t *distributedPlan) Schema() logical.Schema { return t.s } +func (t *distributedPlan) Limit(max int) { + t.maxElementSize = uint32(max) +} + var _ sort.Comparable = (*comparableElement)(nil) type comparableElement struct { @@ -210,5 +246,5 @@ func (s *sortableElements) iter(fn func(*streamv1.Element) (*comparableElement, return s.iter(fn) } s.cur = cur - return s.index < len(s.elements) + return s.index <= len(s.elements) } diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go index ce96cf39..049f5d29 100644 --- a/pkg/schema/metadata.go +++ b/pkg/schema/metadata.go @@ -253,7 +253,7 @@ func (sr *schemaRepo) createGroup(name string) (g *group) { if sr.resourceSupplier != nil { g = newGroup(sr.metadata, sr.l, sr.resourceSupplier) } else { - g = newPortableGroup(sr.metadata, sr.l) + g = newPortableGroup(sr.metadata, sr.l, sr.resourceSchemaSupplier) } sr.data[name] = g return @@ -354,13 +354,14 @@ func (sr *schemaRepo) Close() { var _ Group = (*group)(nil) type group struct { - resourceSupplier ResourceSupplier - metadata metadata.Repo - db atomic.Value - groupSchema atomic.Pointer[commonv1.Group] - l *logger.Logger - schemaMap map[string]*resourceSpec - mapMutex sync.RWMutex + resourceSupplier ResourceSupplier + resourceSchemaSupplier ResourceSchemaSupplier + metadata metadata.Repo + db atomic.Value + groupSchema atomic.Pointer[commonv1.Group] + l *logger.Logger + schemaMap map[string]*resourceSpec + mapMutex sync.RWMutex } func newGroup( @@ -369,11 +370,12 @@ func newGroup( resourceSupplier ResourceSupplier, ) *group { g := &group{ - groupSchema: atomic.Pointer[commonv1.Group]{}, - metadata: metadata, - l: l, - schemaMap: make(map[string]*resourceSpec), - resourceSupplier: resourceSupplier, + groupSchema: atomic.Pointer[commonv1.Group]{}, + metadata: metadata, + l: l, + schemaMap: make(map[string]*resourceSpec), + resourceSupplier: resourceSupplier, + resourceSchemaSupplier: resourceSupplier, } return g } @@ -381,12 +383,14 @@ func newGroup( func newPortableGroup( metadata metadata.Repo, l *logger.Logger, + resourceSchemaSupplier ResourceSchemaSupplier, ) *group { g := &group{ - groupSchema: atomic.Pointer[commonv1.Group]{}, - metadata: metadata, - l: l, - schemaMap: make(map[string]*resourceSpec), + groupSchema: atomic.Pointer[commonv1.Group]{}, + metadata: metadata, + l: l, + schemaMap: make(map[string]*resourceSpec), + resourceSchemaSupplier: resourceSchemaSupplier, } return g } @@ -454,13 +458,15 @@ func (g *group) storeResource(ctx context.Context, resourceSchema ResourceSchema if preResource != nil && preResource.isNewThan(resource) { return preResource, nil } + var dbSupplier tsdb.Supplier if !g.isPortable() { - sm, errTS := g.resourceSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, g, resource) - if errTS != nil { - return nil, errTS - } - resource.delegated = sm + dbSupplier = g + } + sm, errTS := g.resourceSchemaSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, dbSupplier, resource) + if errTS != nil { + return nil, errTS } + resource.delegated = sm g.schemaMap[key] = resource if preResource != nil { _ = preResource.Close() @@ -501,7 +507,7 @@ func (g *group) close() (err error) { err = multierr.Append(err, s.Close()) } g.mapMutex.RUnlock() - if !g.isInit() { + if !g.isInit() || g.isPortable() { return nil } return multierr.Append(err, g.SupplyTSDB().Close()) diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go index f19a4230..65880102 100644 --- a/pkg/schema/schema.go +++ b/pkg/schema/schema.go @@ -75,12 +75,12 @@ type Resource interface { // ResourceSchemaSupplier allows get a ResourceSchema from the metadata. type ResourceSchemaSupplier interface { ResourceSchema(metadata *commonv1.Metadata) (ResourceSchema, error) + OpenResource(shardNum uint32, db tsdb.Supplier, spec Resource) (io.Closer, error) } // ResourceSupplier allows open a resource and its embedded tsdb. type ResourceSupplier interface { ResourceSchemaSupplier - OpenResource(shardNum uint32, db tsdb.Supplier, spec Resource) (io.Closer, error) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) } diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index f5af458e..b3159f90 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/cmdsetup" @@ -176,3 +177,54 @@ func CMD(flags ...string) func() { wg.Wait() } } + +// DataNode runs a data node. +func DataNode(etcdEndpoint string) func() { + path, deferFn, err := test.NewSpace() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + ports, err := test.AllocateFreePorts(1) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + addr := fmt.Sprintf("%s:%d", host, ports[0]) + nodeHost := "127.0.0.1" + closeFn := CMD("data", + "--grpc-host="+host, + fmt.Sprintf("--grpc-port=%d", ports[0]), + "--stream-root-path="+path, + "--measure-root-path="+path, + "--etcd-endpoints", etcdEndpoint, + "--node-host-provider", "flag", + "--node-host", nodeHost) + gomega.Eventually( + helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpclib.WithTransportCredentials(insecure.NewCredentials())), + testflags.EventuallyTimeout).Should(gomega.Succeed()) + gomega.Eventually(func() (map[string]*databasev1.Node, error) { + return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[0])) + }, testflags.EventuallyTimeout).Should(gomega.HaveLen(1)) + return func() { + closeFn() + deferFn() + } +} + +// LiaisonNode runs a liaison node. +func LiaisonNode(etcdEndpoint string) (string, func()) { + ports, err := test.AllocateFreePorts(2) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + addr := fmt.Sprintf("%s:%d", host, ports[0]) + httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) + nodeHost := "127.0.0.1" + closeFn := CMD("liaison", + "--grpc-host="+host, + fmt.Sprintf("--grpc-port=%d", ports[0]), + "--http-host="+host, + fmt.Sprintf("--http-port=%d", ports[1]), + "--http-grpc-addr="+addr, + "--etcd-endpoints", etcdEndpoint, + "--node-host-provider", "flag", + "--node-host", nodeHost) + gomega.Eventually(helpers.HTTPHealthCheck(httpAddr), testflags.EventuallyTimeout).Should(gomega.Succeed()) + gomega.Eventually(func() (map[string]*databasev1.Node, error) { + return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[0])) + }, testflags.EventuallyTimeout).Should(gomega.HaveLen(1)) + return addr, closeFn +} diff --git a/test/integration/standalone/query/query_suite_test.go b/test/cases/init.go similarity index 56% copy from test/integration/standalone/query/query_suite_test.go copy to test/cases/init.go index 91b47189..72f66315 100644 --- a/test/integration/standalone/query/query_suite_test.go +++ b/test/cases/init.go @@ -15,56 +15,26 @@ // specific language governing permissions and limitations // under the License. -package integration_query_test +// Package cases provides some tools to access test data. +package cases import ( - "testing" "time" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "github.com/onsi/gomega/gleak" + "github.com/onsi/gomega" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "github.com/apache/skywalking-banyandb/pkg/grpchelper" - "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/test/flags" - "github.com/apache/skywalking-banyandb/pkg/test/helpers" - "github.com/apache/skywalking-banyandb/pkg/test/setup" - "github.com/apache/skywalking-banyandb/pkg/timestamp" - casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data" - casesstream "github.com/apache/skywalking-banyandb/test/cases/stream" casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data" - casestopn "github.com/apache/skywalking-banyandb/test/cases/topn" - integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone" ) -func TestIntegrationQuery(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Integration Query Suite", Label(integration_standalone.Labels...)) -} - -var ( - connection *grpc.ClientConn - now time.Time - deferFunc func() - goods []gleak.Goroutine -) - -var _ = SynchronizedBeforeSuite(func() []byte { - goods = gleak.Goroutines() - Expect(logger.Init(logger.Logging{ - Env: "dev", - Level: flags.LogLevel, - })).To(Succeed()) - var addr string - addr, _, deferFunc = setup.Standalone() +// Initialize test data. +func Initialize(addr string, now time.Time) { conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) - Expect(err).NotTo(HaveOccurred()) - ns := timestamp.NowMilli().UnixNano() - now = time.Unix(0, ns-ns%int64(time.Minute)) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + defer conn.Close() interval := 500 * time.Millisecond // stream casesstreamdata.Write(conn, "data.json", now, interval) @@ -80,32 +50,4 @@ var _ = SynchronizedBeforeSuite(func() []byte { casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval) casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data1.json", now.Add(10*time.Second), interval) casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data2.json", now.Add(10*time.Minute), interval) - Expect(conn.Close()).To(Succeed()) - return []byte(addr) -}, func(address []byte) { - var err error - connection, err = grpchelper.Conn(string(address), 10*time.Second, - grpc.WithTransportCredentials(insecure.NewCredentials())) - casesstream.SharedContext = helpers.SharedContext{ - Connection: connection, - BaseTime: now, - } - casesmeasure.SharedContext = helpers.SharedContext{ - Connection: connection, - BaseTime: now, - } - casestopn.SharedContext = helpers.SharedContext{ - Connection: connection, - BaseTime: now, - } - Expect(err).NotTo(HaveOccurred()) -}) - -var _ = SynchronizedAfterSuite(func() { - if connection != nil { - Expect(connection.Close()).To(Succeed()) - } -}, func() { - deferFunc() - Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) -}) +} diff --git a/test/cases/measure/data/input/order_tag_asc.yaml b/test/cases/measure/data/input/order_tag_asc.yaml index c5b1f1bd..a752c72b 100644 --- a/test/cases/measure/data/input/order_tag_asc.yaml +++ b/test/cases/measure/data/input/order_tag_asc.yaml @@ -22,8 +22,6 @@ tagProjection: tagFamilies: - name: "default" tags: ["id"] -fieldProjection: - names: ["total", "value"] orderBy: sort: "SORT_ASC" indexRuleName: "id" diff --git a/test/cases/measure/data/input/order_tag_desc.yaml b/test/cases/measure/data/input/order_tag_desc.yaml index 6031cdb3..c5007d75 100644 --- a/test/cases/measure/data/input/order_tag_desc.yaml +++ b/test/cases/measure/data/input/order_tag_desc.yaml @@ -22,8 +22,6 @@ tagProjection: tagFamilies: - name: "default" tags: ["id"] -fieldProjection: - names: ["total", "value"] orderBy: sort: "SORT_DESC" indexRuleName: "id" diff --git a/test/cases/measure/data/want/order_tag_asc.yaml b/test/cases/measure/data/want/order_tag_asc.yaml index b6a59101..123e2bda 100644 --- a/test/cases/measure/data/want/order_tag_asc.yaml +++ b/test/cases/measure/data/want/order_tag_asc.yaml @@ -16,105 +16,51 @@ # under the License. dataPoints: -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "1" - tagFamilies: +- tagFamilies: - name: default tags: - key: id value: str: value: svc1 - timestamp: "2023-06-26T12:41:00Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "2" - tagFamilies: + timestamp: "2023-09-19T08:35:00Z" +- tagFamilies: - name: default tags: - key: id value: str: value: svc1 - timestamp: "2023-06-26T12:42:00Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "3" - tagFamilies: + timestamp: "2023-09-19T08:37:00Z" +- tagFamilies: - name: default tags: - key: id value: str: value: svc1 - timestamp: "2023-06-26T12:43:00Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "5" - tagFamilies: + timestamp: "2023-09-19T08:36:00Z" +- tagFamilies: - name: default tags: - key: id value: str: value: svc2 - timestamp: "2023-06-26T12:44:00Z" -- fields: - - name: total - value: - int: - value: "50" - - name: value - value: - int: - value: "4" - tagFamilies: + timestamp: "2023-09-19T08:38:00Z" +- tagFamilies: - name: default tags: - key: id value: str: value: svc2 - timestamp: "2023-06-26T12:45:00Z" -- fields: - - name: total - value: - int: - value: "300" - - name: value - value: - int: - value: "6" - tagFamilies: + timestamp: "2023-09-19T08:39:00Z" +- tagFamilies: - name: default tags: - key: id value: str: value: svc3 - timestamp: "2023-06-26T12:46:00Z" + timestamp: "2023-09-19T08:40:00Z" diff --git a/test/cases/measure/data/want/order_tag_desc.yaml b/test/cases/measure/data/want/order_tag_desc.yaml index db711266..6e455083 100644 --- a/test/cases/measure/data/want/order_tag_desc.yaml +++ b/test/cases/measure/data/want/order_tag_desc.yaml @@ -16,105 +16,51 @@ # under the License. dataPoints: -- fields: - - name: total - value: - int: - value: "300" - - name: value - value: - int: - value: "6" - tagFamilies: +- tagFamilies: - name: default tags: - key: id value: str: value: svc3 - timestamp: "2023-06-26T12:47:00Z" -- fields: - - name: total - value: - int: - value: "50" - - name: value - value: - int: - value: "4" - tagFamilies: + timestamp: "2023-09-19T08:42:00Z" +- tagFamilies: - name: default tags: - key: id value: str: value: svc2 - timestamp: "2023-06-26T12:46:00Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "5" - tagFamilies: + timestamp: "2023-09-19T08:41:00Z" +- tagFamilies: - name: default tags: - key: id value: str: value: svc2 - timestamp: "2023-06-26T12:45:00Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "3" - tagFamilies: + timestamp: "2023-09-19T08:40:00Z" +- tagFamilies: - name: default tags: - key: id value: str: value: svc1 - timestamp: "2023-06-26T12:44:00Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "2" - tagFamilies: + timestamp: "2023-09-19T08:38:00Z" +- tagFamilies: - name: default tags: - key: id value: str: value: svc1 - timestamp: "2023-06-26T12:43:00Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "1" - tagFamilies: + timestamp: "2023-09-19T08:39:00Z" +- tagFamilies: - name: default tags: - key: id value: str: value: svc1 - timestamp: "2023-06-26T12:42:00Z" + timestamp: "2023-09-19T08:37:00Z" diff --git a/test/integration/standalone/query/query_suite_test.go b/test/integration/distributed/query/query_suite_test.go similarity index 55% copy from test/integration/standalone/query/query_suite_test.go copy to test/integration/distributed/query/query_suite_test.go index 91b47189..0c78a14f 100644 --- a/test/integration/standalone/query/query_suite_test.go +++ b/test/integration/distributed/query/query_suite_test.go @@ -15,9 +15,12 @@ // specific language governing permissions and limitations // under the License. -package integration_query_test +// Package integration_setup_test is a integration test suite. +package integration_setup_test import ( + "context" + "fmt" "testing" "time" @@ -27,61 +30,82 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/test/helpers" + test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure" "github.com/apache/skywalking-banyandb/pkg/test/setup" + test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream" "github.com/apache/skywalking-banyandb/pkg/timestamp" + test_cases "github.com/apache/skywalking-banyandb/test/cases" casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" - casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data" casesstream "github.com/apache/skywalking-banyandb/test/cases/stream" - casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data" casestopn "github.com/apache/skywalking-banyandb/test/cases/topn" - integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone" ) -func TestIntegrationQuery(t *testing.T) { +func TestQuery(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "Integration Query Suite", Label(integration_standalone.Labels...)) + RunSpecs(t, "Distributed Query Suite") } var ( - connection *grpc.ClientConn - now time.Time deferFunc func() goods []gleak.Goroutine + now time.Time + connection *grpc.ClientConn ) var _ = SynchronizedBeforeSuite(func() []byte { - goods = gleak.Goroutines() Expect(logger.Init(logger.Logging{ Env: "dev", Level: flags.LogLevel, })).To(Succeed()) - var addr string - addr, _, deferFunc = setup.Standalone() - conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) + goods = gleak.Goroutines() + By("Starting etcd server") + ports, err := test.AllocateFreePorts(2) + Expect(err).NotTo(HaveOccurred()) + dir, spaceDef, err := test.NewSpace() Expect(err).NotTo(HaveOccurred()) + ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0]) + server, err := embeddedetcd.NewServer( + embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}), + embeddedetcd.RootDir(dir)) + Expect(err).ShouldNot(HaveOccurred()) + <-server.ReadyNotify() + By("Loading schema") + schemaRegistry, err := schema.NewEtcdSchemaRegistry( + schema.Namespace(metadata.DefaultNamespace), + schema.ConfigureServerEndpoints([]string{ep}), + ) + Expect(err).NotTo(HaveOccurred()) + defer schemaRegistry.Close() + ctx := context.Background() + test_stream.PreloadSchema(ctx, schemaRegistry) + test_measure.PreloadSchema(ctx, schemaRegistry) + By("Starting data node 0") + closeDataNode0 := setup.DataNode(ep) + By("Starting data node 1") + closeDataNode1 := setup.DataNode(ep) + By("Starting liaison node") + liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep) + By("Initializing test cases") ns := timestamp.NowMilli().UnixNano() now = time.Unix(0, ns-ns%int64(time.Minute)) - interval := 500 * time.Millisecond - // stream - casesstreamdata.Write(conn, "data.json", now, interval) - // measure - interval = time.Minute - casesmeasuredata.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval) - casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval) - casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval) - casesmeasuredata.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval) - casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data.json", now, interval) - casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data1.json", now.Add(10*time.Second), interval) - casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data2.json", now.Add(10*time.Minute), interval) - casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval) - casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data1.json", now.Add(10*time.Second), interval) - casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data2.json", now.Add(10*time.Minute), interval) - Expect(conn.Close()).To(Succeed()) - return []byte(addr) + test_cases.Initialize(liaisonAddr, now) + deferFunc = func() { + closerLiaisonNode() + closeDataNode0() + closeDataNode1() + _ = server.Close() + <-server.StopNotify() + spaceDef() + } + return []byte(liaisonAddr) }, func(address []byte) { var err error connection, err = grpchelper.Conn(string(address), 10*time.Second, diff --git a/test/integration/standalone/cold_query/query_suite_test.go b/test/integration/standalone/cold_query/query_suite_test.go index 7c86e67e..18e832ad 100644 --- a/test/integration/standalone/cold_query/query_suite_test.go +++ b/test/integration/standalone/cold_query/query_suite_test.go @@ -33,10 +33,9 @@ import ( "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" "github.com/apache/skywalking-banyandb/pkg/timestamp" + test_cases "github.com/apache/skywalking-banyandb/test/cases" casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" - casesmeasureData "github.com/apache/skywalking-banyandb/test/cases/measure/data" casesstream "github.com/apache/skywalking-banyandb/test/cases/stream" - casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data" casestopn "github.com/apache/skywalking-banyandb/test/cases/topn" integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone" ) @@ -61,24 +60,9 @@ var _ = SynchronizedBeforeSuite(func() []byte { })).To(Succeed()) var addr string addr, _, deferFunc = setup.Standalone() - conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) - Expect(err).NotTo(HaveOccurred()) ns := timestamp.NowMilli().UnixNano() now = time.Unix(0, ns-ns%int64(time.Minute)).Add(-time.Hour * 24) - interval := 500 * time.Millisecond - casesstreamdata.Write(conn, "data.json", now, interval) - interval = time.Minute - casesmeasureData.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval) - casesmeasureData.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval) - casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval) - casesmeasureData.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval) - casesmeasureData.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data.json", now, interval) - casesmeasureData.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data1.json", now.Add(10*time.Second), interval) - casesmeasureData.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data2.json", now.Add(10*time.Minute), interval) - casesmeasureData.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval) - casesmeasureData.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data1.json", now.Add(10*time.Second), interval) - casesmeasureData.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data2.json", now.Add(10*time.Minute), interval) - Expect(conn.Close()).To(Succeed()) + test_cases.Initialize(addr, now) return []byte(addr) }, func(address []byte) { var err error diff --git a/test/integration/standalone/query/query_suite_test.go b/test/integration/standalone/query/query_suite_test.go index 91b47189..a1e684b2 100644 --- a/test/integration/standalone/query/query_suite_test.go +++ b/test/integration/standalone/query/query_suite_test.go @@ -33,10 +33,9 @@ import ( "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" "github.com/apache/skywalking-banyandb/pkg/timestamp" + test_cases "github.com/apache/skywalking-banyandb/test/cases" casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" - casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data" casesstream "github.com/apache/skywalking-banyandb/test/cases/stream" - casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data" casestopn "github.com/apache/skywalking-banyandb/test/cases/topn" integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone" ) @@ -61,26 +60,9 @@ var _ = SynchronizedBeforeSuite(func() []byte { })).To(Succeed()) var addr string addr, _, deferFunc = setup.Standalone() - conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) - Expect(err).NotTo(HaveOccurred()) ns := timestamp.NowMilli().UnixNano() now = time.Unix(0, ns-ns%int64(time.Minute)) - interval := 500 * time.Millisecond - // stream - casesstreamdata.Write(conn, "data.json", now, interval) - // measure - interval = time.Minute - casesmeasuredata.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval) - casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval) - casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval) - casesmeasuredata.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval) - casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data.json", now, interval) - casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data1.json", now.Add(10*time.Second), interval) - casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data2.json", now.Add(10*time.Minute), interval) - casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval) - casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data1.json", now.Add(10*time.Second), interval) - casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data2.json", now.Add(10*time.Minute), interval) - Expect(conn.Close()).To(Succeed()) + test_cases.Initialize(addr, now) return []byte(addr) }, func(address []byte) { var err error
