This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch test
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 0fef03b0c924347c52caef7d4459f3014219bdd0
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Sep 19 11:35:03 2023 +0000

    Apply query test cases to a cluster
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 api/data/data.go                                   | 68 ++++++++++++++++++
 api/data/doc.go                                    | 19 -----
 banyand/dquery/dquery.go                           | 14 +++-
 banyand/dquery/measure.go                          |  2 +-
 banyand/dquery/stream.go                           |  2 +-
 banyand/dquery/topn.go                             | 53 ++++++++------
 banyand/liaison/grpc/discovery.go                  |  5 +-
 banyand/liaison/grpc/measure.go                    | 15 ++--
 banyand/liaison/grpc/registry_test.go              |  2 +-
 banyand/liaison/grpc/server.go                     | 14 ++--
 banyand/liaison/grpc/stream.go                     |  9 ++-
 banyand/measure/measure.go                         |  5 +-
 banyand/measure/metadata.go                        | 13 ++++
 banyand/query/processor.go                         |  4 +-
 banyand/query/processor_topn.go                    | 25 +++----
 banyand/queue/pub/pub.go                           | 37 +++++++---
 banyand/queue/sub/sub.go                           | 39 ++++++++--
 banyand/stream/metadata.go                         | 12 ++++
 banyand/stream/stream.go                           |  3 +
 banyand/tsdb/buffer.go                             |  2 +-
 pkg/bus/bus.go                                     |  5 --
 pkg/cmdsetup/data.go                               |  8 ---
 pkg/cmdsetup/liaison.go                            |  7 +-
 pkg/cmdsetup/standalone.go                         |  2 +-
 .../logical/measure/measure_plan_distributed.go    | 84 +++++++++++++++-------
 .../logical/stream/stream_plan_distributed.go      | 54 +++++++++++---
 pkg/schema/metadata.go                             | 52 ++++++++------
 pkg/schema/schema.go                               |  2 +-
 pkg/test/setup/setup.go                            | 52 ++++++++++++++
 .../query/query_suite_test.go => cases/init.go}    | 74 +++----------------
 test/cases/measure/data/input/order_tag_asc.yaml   |  2 -
 test/cases/measure/data/input/order_tag_desc.yaml  |  2 -
 test/cases/measure/data/want/order_tag_asc.yaml    | 78 ++++----------------
 test/cases/measure/data/want/order_tag_desc.yaml   | 78 ++++----------------
 .../query/query_suite_test.go                      | 82 +++++++++++++--------
 .../standalone/cold_query/query_suite_test.go      | 20 +-----
 .../standalone/query/query_suite_test.go           | 22 +-----
 37 files changed, 524 insertions(+), 443 deletions(-)

diff --git a/api/data/data.go b/api/data/data.go
new file mode 100644
index 00000000..5da103ef
--- /dev/null
+++ b/api/data/data.go
@@ -0,0 +1,68 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package data contains data transmission topics.
+package data
+
+import (
+       "google.golang.org/protobuf/proto"
+
+       measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+// TopicMap is the map of topic name to topic.
+var TopicMap = map[string]bus.Topic{
+       TopicStreamWrite.String():  TopicStreamWrite,
+       TopicStreamQuery.String():  TopicStreamQuery,
+       TopicMeasureWrite.String(): TopicMeasureWrite,
+       TopicMeasureQuery.String(): TopicMeasureQuery,
+       TopicTopNQuery.String():    TopicTopNQuery,
+}
+
+// TopicRequestMap is the map of topic name to request message.
+var TopicRequestMap = map[bus.Topic]func() proto.Message{
+       TopicStreamWrite: func() proto.Message {
+               return &streamv1.InternalWriteRequest{}
+       },
+       TopicStreamQuery: func() proto.Message {
+               return &streamv1.QueryRequest{}
+       },
+       TopicMeasureWrite: func() proto.Message {
+               return &measurev1.InternalWriteRequest{}
+       },
+       TopicMeasureQuery: func() proto.Message {
+               return &measurev1.QueryRequest{}
+       },
+       TopicTopNQuery: func() proto.Message {
+               return &measurev1.TopNRequest{}
+       },
+}
+
+// TopicResponseMap is the map of topic name to response message.
+var TopicResponseMap = map[bus.Topic]func() proto.Message{
+       TopicStreamQuery: func() proto.Message {
+               return &streamv1.QueryResponse{}
+       },
+       TopicMeasureQuery: func() proto.Message {
+               return &measurev1.QueryResponse{}
+       },
+       TopicTopNQuery: func() proto.Message {
+               return &measurev1.TopNResponse{}
+       },
+}
diff --git a/api/data/doc.go b/api/data/doc.go
deleted file mode 100644
index 25918677..00000000
--- a/api/data/doc.go
+++ /dev/null
@@ -1,19 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Package data contains data transmission topics.
-package data
diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go
index 4d77b6fb..c7e1454d 100644
--- a/banyand/dquery/dquery.go
+++ b/banyand/dquery/dquery.go
@@ -21,9 +21,13 @@ package dquery
 import (
        "context"
 
+       "go.uber.org/multierr"
+
+       "github.com/apache/skywalking-banyandb/api/data"
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -44,14 +48,16 @@ type queryService struct {
        mqp         *measureQueryProcessor
        tqp         *topNQueryProcessor
        closer      *run.Closer
+       pipeline    queue.Server
 }
 
 // NewService return a new query service.
-func NewService(metaService metadata.Repo, broadcaster bus.Broadcaster,
+func NewService(metaService metadata.Repo, pipeline queue.Server, broadcaster bus.Broadcaster,
 ) (run.Unit, error) {
        svc := &queryService{
                metaService: metaService,
                closer:      run.NewCloser(1),
+               pipeline:    pipeline,
        }
        svc.sqp = &streamQueryProcessor{
                queryService: svc,
@@ -76,7 +82,11 @@ func (q *queryService) PreRun(_ context.Context) error {
        q.log = logger.GetLogger(moduleName)
        q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log)
        q.mqp.measureService = measure.NewPortableRepository(q.metaService, q.log)
-       return nil
+       return multierr.Combine(
+               q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
+               q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp),
+               q.pipeline.Subscribe(data.TopicTopNQuery, q.tqp),
+       )
 }
 
 func (q *queryService) GracefulStop() {
diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
index b145de94..9dc95d82 100644
--- a/banyand/dquery/measure.go
+++ b/banyand/dquery/measure.go
@@ -95,6 +95,6 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
        if e := ml.Debug(); e.Enabled() {
                e.RawJSON("ret", 
logger.Proto(&measurev1.QueryResponse{DataPoints: result})).Msg("got a measure")
        }
-       resp = bus.NewMessage(bus.MessageID(now), result)
+       resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{DataPoints: result})
        return
 }
diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go
index 62804d00..6fd129d0 100644
--- a/banyand/dquery/stream.go
+++ b/banyand/dquery/stream.go
@@ -78,7 +78,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
                return
        }
 
-       resp = bus.NewMessage(bus.MessageID(now), entities)
+       resp = bus.NewMessage(bus.MessageID(now), &streamv1.QueryResponse{Elements: entities})
 
        return
 }
diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
index df34d337..a885f829 100644
--- a/banyand/dquery/topn.go
+++ b/banyand/dquery/topn.go
@@ -19,12 +19,13 @@ package dquery
 
 import (
        "go.uber.org/multierr"
-       "google.golang.org/protobuf/types/known/timestamppb"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
        measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/query"
+       "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/iter/sort"
@@ -48,6 +49,8 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
        if e := t.log.Debug(); e.Enabled() {
                e.Stringer("req", request).Msg("received a topN query event")
        }
+       agg := request.Agg
+       request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
        now := bus.MessageID(request.TimeRange.Begin.Nanos)
        ff, err := t.broadcaster.Broadcast(data.TopicTopNQuery, bus.NewMessage(now, request))
        if err != nil {
@@ -55,35 +58,41 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
                return
        }
        var allErr error
-       var sii []sort.Iterator[*comparableTopNItem]
-       var latestTimestamp *timestamppb.Timestamp
+       aggregator := query.CreateTopNPostAggregator(request.GetTopN(),
+               agg, request.GetFieldValueSort())
+       var tags []string
        for _, f := range ff {
                if m, getErr := f.Get(); getErr != nil {
                        allErr = multierr.Append(allErr, getErr)
                } else {
-                       tl := m.Data().(*measurev1.TopNList)
-                       un := tl.Timestamp.AsTime().UnixNano()
-                       if un > latestTimestamp.AsTime().UnixNano() {
-                               latestTimestamp = tl.Timestamp
+                       d := m.Data()
+                       if d == nil {
+                               continue
+                       }
+                       topNResp := d.(*measurev1.TopNResponse)
+                       for _, l := range topNResp.Lists {
+                               for _, tn := range l.Items {
+                                       if tags == nil {
+                                               tags = make([]string, 0, len(tn.Entity))
+                                               for _, e := range tn.Entity {
+                                                       tags = append(tags, e.Key)
+                                               }
+                                       }
+                                       entityValues := make(tsdb.EntityValues, 0, len(tn.Entity))
+                                       for _, e := range tn.Entity {
+                                               entityValues = append(entityValues, e.Value)
+                                       }
+                                       _ = aggregator.Put(entityValues, tn.Value.GetInt().GetValue(), uint64(l.Timestamp.AsTime().UnixMilli()))
+                               }
                        }
-                       sii = append(sii, &sortedTopNList{TopNList: tl})
                }
        }
-       var desc bool
-       if request.GetFieldValueSort() == modelv1.Sort_SORT_DESC {
-               desc = true
-       }
-       iter := sort.NewItemIter[*comparableTopNItem](sii, desc)
-       defer func() {
-               _ = iter.Close()
-       }()
-       var items []*measurev1.TopNList_Item
-       for iter.Next() && len(items) < int(request.TopN) {
-               items = append(items, iter.Val().TopNList_Item)
+       if tags == nil {
+               resp = bus.NewMessage(now, &measurev1.TopNResponse{})
+               return
        }
-       resp = bus.NewMessage(now, &measurev1.TopNList{
-               Items:     items,
-               Timestamp: latestTimestamp,
+       resp = bus.NewMessage(now, &measurev1.TopNResponse{
+               Lists: aggregator.Val(tags),
        })
        return
 }
diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go
index b133d21d..57914fec 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -29,7 +29,6 @@ import (
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
-       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
@@ -38,7 +37,6 @@ import (
 var errNotExist = errors.New("the object doesn't exist")
 
 type discoveryService struct {
-       pipeline     queue.Client
        metadataRepo metadata.Repo
        nodeRegistry NodeRegistry
        shardRepo    *shardRepo
@@ -47,13 +45,12 @@ type discoveryService struct {
        kind         schema.Kind
 }
 
-func newDiscoveryService(pipeline queue.Client, kind schema.Kind, metadataRepo metadata.Repo, nodeRegistry NodeRegistry) *discoveryService {
+func newDiscoveryService(kind schema.Kind, metadataRepo metadata.Repo, nodeRegistry NodeRegistry) *discoveryService {
        sr := &shardRepo{shardEventsMap: make(map[identity]uint32)}
        er := &entityRepo{entitiesMap: make(map[identity]partition.EntityLocator)}
        return &discoveryService{
                shardRepo:    sr,
                entityRepo:   er,
-               pipeline:     pipeline,
                kind:         kind,
                metadataRepo: metadataRepo,
                nodeRegistry: nodeRegistry,
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 4b3f7da9..2a32d29d 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -31,6 +31,7 @@ import (
        commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/accesslog"
        "github.com/apache/skywalking-banyandb/pkg/bus"
@@ -43,6 +44,8 @@ type measureService struct {
        *discoveryService
        sampled            *logger.Logger
        ingestionAccessLog accesslog.Log
+       pipeline           queue.Client
+       broadcaster        queue.Client
 }
 
 func (ms *measureService) setLogger(log *logger.Logger) {
@@ -137,7 +140,7 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest)
                return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", req.GetTimeRange(), err)
        }
        message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), req)
-       feat, errQuery := ms.pipeline.Publish(data.TopicMeasureQuery, message)
+       feat, errQuery := ms.broadcaster.Publish(data.TopicMeasureQuery, message)
        if errQuery != nil {
                return nil, errQuery
        }
@@ -150,8 +153,8 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest)
        }
        data := msg.Data()
        switch d := data.(type) {
-       case []*measurev1.DataPoint:
-               return &measurev1.QueryResponse{DataPoints: d}, nil
+       case *measurev1.QueryResponse:
+               return d, nil
        case common.Error:
                return nil, errors.WithMessage(errQueryMsg, d.Msg())
        }
@@ -164,7 +167,7 @@ func (ms *measureService) TopN(_ context.Context, topNRequest *measurev1.TopNReq
        }
 
        message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), topNRequest)
-       feat, errQuery := ms.pipeline.Publish(data.TopicTopNQuery, message)
+       feat, errQuery := ms.broadcaster.Publish(data.TopicTopNQuery, message)
        if errQuery != nil {
                return nil, errQuery
        }
@@ -174,8 +177,8 @@ func (ms *measureService) TopN(_ context.Context, topNRequest *measurev1.TopNReq
        }
        data := msg.Data()
        switch d := data.(type) {
-       case []*measurev1.TopNList:
-               return &measurev1.TopNResponse{Lists: d}, nil
+       case *measurev1.TopNResponse:
+               return d, nil
        case common.Error:
                return nil, errors.WithMessage(errQueryMsg, d.Msg())
        }
diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go
index 6dff21fa..d7118b40 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -178,7 +178,7 @@ func setupForRegistry() func() {
        metaSvc, err := metadata.NewService(context.TODO())
        Expect(err).NotTo(HaveOccurred())
 
-       tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc, grpc.NewLocalNodeRegistry())
+       tcp := grpc.NewServer(context.TODO(), pipeline, pipeline, metaSvc, grpc.NewLocalNodeRegistry())
        preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc}
        var flags []string
        metaPath, metaDeferFunc, err := test.NewSpace()
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index a89b7004..102bd170 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -64,8 +64,7 @@ type Server interface {
 }
 
 type server struct {
-       pipeline queue.Client
-       creds    credentials.TransportCredentials
+       creds credentials.TransportCredentials
        *streamRegistryServer
        log *logger.Logger
        *indexRuleBindingRegistryServer
@@ -91,15 +90,18 @@ type server struct {
 }
 
 // NewServer returns a new gRPC server.
-func NewServer(_ context.Context, pipeline queue.Client, schemaRegistry metadata.Repo, nodeRegistry NodeRegistry) Server {
+func NewServer(_ context.Context, pipeline, broadcaster queue.Client, schemaRegistry metadata.Repo, nodeRegistry NodeRegistry) Server {
        streamSVC := &streamService{
-               discoveryService: newDiscoveryService(pipeline, schema.KindStream, schemaRegistry, nodeRegistry),
+               discoveryService: newDiscoveryService(schema.KindStream, schemaRegistry, nodeRegistry),
+               pipeline:         pipeline,
+               broadcaster:      broadcaster,
        }
        measureSVC := &measureService{
-               discoveryService: newDiscoveryService(pipeline, schema.KindMeasure, schemaRegistry, nodeRegistry),
+               discoveryService: newDiscoveryService(schema.KindMeasure, schemaRegistry, nodeRegistry),
+               pipeline:         pipeline,
+               broadcaster:      broadcaster,
        }
        s := &server{
-               pipeline:   pipeline,
                streamSVC:  streamSVC,
                measureSVC: measureSVC,
                streamRegistryServer: &streamRegistryServer{
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 0ae2b30d..aa3d49a4 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -31,6 +31,7 @@ import (
        commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/accesslog"
        "github.com/apache/skywalking-banyandb/pkg/bus"
@@ -43,6 +44,8 @@ type streamService struct {
        *discoveryService
        sampled            *logger.Logger
        ingestionAccessLog accesslog.Log
+       pipeline           queue.Client
+       broadcaster        queue.Client
 }
 
 func (s *streamService) setLogger(log *logger.Logger) {
@@ -142,7 +145,7 @@ func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (*s
                return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", req.GetTimeRange(), err)
        }
        message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), req)
-       feat, errQuery := s.pipeline.Publish(data.TopicStreamQuery, message)
+       feat, errQuery := s.broadcaster.Publish(data.TopicStreamQuery, message)
        if errQuery != nil {
                if errors.Is(errQuery, io.EOF) {
                        return emptyStreamQueryResponse, nil
@@ -155,8 +158,8 @@ func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (*s
        }
        data := msg.Data()
        switch d := data.(type) {
-       case []*streamv1.Element:
-               return &streamv1.QueryResponse{Elements: d}, nil
+       case *streamv1.QueryResponse:
+               return d, nil
        case common.Error:
                return nil, errors.WithMessage(errQueryMsg, d.Msg())
        }
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index b8a55140..e5f28956 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -114,9 +114,12 @@ func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.
        if err := m.parseSpec(); err != nil {
                return nil, err
        }
-       ctx := context.WithValue(context.Background(), logger.ContextKey, l)
+       if db == nil {
+               return m, nil
+       }
 
        m.databaseSupplier = db
+       ctx := context.WithValue(context.Background(), logger.ContextKey, l)
        m.indexWriter = index.NewWriter(ctx, index.WriterOptions{
                DB:         db,
                ShardNum:   shardNum,
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 053abee3..27dfa090 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -405,3 +405,16 @@ func (s *portableSupplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema
        defer cancel()
        return s.metadata.MeasureRegistry().GetMeasure(ctx, md)
 }
+
+func (*portableSupplier) OpenDB(_ *commonv1.Group) (tsdb.Database, error) {
+       panic("do not support open db")
+}
+
+func (s *portableSupplier) OpenResource(shardNum uint32, _ tsdb.Supplier, spec resourceSchema.Resource) (io.Closer, error) {
+       measureSchema := spec.Schema().(*databasev1.Measure)
+       return openMeasure(shardNum, nil, measureSpec{
+               schema:           measureSchema,
+               indexRules:       spec.IndexRules(),
+               topNAggregations: spec.TopN(),
+       }, s.l, nil)
+}
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index a47e6df8..70a89d8e 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -104,7 +104,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
                return
        }
 
-       resp = bus.NewMessage(bus.MessageID(now), entities)
+       resp = bus.NewMessage(bus.MessageID(now), &streamv1.QueryResponse{Elements: entities})
 
        return
 }
@@ -170,7 +170,7 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
        if e := ml.Debug(); e.Enabled() {
                e.RawJSON("ret", 
logger.Proto(&measurev1.QueryResponse{DataPoints: result})).Msg("got a measure")
        }
-       resp = bus.NewMessage(bus.MessageID(now), result)
+       resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{DataPoints: result})
        return
 }
 
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index 55939991..1f266c7e 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -88,7 +88,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
                        Msg("fail to list shards")
                return
        }
-       aggregator := createTopNPostAggregator(request.GetTopN(),
+       aggregator := CreateTopNPostAggregator(request.GetTopN(),
                request.GetAgg(), request.GetFieldValueSort())
        entity, err := locateEntity(topNSchema, request.GetFieldValueSort(), request.GetConditions())
        if err != nil {
@@ -130,7 +130,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
                                                        Msg("fail to parse topN family")
                                                return
                                        }
-                                       _ = aggregator.put(tuple.V1.([]*modelv1.TagValue), tuple.V2.(int64), iter.Val().Time())
+                                       _ = aggregator.Put(tuple.V1.([]*modelv1.TagValue), tuple.V2.(int64), iter.Val().Time())
                                }
                                _ = iter.Close()
                        }
@@ -138,7 +138,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
        }
 
        now := time.Now().UnixNano()
-       resp = bus.NewMessage(bus.MessageID(now), aggregator.val(sourceMeasure.GetSchema().GetEntity().GetTagNames()))
+       resp = bus.NewMessage(bus.MessageID(now), &measurev1.TopNResponse{Lists: aggregator.Val(sourceMeasure.GetSchema().GetEntity().GetTagNames())})
 
        return
 }
@@ -254,13 +254,14 @@ func (n *aggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
        return tags
 }
 
-// postProcessor defines necessary methods for Top-N post processor with or without aggregation.
-type postProcessor interface {
-       put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error
-       val([]string) []*measurev1.TopNList
+// PostProcessor defines necessary methods for Top-N post processor with or without aggregation.
+type PostProcessor interface {
+       Put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error
+       Val([]string) []*measurev1.TopNList
+       Val([]string) []*measurev1.TopNList
 }
 
-func createTopNPostAggregator(topN int32, aggrFunc modelv1.AggregationFunction, sort modelv1.Sort) postProcessor {
+// CreateTopNPostAggregator creates a Top-N post processor with or without aggregation.
+func CreateTopNPostAggregator(topN int32, aggrFunc modelv1.AggregationFunction, sort modelv1.Sort) PostProcessor {
        if aggrFunc == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
                // if aggregation is not specified, we have to keep all timelines
                return &postNonAggregationProcessor{
@@ -327,7 +328,7 @@ func (aggr *postAggregationProcessor) Pop() any {
        return item
 }
 
-func (aggr *postAggregationProcessor) put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
+func (aggr *postAggregationProcessor) Put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
        // update latest ts
        if aggr.latestTimestamp < timestampMillis {
                aggr.latestTimestamp = timestampMillis
@@ -374,7 +375,7 @@ func (aggr *postAggregationProcessor) tryEnqueue(key string, item *aggregatorIte
        }
 }
 
-func (aggr *postAggregationProcessor) val(tagNames []string) []*measurev1.TopNList {
+func (aggr *postAggregationProcessor) Val(tagNames []string) []*measurev1.TopNList {
        topNItems := make([]*measurev1.TopNList_Item, aggr.Len())
 
        for aggr.Len() > 0 {
@@ -430,7 +431,7 @@ type postNonAggregationProcessor struct {
        sort      modelv1.Sort
 }
 
-func (naggr *postNonAggregationProcessor) val(tagNames []string) []*measurev1.TopNList {
+func (naggr *postNonAggregationProcessor) Val(tagNames []string) []*measurev1.TopNList {
        topNLists := make([]*measurev1.TopNList, 0, len(naggr.timelines))
        for ts, timeline := range naggr.timelines {
                items := make([]*measurev1.TopNList_Item, timeline.Len())
@@ -462,7 +463,7 @@ func (naggr *postNonAggregationProcessor) val(tagNames []string) []*measurev1.To
        return topNLists
 }
 
-func (naggr *postNonAggregationProcessor) put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
+func (naggr *postNonAggregationProcessor) Put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
        key := entityValues.String()
        if timeline, ok := naggr.timelines[timestampMillis]; ok {
                if timeline.Len() < int(naggr.topN) {
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 9e00b13a..51d56967 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -30,6 +30,7 @@ import (
        "google.golang.org/protobuf/proto"
        "google.golang.org/protobuf/types/known/anypb"
 
+       "github.com/apache/skywalking-banyandb/api/data"
        clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
@@ -105,9 +106,7 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err
                if !ok {
                        return multierr.Append(err, fmt.Errorf("failed to get client for node %s", node))
                }
-               ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-               defer cancel()
-               stream, errCreateStream := client.client.Send(ctx)
+               stream, errCreateStream := client.client.Send(context.Background())
                if err != nil {
                        return multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream))
                }
@@ -116,6 +115,7 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err
                        return multierr.Append(err, fmt.Errorf("failed to send message to node %s: %w", node, errSend))
                }
                f.clients = append(f.clients, stream)
+               f.topics = append(f.topics, topic)
                return err
        }
        for _, m := range messages {
@@ -133,6 +133,7 @@ func (p *pub) NewBatchPublisher() queue.BatchPublisher {
 func New(metadata metadata.Repo) queue.Client {
        return &pub{
                metadata: metadata,
+               clients:  make(map[string]*client),
                closer:   run.NewCloser(1),
        }
 }
@@ -174,7 +175,7 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
                }
                node := m.Node()
                sendData := func() (success bool) {
-                       if stream, ok := bp.streams[node]; !ok {
+                       if stream, ok := bp.streams[node]; ok {
                                defer func() {
                                        if !success {
                                                delete(bp.streams, node)
@@ -225,9 +226,6 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
 }
 
 func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, error) {
-       if !m.IsRemote() {
-               return nil, fmt.Errorf("message %d is not remote", m.ID())
-       }
        r := &clusterv1.SendRequest{
                Topic:     topic.String(),
                MessageId: uint64(m.ID()),
@@ -246,6 +244,7 @@ func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, e
 
 type future struct {
        clients []clusterv1.Service_SendClient
+       topics  []bus.Topic
 }
 
 func (l *future) Get() (bus.Message, error) {
@@ -253,17 +252,33 @@ func (l *future) Get() (bus.Message, error) {
                return bus.Message{}, io.EOF
        }
        c := l.clients[0]
+       t := l.topics[0]
        defer func() {
                l.clients = l.clients[1:]
+               l.topics = l.topics[1:]
        }()
        resp, err := c.Recv()
        if err != nil {
                return bus.Message{}, err
        }
-       return bus.NewMessage(
-               bus.MessageID(resp.MessageId),
-               resp.Body,
-       ), nil
+       if resp.Error != "" {
+               return bus.Message{}, errors.New(resp.Error)
+       }
+       if resp.Body == nil {
+               return bus.NewMessage(bus.MessageID(resp.MessageId), nil), nil
+       }
+       if messageSupplier, ok := data.TopicResponseMap[t]; ok {
+               m := messageSupplier()
+               err = resp.Body.UnmarshalTo(m)
+               if err != nil {
+                       return bus.Message{}, err
+               }
+               return bus.NewMessage(
+                       bus.MessageID(resp.MessageId),
+                       m,
+               ), nil
+       }
+       return bus.Message{}, fmt.Errorf("invalid topic %s", t)
 }
 
 func (l *future) GetAll() ([]bus.Message, error) {
diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go
index c689eef0..021ec877 100644
--- a/banyand/queue/sub/sub.go
+++ b/banyand/queue/sub/sub.go
@@ -22,9 +22,12 @@ import (
        "io"
 
        "github.com/pkg/errors"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
        "google.golang.org/protobuf/proto"
        "google.golang.org/protobuf/types/known/anypb"
 
+       "github.com/apache/skywalking-banyandb/api/data"
        clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
        "github.com/apache/skywalking-banyandb/pkg/bus"
 )
@@ -52,11 +55,18 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error {
                        return nil
                }
                if err != nil {
-                       reply(writeEntity, err, "failed to receive message")
-                       continue
+                       if status.Code(err) == codes.Canceled {
+                               return nil
+                       }
+                       s.log.Error().Err(err).Msg("failed to receive message")
+                       return err
                }
                if writeEntity.Topic != "" && topic == nil {
-                       t := bus.UniTopic(writeEntity.Topic)
+                       t, ok := data.TopicMap[writeEntity.Topic]
+                       if !ok {
+                               reply(writeEntity, err, "invalid topic")
+                               continue
+                       }
                        topic = &t
                }
                if topic == nil {
@@ -68,9 +78,25 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error {
                        reply(writeEntity, err, "no listener found")
                        continue
                }
-               m := listener.Rev(bus.NewMessage(bus.MessageID(writeEntity.MessageId), writeEntity.Body))
+               var m bus.Message
+               if reqSupplier, ok := data.TopicRequestMap[*topic]; ok {
+                       req := reqSupplier()
+                       if errUnmarshal := writeEntity.Body.UnmarshalTo(req); errUnmarshal != nil {
+                               reply(writeEntity, errUnmarshal, "failed to unmarshal message")
+                               continue
+                       }
+                       m = listener.Rev(bus.NewMessage(bus.MessageID(writeEntity.MessageId), req))
+               } else {
+                       reply(writeEntity, err, "unknown topic")
+                       continue
+               }
+
                if m.Data() == nil {
-                       reply(writeEntity, err, "no response")
+                       if errSend := stream.Send(&clusterv1.SendResponse{
+                               MessageId: writeEntity.MessageId,
+                       }); errSend != nil {
+                               s.log.Error().Stringer("written", writeEntity).Err(errSend).Msg("failed to send response")
+                       }
                        continue
                }
                message, ok := m.Data().(proto.Message)
@@ -87,8 +113,7 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error {
                        MessageId: writeEntity.MessageId,
                        Body:      anyMessage,
                }); err != nil {
-                       reply(writeEntity, err, "failed to send response")
-                       continue
+                       s.log.Error().Stringer("written", writeEntity).Err(err).Msg("failed to send response")
                }
        }
 }
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 0ce3e6ce..18d5c3a1 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -271,6 +271,18 @@ type portableSupplier struct {
        l        *logger.Logger
 }
 
+func (*portableSupplier) OpenDB(_ *commonv1.Group) (tsdb.Database, error) {
+       panic("do not support open db")
+}
+
+func (s *portableSupplier) OpenResource(shardNum uint32, _ tsdb.Supplier, resource resourceSchema.Resource) (io.Closer, error) {
+       streamSchema := resource.Schema().(*databasev1.Stream)
+       return openStream(shardNum, nil, streamSpec{
+               schema:     streamSchema,
+               indexRules: resource.IndexRules(),
+       }, s.l), nil
+}
+
 func newPortableSupplier(metadata metadata.Repo, l *logger.Logger) *portableSupplier {
        return &portableSupplier{
                metadata: metadata,
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index a989ebb2..4d5c8f8a 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -73,6 +73,9 @@ func openStream(shardNum uint32, db tsdb.Supplier, spec streamSpec, l *logger.Lo
        sm.parseSpec()
        ctx := context.WithValue(context.Background(), logger.ContextKey, l)
 
+       if db == nil {
+               return sm
+       }
        sm.db = db
        sm.indexWriter = index.NewWriter(ctx, index.WriterOptions{
                DB:                db,
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
index 1fc94283..2d8764fd 100644
--- a/banyand/tsdb/buffer.go
+++ b/banyand/tsdb/buffer.go
@@ -385,7 +385,6 @@ func (bsb *bufferShardBucket) recoveryWal() error {
 
 func (bsb *bufferShardBucket) recoveryWorkSegment(segment wal.Segment) {
        var wg sync.WaitGroup
-       wg.Add(len(segment.GetLogEntries()))
        for _, logEntry := range segment.GetLogEntries() {
                timestamps := logEntry.GetTimestamps()
                values := logEntry.GetValues()
@@ -404,6 +403,7 @@ func (bsb *bufferShardBucket) recoveryWorkSegment(segment wal.Segment) {
                                        }
                                },
                        }
+                       wg.Add(1)
                        elementIndex++
                }
        }
diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go
index 6ad65ada..5df38130 100644
--- a/pkg/bus/bus.go
+++ b/pkg/bus/bus.go
@@ -64,11 +64,6 @@ func (m Message) Node() string {
        return m.node
 }
 
-// IsRemote returns true if the Message is sent from a remote node.
-func (m Message) IsRemote() bool {
-       return m.node != "local"
-}
-
 // NewMessage returns a new Message with a MessageID and embed data.
 func NewMessage(id MessageID, data interface{}) Message {
        return Message{id: id, node: "local", payload: data}
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index 7a421b12..a6ec86a6 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -30,7 +30,6 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/query"
        "github.com/apache/skywalking-banyandb/banyand/queue/sub"
        "github.com/apache/skywalking-banyandb/banyand/stream"
-       "github.com/apache/skywalking-banyandb/pkg/config"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/version"
@@ -75,17 +74,10 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
        }
        dataGroup := run.NewGroup("data")
        dataGroup.Register(units...)
-       logging := logger.Logging{}
        dataCmd := &cobra.Command{
                Use:     "data",
                Version: version.Build(),
                Short:   "Run as the data server",
-               PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) {
-                       if err = config.Load("logging", cmd.Flags()); err != nil {
-                               return err
-                       }
-                       return logger.Init(logging)
-               },
                RunE: func(cmd *cobra.Command, args []string) (err error) {
                        node, err := common.GenerateNode(pipeline.GetPort(), nil)
                        if err != nil {
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index e1178ef3..126f4b89 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -29,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/liaison/http"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/queue/pub"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
@@ -43,11 +44,12 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
                l.Fatal().Err(err).Msg("failed to initiate metadata service")
        }
        pipeline := pub.New(metaSvc)
-       grpcServer := grpc.NewServer(ctx, pipeline, metaSvc, grpc.NewClusterNodeRegistry(pipeline))
+       localPipeline := queue.Local()
+       grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, grpc.NewClusterNodeRegistry(pipeline))
        profSvc := observability.NewProfService()
        metricSvc := observability.NewMetricService()
        httpServer := http.NewServer()
-       dQuery, err := dquery.NewService(metaSvc, pipeline)
+       dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate distributed query 
service")
        }
@@ -55,6 +57,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
        units = append(units, runners...)
        units = append(units,
                metaSvc,
+               localPipeline,
                pipeline,
                dQuery,
                grpcServer,
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index 3c659b09..c2638b8e 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -57,7 +57,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
        }
-       grpcServer := grpc.NewServer(ctx, pipeline, metaSvc, grpc.NewLocalNodeRegistry())
+       grpcServer := grpc.NewServer(ctx, pipeline, pipeline, metaSvc, grpc.NewLocalNodeRegistry())
        profSvc := observability.NewProfService()
        metricSvc := observability.NewMetricService()
        httpServer := http.NewServer()
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go
index 45ee4c66..bbe4d0d4 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -38,9 +38,7 @@ import (
 var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil)
 
 type unresolvedDistributed struct {
-       originalQuery     *measurev1.QueryRequest
-       order             *logical.OrderBy
-       maxDataPointsSize int
+       originalQuery *measurev1.QueryRequest
 }
 
 func newUnresolvedDistributed(query *measurev1.QueryRequest) logical.UnresolvedPlan {
@@ -49,39 +47,61 @@ func newUnresolvedDistributed(query *measurev1.QueryRequest) logical.UnresolvedP
        }
 }
 
-func (ud *unresolvedDistributed) Limit(max int) {
-       ud.maxDataPointsSize = max
-}
-
-func (ud *unresolvedDistributed) Sort(order *logical.OrderBy) {
-       ud.order = order
-}
-
 func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) {
        if ud.originalQuery.TagProjection == nil {
                return nil, fmt.Errorf("tag projection is required")
        }
-       if ud.originalQuery.FieldProjection == nil {
-               return nil, fmt.Errorf("filed projection is required")
+       projectionTags := logical.ToTags(ud.originalQuery.GetTagProjection())
+       if len(projectionTags) > 0 {
+               var err error
+               projTagsRefs, err := s.CreateTagRef(projectionTags...)
+               if err != nil {
+                       return nil, err
+               }
+               s = s.ProjTags(projTagsRefs...)
+       }
+       projectionFields := make([]*logical.Field, len(ud.originalQuery.GetFieldProjection().GetNames()))
+       for i, fieldNameProj := range ud.originalQuery.GetFieldProjection().GetNames() {
+               projectionFields[i] = logical.NewField(fieldNameProj)
+       }
+       if len(projectionFields) > 0 {
+               var err error
+               projFieldRefs, err := s.CreateFieldRef(projectionFields...)
+               if err != nil {
+                       return nil, err
+               }
+               s = s.ProjFields(projFieldRefs...)
+       }
+       limit := ud.originalQuery.GetLimit()
+       if limit == 0 {
+               limit = defaultLimit
        }
        temp := &measurev1.QueryRequest{
                TagProjection:   ud.originalQuery.TagProjection,
                FieldProjection: ud.originalQuery.FieldProjection,
                Metadata:        ud.originalQuery.Metadata,
                Criteria:        ud.originalQuery.Criteria,
-               Limit:           uint32(ud.maxDataPointsSize),
-               OrderBy: &modelv1.QueryOrder{
-                       IndexRuleName: ud.order.Index.Metadata.Name,
-                       Sort:          ud.order.Sort,
-               },
+               Limit:           limit,
+               OrderBy:         ud.originalQuery.OrderBy,
        }
-       if ud.order == nil {
+       if ud.originalQuery.OrderBy == nil {
                return &distributedPlan{
                        queryTemplate: temp,
                        s:             s,
                        sortByTime:    true,
                }, nil
        }
+       if ud.originalQuery.OrderBy.IndexRuleName == "" {
+               result := &distributedPlan{
+                       queryTemplate: temp,
+                       s:             s,
+                       sortByTime:    true,
+               }
+               if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
+                       result.desc = true
+               }
+               return result, nil
+       }
        ok, indexRule := s.IndexRuleDefined(ud.originalQuery.OrderBy.IndexRuleName)
        if !ok {
                return nil, fmt.Errorf("index rule %s not found", ud.originalQuery.OrderBy.IndexRuleName)
@@ -106,17 +126,21 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
 }
 
 type distributedPlan struct {
-       s             logical.Schema
-       queryTemplate *measurev1.QueryRequest
-       sortTagSpec   logical.TagSpec
-       sortByTime    bool
-       desc          bool
+       s                 logical.Schema
+       queryTemplate     *measurev1.QueryRequest
+       sortTagSpec       logical.TagSpec
+       sortByTime        bool
+       desc              bool
+       maxDataPointsSize uint32
 }
 
 func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, error) {
        dctx := executor.FromDistributedExecutionContext(ctx)
        query := proto.Clone(t.queryTemplate).(*measurev1.QueryRequest)
        query.TimeRange = dctx.TimeRange()
+       if t.maxDataPointsSize > 0 {
+               query.Limit = t.maxDataPointsSize
+       }
        ff, err := dctx.Broadcast(data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
        if err != nil {
                return nil, err
@@ -127,8 +151,12 @@ func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, erro
                if m, getErr := f.Get(); getErr != nil {
                        allErr = multierr.Append(allErr, getErr)
                } else {
+                       d := m.Data()
+                       if d == nil {
+                               continue
+                       }
                        see = append(see,
-                               newSortableElements(m.Data().(*measurev1.QueryResponse).DataPoints,
+                               newSortableElements(d.(*measurev1.QueryResponse).DataPoints,
                                        t.sortByTime, t.sortTagSpec))
                }
        }
@@ -149,6 +177,10 @@ func (t *distributedPlan) Schema() logical.Schema {
        return t.s
 }
 
+func (t *distributedPlan) Limit(max int) {
+       t.maxDataPointsSize = uint32(max)
+}
+
 var _ sort.Comparable = (*comparableDataPoint)(nil)
 
 type comparableDataPoint struct {
@@ -220,7 +252,7 @@ func (s *sortableElements) iter(fn func(*measurev1.DataPoint) (*comparableDataPo
                return s.iter(fn)
        }
        s.cur = cur
-       return s.index < len(s.dataPoints)
+       return s.index <= len(s.dataPoints)
 }
 
 var _ executor.MIterator = (*sortedMIterator)(nil)
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go b/pkg/query/logical/stream/stream_plan_distributed.go
index 7fce94cc..4e111ad8 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -51,6 +51,15 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
        if ud.originalQuery.Projection == nil {
                return nil, fmt.Errorf("projection is required")
        }
+       projectionTags := logical.ToTags(ud.originalQuery.GetProjection())
+       if len(projectionTags) > 0 {
+               var err error
+               projTagsRefs, err := s.CreateTagRef(projectionTags...)
+               if err != nil {
+                       return nil, err
+               }
+               s = s.ProjTags(projTagsRefs...)
+       }
        limit := ud.originalQuery.GetLimit()
        if limit == 0 {
                limit = defaultLimit
@@ -69,6 +78,17 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
                        sortByTime:    true,
                }, nil
        }
+       if ud.originalQuery.OrderBy.IndexRuleName == "" {
+               result := &distributedPlan{
+                       queryTemplate: temp,
+                       s:             s,
+                       sortByTime:    true,
+               }
+               if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
+                       result.desc = true
+               }
+               return result, nil
+       }
        ok, indexRule := s.IndexRuleDefined(ud.originalQuery.OrderBy.IndexRuleName)
        if !ok {
                return nil, fmt.Errorf("index rule %s not found", ud.originalQuery.OrderBy.IndexRuleName)
@@ -93,18 +113,22 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
 }
 
 type distributedPlan struct {
-       s             logical.Schema
-       queryTemplate *streamv1.QueryRequest
-       sortTagSpec   logical.TagSpec
-       sortByTime    bool
-       desc          bool
+       s              logical.Schema
+       queryTemplate  *streamv1.QueryRequest
+       sortTagSpec    logical.TagSpec
+       sortByTime     bool
+       desc           bool
+       maxElementSize uint32
 }
 
 func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, error) {
        dctx := executor.FromDistributedExecutionContext(ctx)
        query := proto.Clone(t.queryTemplate).(*streamv1.QueryRequest)
        query.TimeRange = dctx.TimeRange()
-       ff, err := dctx.Broadcast(data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+       if t.maxElementSize > 0 {
+               query.Limit = t.maxElementSize
+       }
+       ff, err := dctx.Broadcast(data.TopicStreamQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
        if err != nil {
                return nil, err
        }
@@ -114,9 +138,17 @@ func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, err
                if m, getErr := f.Get(); getErr != nil {
                        allErr = multierr.Append(allErr, getErr)
                } else {
+                       d := m.Data()
+                       if d == nil {
+                               continue
+                       }
+                       resp := d.(*streamv1.QueryResponse)
+                       if err != nil {
+                               allErr = multierr.Append(allErr, err)
+                               continue
+                       }
                        see = append(see,
-                               newSortableElements(m.Data().(*streamv1.QueryResponse).Elements,
-                                       t.sortByTime, t.sortTagSpec))
+                               newSortableElements(resp.Elements, t.sortByTime, t.sortTagSpec))
                }
        }
        iter := sort.NewItemIter[*comparableElement](see, t.desc)
@@ -139,6 +171,10 @@ func (t *distributedPlan) Schema() logical.Schema {
        return t.s
 }
 
+func (t *distributedPlan) Limit(max int) {
+       t.maxElementSize = uint32(max)
+}
+
 var _ sort.Comparable = (*comparableElement)(nil)
 
 type comparableElement struct {
@@ -210,5 +246,5 @@ func (s *sortableElements) iter(fn func(*streamv1.Element) (*comparableElement,
                return s.iter(fn)
        }
        s.cur = cur
-       return s.index < len(s.elements)
+       return s.index <= len(s.elements)
 }
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index ce96cf39..049f5d29 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -253,7 +253,7 @@ func (sr *schemaRepo) createGroup(name string) (g *group) {
        if sr.resourceSupplier != nil {
                g = newGroup(sr.metadata, sr.l, sr.resourceSupplier)
        } else {
-               g = newPortableGroup(sr.metadata, sr.l)
+               g = newPortableGroup(sr.metadata, sr.l, sr.resourceSchemaSupplier)
        }
        sr.data[name] = g
        return
@@ -354,13 +354,14 @@ func (sr *schemaRepo) Close() {
 var _ Group = (*group)(nil)
 
 type group struct {
-       resourceSupplier ResourceSupplier
-       metadata         metadata.Repo
-       db               atomic.Value
-       groupSchema      atomic.Pointer[commonv1.Group]
-       l                *logger.Logger
-       schemaMap        map[string]*resourceSpec
-       mapMutex         sync.RWMutex
+       resourceSupplier       ResourceSupplier
+       resourceSchemaSupplier ResourceSchemaSupplier
+       metadata               metadata.Repo
+       db                     atomic.Value
+       groupSchema            atomic.Pointer[commonv1.Group]
+       l                      *logger.Logger
+       schemaMap              map[string]*resourceSpec
+       mapMutex               sync.RWMutex
 }
 
 func newGroup(
@@ -369,11 +370,12 @@ func newGroup(
        resourceSupplier ResourceSupplier,
 ) *group {
        g := &group{
-               groupSchema:      atomic.Pointer[commonv1.Group]{},
-               metadata:         metadata,
-               l:                l,
-               schemaMap:        make(map[string]*resourceSpec),
-               resourceSupplier: resourceSupplier,
+               groupSchema:            atomic.Pointer[commonv1.Group]{},
+               metadata:               metadata,
+               l:                      l,
+               schemaMap:              make(map[string]*resourceSpec),
+               resourceSupplier:       resourceSupplier,
+               resourceSchemaSupplier: resourceSupplier,
        }
        return g
 }
@@ -381,12 +383,14 @@ func newGroup(
 func newPortableGroup(
        metadata metadata.Repo,
        l *logger.Logger,
+       resourceSchemaSupplier ResourceSchemaSupplier,
 ) *group {
        g := &group{
-               groupSchema: atomic.Pointer[commonv1.Group]{},
-               metadata:    metadata,
-               l:           l,
-               schemaMap:   make(map[string]*resourceSpec),
+               groupSchema:            atomic.Pointer[commonv1.Group]{},
+               metadata:               metadata,
+               l:                      l,
+               schemaMap:              make(map[string]*resourceSpec),
+               resourceSchemaSupplier: resourceSchemaSupplier,
        }
        return g
 }
@@ -454,13 +458,15 @@ func (g *group) storeResource(ctx context.Context, resourceSchema ResourceSchema
        if preResource != nil && preResource.isNewThan(resource) {
                return preResource, nil
        }
+       var dbSupplier tsdb.Supplier
        if !g.isPortable() {
-               sm, errTS := g.resourceSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, g, resource)
-               if errTS != nil {
-                       return nil, errTS
-               }
-               resource.delegated = sm
+               dbSupplier = g
+       }
+       sm, errTS := g.resourceSchemaSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, dbSupplier, resource)
+       if errTS != nil {
+               return nil, errTS
        }
+       resource.delegated = sm
        g.schemaMap[key] = resource
        if preResource != nil {
                _ = preResource.Close()
@@ -501,7 +507,7 @@ func (g *group) close() (err error) {
                err = multierr.Append(err, s.Close())
        }
        g.mapMutex.RUnlock()
-       if !g.isInit() {
+       if !g.isInit() || g.isPortable() {
                return nil
        }
        return multierr.Append(err, g.SupplyTSDB().Close())
diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go
index f19a4230..65880102 100644
--- a/pkg/schema/schema.go
+++ b/pkg/schema/schema.go
@@ -75,12 +75,12 @@ type Resource interface {
 // ResourceSchemaSupplier allows get a ResourceSchema from the metadata.
 type ResourceSchemaSupplier interface {
        ResourceSchema(metadata *commonv1.Metadata) (ResourceSchema, error)
+       OpenResource(shardNum uint32, db tsdb.Supplier, spec Resource) (io.Closer, error)
 }
 
 // ResourceSupplier allows open a resource and its embedded tsdb.
 type ResourceSupplier interface {
        ResourceSchemaSupplier
-       OpenResource(shardNum uint32, db tsdb.Supplier, spec Resource) (io.Closer, error)
        OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error)
 }
 
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index f5af458e..b3159f90 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -29,6 +29,7 @@ import (
        "google.golang.org/grpc/credentials"
        "google.golang.org/grpc/credentials/insecure"
 
+       databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/cmdsetup"
@@ -176,3 +177,54 @@ func CMD(flags ...string) func() {
                wg.Wait()
        }
 }
+
+// DataNode runs a data node.
+func DataNode(etcdEndpoint string) func() {
+       path, deferFn, err := test.NewSpace()
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       ports, err := test.AllocateFreePorts(1)
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       addr := fmt.Sprintf("%s:%d", host, ports[0])
+       nodeHost := "127.0.0.1"
+       closeFn := CMD("data",
+               "--grpc-host="+host,
+               fmt.Sprintf("--grpc-port=%d", ports[0]),
+               "--stream-root-path="+path,
+               "--measure-root-path="+path,
+               "--etcd-endpoints", etcdEndpoint,
+               "--node-host-provider", "flag",
+               "--node-host", nodeHost)
+       gomega.Eventually(
+               helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpclib.WithTransportCredentials(insecure.NewCredentials())),
+               testflags.EventuallyTimeout).Should(gomega.Succeed())
+       gomega.Eventually(func() (map[string]*databasev1.Node, error) {
+               return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[0]))
+       }, testflags.EventuallyTimeout).Should(gomega.HaveLen(1))
+       return func() {
+               closeFn()
+               deferFn()
+       }
+}
+
+// LiaisonNode runs a liaison node.
+func LiaisonNode(etcdEndpoint string) (string, func()) {
+       ports, err := test.AllocateFreePorts(2)
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       addr := fmt.Sprintf("%s:%d", host, ports[0])
+       httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
+       nodeHost := "127.0.0.1"
+       closeFn := CMD("liaison",
+               "--grpc-host="+host,
+               fmt.Sprintf("--grpc-port=%d", ports[0]),
+               "--http-host="+host,
+               fmt.Sprintf("--http-port=%d", ports[1]),
+               "--http-grpc-addr="+addr,
+               "--etcd-endpoints", etcdEndpoint,
+               "--node-host-provider", "flag",
+               "--node-host", nodeHost)
+       gomega.Eventually(helpers.HTTPHealthCheck(httpAddr), testflags.EventuallyTimeout).Should(gomega.Succeed())
+       gomega.Eventually(func() (map[string]*databasev1.Node, error) {
+               return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[0]))
+       }, testflags.EventuallyTimeout).Should(gomega.HaveLen(1))
+       return addr, closeFn
+}
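
Together, these helpers let a suite bring up a small cluster around an existing etcd endpoint. Below is a hypothetical wrapper (not part of the patch) showing the intended call order; the two-data-node layout is an assumption borrowed from the distributed suite further down.

// Sketch only: compose the new setup helpers into a one-shot cluster bootstrap.
// Assumes the pkg/test/setup package is imported as in the suites below.
func startCluster(etcdEndpoint string) (liaisonAddr string, closeFn func()) {
	// Data nodes register themselves under /<namespace>/nodes/ in etcd.
	closeData0 := setup.DataNode(etcdEndpoint)
	closeData1 := setup.DataNode(etcdEndpoint)
	// The liaison discovers the data nodes and exposes the gRPC/HTTP endpoints.
	addr, closeLiaison := setup.LiaisonNode(etcdEndpoint)
	return addr, func() {
		closeLiaison()
		closeData1()
		closeData0()
	}
}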
diff --git a/test/integration/standalone/query/query_suite_test.go b/test/cases/init.go
similarity index 56%
copy from test/integration/standalone/query/query_suite_test.go
copy to test/cases/init.go
index 91b47189..72f66315 100644
--- a/test/integration/standalone/query/query_suite_test.go
+++ b/test/cases/init.go
@@ -15,56 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package integration_query_test
+// Package cases provides some tools to access test data.
+package cases
 
 import (
-       "testing"
        "time"
 
-       . "github.com/onsi/ginkgo/v2"
-       . "github.com/onsi/gomega"
-       "github.com/onsi/gomega/gleak"
+       "github.com/onsi/gomega"
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
 
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/test/flags"
-       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
-       "github.com/apache/skywalking-banyandb/pkg/test/setup"
-       "github.com/apache/skywalking-banyandb/pkg/timestamp"
-       casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
        casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
-       casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
        casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data"
-       casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
-       integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone"
 )
 
-func TestIntegrationQuery(t *testing.T) {
-       RegisterFailHandler(Fail)
-       RunSpecs(t, "Integration Query Suite", Label(integration_standalone.Labels...))
-}
-
-var (
-       connection *grpc.ClientConn
-       now        time.Time
-       deferFunc  func()
-       goods      []gleak.Goroutine
-)
-
-var _ = SynchronizedBeforeSuite(func() []byte {
-       goods = gleak.Goroutines()
-       Expect(logger.Init(logger.Logging{
-               Env:   "dev",
-               Level: flags.LogLevel,
-       })).To(Succeed())
-       var addr string
-       addr, _, deferFunc = setup.Standalone()
+// Initialize test data.
+func Initialize(addr string, now time.Time) {
        conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
-       Expect(err).NotTo(HaveOccurred())
-       ns := timestamp.NowMilli().UnixNano()
-       now = time.Unix(0, ns-ns%int64(time.Minute))
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       defer conn.Close()
        interval := 500 * time.Millisecond
        // stream
        casesstreamdata.Write(conn, "data.json", now, interval)
@@ -80,32 +50,4 @@ var _ = SynchronizedBeforeSuite(func() []byte {
        casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval)
        casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data1.json", now.Add(10*time.Second), interval)
        casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
-       Expect(conn.Close()).To(Succeed())
-       return []byte(addr)
-}, func(address []byte) {
-       var err error
-       connection, err = grpchelper.Conn(string(address), 10*time.Second,
-               grpc.WithTransportCredentials(insecure.NewCredentials()))
-       casesstream.SharedContext = helpers.SharedContext{
-               Connection: connection,
-               BaseTime:   now,
-       }
-       casesmeasure.SharedContext = helpers.SharedContext{
-               Connection: connection,
-               BaseTime:   now,
-       }
-       casestopn.SharedContext = helpers.SharedContext{
-               Connection: connection,
-               BaseTime:   now,
-       }
-       Expect(err).NotTo(HaveOccurred())
-})
-
-var _ = SynchronizedAfterSuite(func() {
-       if connection != nil {
-               Expect(connection.Close()).To(Succeed())
-       }
-}, func() {
-       deferFunc()
-       Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
-})
+}
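
Usage note: Initialize only seeds data; callers still own the server lifecycle and the shared gRPC connection used by the specs. A hypothetical standalone caller, mirroring the suites updated below (the setup, timestamp, and cases packages are assumed to be imported as in those suites):

// Sketch only: seed a freshly started standalone server with the shared data set.
func seedStandalone() (addr string, now time.Time, deferFn func()) {
	addr, _, deferFn = setup.Standalone()
	ns := timestamp.NowMilli().UnixNano()
	now = time.Unix(0, ns-ns%int64(time.Minute)) // align to the minute, as the suites do
	cases.Initialize(addr, now)
	return addr, now, deferFn
}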
diff --git a/test/cases/measure/data/input/order_tag_asc.yaml b/test/cases/measure/data/input/order_tag_asc.yaml
index c5b1f1bd..a752c72b 100644
--- a/test/cases/measure/data/input/order_tag_asc.yaml
+++ b/test/cases/measure/data/input/order_tag_asc.yaml
@@ -22,8 +22,6 @@ tagProjection:
   tagFamilies:
   - name: "default"
     tags: ["id"]
-fieldProjection:
-  names: ["total", "value"]
 orderBy:
   sort: "SORT_ASC"
   indexRuleName: "id"
diff --git a/test/cases/measure/data/input/order_tag_desc.yaml b/test/cases/measure/data/input/order_tag_desc.yaml
index 6031cdb3..c5007d75 100644
--- a/test/cases/measure/data/input/order_tag_desc.yaml
+++ b/test/cases/measure/data/input/order_tag_desc.yaml
@@ -22,8 +22,6 @@ tagProjection:
   tagFamilies:
   - name: "default"
     tags: ["id"]
-fieldProjection:
-  names: ["total", "value"]
 orderBy:
   sort: "SORT_DESC"
   indexRuleName: "id"
diff --git a/test/cases/measure/data/want/order_tag_asc.yaml b/test/cases/measure/data/want/order_tag_asc.yaml
index b6a59101..123e2bda 100644
--- a/test/cases/measure/data/want/order_tag_asc.yaml
+++ b/test/cases/measure/data/want/order_tag_asc.yaml
@@ -16,105 +16,51 @@
 # under the License.
 
 dataPoints:
-- fields:
-  - name: total
-    value:
-      int:
-        value: "100"
-  - name: value
-    value:
-      int:
-        value: "1"
-  tagFamilies:
+- tagFamilies:
   - name: default
     tags:
     - key: id
       value:
         str:
           value: svc1
-  timestamp: "2023-06-26T12:41:00Z"
-- fields:
-  - name: total
-    value:
-      int:
-        value: "100"
-  - name: value
-    value:
-      int:
-        value: "2"
-  tagFamilies:
+  timestamp: "2023-09-19T08:35:00Z"
+- tagFamilies:
   - name: default
     tags:
     - key: id
       value:
         str:
           value: svc1
-  timestamp: "2023-06-26T12:42:00Z"
-- fields:
-  - name: total
-    value:
-      int:
-        value: "100"
-  - name: value
-    value:
-      int:
-        value: "3"
-  tagFamilies:
+  timestamp: "2023-09-19T08:37:00Z"
+- tagFamilies:
   - name: default
     tags:
     - key: id
       value:
         str:
           value: svc1
-  timestamp: "2023-06-26T12:43:00Z"
-- fields:
-  - name: total
-    value:
-      int:
-        value: "100"
-  - name: value
-    value:
-      int:
-        value: "5"
-  tagFamilies:
+  timestamp: "2023-09-19T08:36:00Z"
+- tagFamilies:
   - name: default
     tags:
     - key: id
       value:
         str:
           value: svc2
-  timestamp: "2023-06-26T12:44:00Z"
-- fields:
-  - name: total
-    value:
-      int:
-        value: "50"
-  - name: value
-    value:
-      int:
-        value: "4"
-  tagFamilies:
+  timestamp: "2023-09-19T08:38:00Z"
+- tagFamilies:
   - name: default
     tags:
     - key: id
       value:
         str:
           value: svc2
-  timestamp: "2023-06-26T12:45:00Z"
-- fields:
-  - name: total
-    value:
-      int:
-        value: "300"
-  - name: value
-    value:
-      int:
-        value: "6"
-  tagFamilies:
+  timestamp: "2023-09-19T08:39:00Z"
+- tagFamilies:
   - name: default
     tags:
     - key: id
       value:
         str:
           value: svc3
-  timestamp: "2023-06-26T12:46:00Z"
+  timestamp: "2023-09-19T08:40:00Z"
diff --git a/test/cases/measure/data/want/order_tag_desc.yaml b/test/cases/measure/data/want/order_tag_desc.yaml
index db711266..6e455083 100644
--- a/test/cases/measure/data/want/order_tag_desc.yaml
+++ b/test/cases/measure/data/want/order_tag_desc.yaml
@@ -16,105 +16,51 @@
 # under the License.
 
 dataPoints:
-- fields:
-  - name: total
-    value:
-      int:
-        value: "300"
-  - name: value
-    value:
-      int:
-        value: "6"
-  tagFamilies:
+- tagFamilies:
   - name: default
     tags:
     - key: id
       value:
         str:
           value: svc3
-  timestamp: "2023-06-26T12:47:00Z"
-- fields:
-  - name: total
-    value:
-      int:
-        value: "50"
-  - name: value
-    value:
-      int:
-        value: "4"
-  tagFamilies:
+  timestamp: "2023-09-19T08:42:00Z"
+- tagFamilies:
   - name: default
     tags:
     - key: id
       value:
         str:
           value: svc2
-  timestamp: "2023-06-26T12:46:00Z"
-- fields:
-  - name: total
-    value:
-      int:
-        value: "100"
-  - name: value
-    value:
-      int:
-        value: "5"
-  tagFamilies:
+  timestamp: "2023-09-19T08:41:00Z"
+- tagFamilies:
   - name: default
     tags:
     - key: id
       value:
         str:
           value: svc2
-  timestamp: "2023-06-26T12:45:00Z"
-- fields:
-  - name: total
-    value:
-      int:
-        value: "100"
-  - name: value
-    value:
-      int:
-        value: "3"
-  tagFamilies:
+  timestamp: "2023-09-19T08:40:00Z"
+- tagFamilies:
   - name: default
     tags:
     - key: id
       value:
         str:
           value: svc1
-  timestamp: "2023-06-26T12:44:00Z"
-- fields:
-  - name: total
-    value:
-      int:
-        value: "100"
-  - name: value
-    value:
-      int:
-        value: "2"
-  tagFamilies:
+  timestamp: "2023-09-19T08:38:00Z"
+- tagFamilies:
   - name: default
     tags:
     - key: id
       value:
         str:
           value: svc1
-  timestamp: "2023-06-26T12:43:00Z"
-- fields:
-  - name: total
-    value:
-      int:
-        value: "100"
-  - name: value
-    value:
-      int:
-        value: "1"
-  tagFamilies:
+  timestamp: "2023-09-19T08:39:00Z"
+- tagFamilies:
   - name: default
     tags:
     - key: id
       value:
         str:
           value: svc1
-  timestamp: "2023-06-26T12:42:00Z"
+  timestamp: "2023-09-19T08:37:00Z"
diff --git a/test/integration/standalone/query/query_suite_test.go b/test/integration/distributed/query/query_suite_test.go
similarity index 55%
copy from test/integration/standalone/query/query_suite_test.go
copy to test/integration/distributed/query/query_suite_test.go
index 91b47189..0c78a14f 100644
--- a/test/integration/standalone/query/query_suite_test.go
+++ b/test/integration/distributed/query/query_suite_test.go
@@ -15,9 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package integration_query_test
+// Package integration_setup_test is an integration test suite.
+package integration_setup_test
 
 import (
+       "context"
+       "fmt"
        "testing"
        "time"
 
@@ -27,61 +30,82 @@ import (
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
 
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       test_cases "github.com/apache/skywalking-banyandb/test/cases"
        casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
-       casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
        casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
-       casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data"
        casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
-       integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone"
 )
 
-func TestIntegrationQuery(t *testing.T) {
+func TestQuery(t *testing.T) {
        RegisterFailHandler(Fail)
-       RunSpecs(t, "Integration Query Suite", Label(integration_standalone.Labels...))
+       RunSpecs(t, "Distributed Query Suite")
 }
 
 var (
-       connection *grpc.ClientConn
-       now        time.Time
        deferFunc  func()
        goods      []gleak.Goroutine
+       now        time.Time
+       connection *grpc.ClientConn
 )
 
 var _ = SynchronizedBeforeSuite(func() []byte {
-       goods = gleak.Goroutines()
        Expect(logger.Init(logger.Logging{
                Env:   "dev",
                Level: flags.LogLevel,
        })).To(Succeed())
-       var addr string
-       addr, _, deferFunc = setup.Standalone()
-       conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
+       goods = gleak.Goroutines()
+       By("Starting etcd server")
+       ports, err := test.AllocateFreePorts(2)
+       Expect(err).NotTo(HaveOccurred())
+       dir, spaceDef, err := test.NewSpace()
        Expect(err).NotTo(HaveOccurred())
+       ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+       server, err := embeddedetcd.NewServer(
+               embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+               embeddedetcd.RootDir(dir))
+       Expect(err).ShouldNot(HaveOccurred())
+       <-server.ReadyNotify()
+       By("Loading schema")
+       schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+               schema.Namespace(metadata.DefaultNamespace),
+               schema.ConfigureServerEndpoints([]string{ep}),
+       )
+       Expect(err).NotTo(HaveOccurred())
+       defer schemaRegistry.Close()
+       ctx := context.Background()
+       test_stream.PreloadSchema(ctx, schemaRegistry)
+       test_measure.PreloadSchema(ctx, schemaRegistry)
+       By("Starting data node 0")
+       closeDataNode0 := setup.DataNode(ep)
+       By("Starting data node 1")
+       closeDataNode1 := setup.DataNode(ep)
+       By("Starting liaison node")
+       liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep)
+       By("Initializing test cases")
        ns := timestamp.NowMilli().UnixNano()
        now = time.Unix(0, ns-ns%int64(time.Minute))
-       interval := 500 * time.Millisecond
-       // stream
-       casesstreamdata.Write(conn, "data.json", now, interval)
-       // measure
-       interval = time.Minute
-       casesmeasuredata.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
-       casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
-       casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
-       casesmeasuredata.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval)
-       casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data.json", now, interval)
-       casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data1.json", now.Add(10*time.Second), interval)
-       casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
-       casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval)
-       casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data1.json", now.Add(10*time.Second), interval)
-       casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
-       Expect(conn.Close()).To(Succeed())
-       return []byte(addr)
+       test_cases.Initialize(liaisonAddr, now)
+       deferFunc = func() {
+               closerLiaisonNode()
+               closeDataNode0()
+               closeDataNode1()
+               _ = server.Close()
+               <-server.StopNotify()
+               spaceDef()
+       }
+       return []byte(liaisonAddr)
 }, func(address []byte) {
        var err error
        connection, err = grpchelper.Conn(string(address), 10*time.Second,
diff --git a/test/integration/standalone/cold_query/query_suite_test.go b/test/integration/standalone/cold_query/query_suite_test.go
index 7c86e67e..18e832ad 100644
--- a/test/integration/standalone/cold_query/query_suite_test.go
+++ b/test/integration/standalone/cold_query/query_suite_test.go
@@ -33,10 +33,9 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       test_cases "github.com/apache/skywalking-banyandb/test/cases"
        casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
-       casesmeasureData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
        casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
-       casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data"
        casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
        integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone"
 )
@@ -61,24 +60,9 @@ var _ = SynchronizedBeforeSuite(func() []byte {
        })).To(Succeed())
        var addr string
        addr, _, deferFunc = setup.Standalone()
-       conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
-       Expect(err).NotTo(HaveOccurred())
        ns := timestamp.NowMilli().UnixNano()
        now = time.Unix(0, ns-ns%int64(time.Minute)).Add(-time.Hour * 24)
-       interval := 500 * time.Millisecond
-       casesstreamdata.Write(conn, "data.json", now, interval)
-       interval = time.Minute
-       casesmeasureData.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
-       casesmeasureData.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
-       casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
-       casesmeasureData.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval)
-       casesmeasureData.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data.json", now, interval)
-       casesmeasureData.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data1.json", now.Add(10*time.Second), interval)
-       casesmeasureData.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
-       casesmeasureData.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval)
-       casesmeasureData.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data1.json", now.Add(10*time.Second), interval)
-       casesmeasureData.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
-       Expect(conn.Close()).To(Succeed())
+       test_cases.Initialize(addr, now)
        return []byte(addr)
 }, func(address []byte) {
        var err error
diff --git a/test/integration/standalone/query/query_suite_test.go b/test/integration/standalone/query/query_suite_test.go
index 91b47189..a1e684b2 100644
--- a/test/integration/standalone/query/query_suite_test.go
+++ b/test/integration/standalone/query/query_suite_test.go
@@ -33,10 +33,9 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       test_cases "github.com/apache/skywalking-banyandb/test/cases"
        casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
-       casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
        casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
-       casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data"
        casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
        integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone"
 )
@@ -61,26 +60,9 @@ var _ = SynchronizedBeforeSuite(func() []byte {
        })).To(Succeed())
        var addr string
        addr, _, deferFunc = setup.Standalone()
-       conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
-       Expect(err).NotTo(HaveOccurred())
        ns := timestamp.NowMilli().UnixNano()
        now = time.Unix(0, ns-ns%int64(time.Minute))
-       interval := 500 * time.Millisecond
-       // stream
-       casesstreamdata.Write(conn, "data.json", now, interval)
-       // measure
-       interval = time.Minute
-       casesmeasuredata.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
-       casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
-       casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
-       casesmeasuredata.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval)
-       casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data.json", now, interval)
-       casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data1.json", now.Add(10*time.Second), interval)
-       casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
-       casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval)
-       casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data1.json", now.Add(10*time.Second), interval)
-       casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
-       Expect(conn.Close()).To(Succeed())
+       test_cases.Initialize(addr, now)
        return []byte(addr)
 }, func(address []byte) {
        var err error
