This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 32524d2  feat: Implement Write in the liaison (#30)
32524d2 is described below

commit 32524d2fbd3e42d6d7693bb78f1d1236fda38e84
Author: Fine0830 <[email protected]>
AuthorDate: Wed Aug 11 21:05:28 2021 +0800

    feat: Implement Write in the liaison (#30)
    
    * feat: implement Write and Query
    
    * feat: add test server
    
    * feat: get field subscripts
    
    * feat: generate shardID
    
    * fix: compute shardID
    
    * fix: update test data
    
    * update
    
    * feat: use protobuf
    
    * fix: update incorrect syntax
    
    * Fix service register failures
    
    Signed-off-by: Gao Hongtao <[email protected]>
    
    * feat: update grpc
    
    * feat: publish and subscribe write data
    
    * feat: add series id
    
    * fix: write data
    
    * feat: add pipeline in series
    
    * feat: write trace data
    
    * feat: use pipeline
    
    * fix: update custom event
    
    * fix: update repo
    
    * fix: grpc test
    
    * fix: add param
    
    * fix: grpc test
    
    * fix: lint
    
    * fix: update header ignore files
    
    * fix: use embed to access to files
    
    * fix: update header ignore files
    
    * fix: format code
    
    * refactor: address pr
    
    * revert: flags
    
    * fix: update schemaMap in writeListener
    
    * fix: use assert in tests
    
    * fix: add sync.RWMutex to protect seriesEvent and shardEvent
    
    * fix: update server opts
    
    * fix: remove dead code
    
    * fix: update header ignore files
    
    * fix: set maxRecMsgSize flag
    
    * fix: typo
    
    * fix: address pr
    
    * fix: update flags
    
    * fix: add tls tests
    
    * fix: update test
    
    * fix: lint
    
    * revert version
    
    * refactor: update flags
    
    * fix: add sync.RWMutex to protect events
    
    * fix: lint
    
    * feat: separate shardid and seriesid func
    
    * feat: add maps for events
    
    * fix: update RWMutex
    
    * fix: update test
    
    * feat: generate tls certificate
    
    * fix: update flags
    
    * fix: update grpc test
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 .licenserc.yaml                           |   3 +-
 api/data/trace.go                         |  16 ++
 api/event/discovery.go                    |   6 +-
 api/proto/banyandb/v1/write.pb.go         |   2 +-
 api/proto/banyandb/v1/write.proto         |   2 +-
 banyand/internal/cmd/standalone.go        |   2 +-
 banyand/liaison/grpc/data/server_cert.pem |  23 +++
 banyand/liaison/grpc/data/server_key.pem  |  27 +++
 banyand/liaison/grpc/grpc.go              | 262 +++++++++++++++++++++++++++---
 banyand/liaison/grpc/grpc_test.go         | 250 ++++++++++++++++++++++++++++
 banyand/query/processor_test.go           |   8 +-
 banyand/series/trace/common_test.go       |   4 +-
 banyand/series/trace/query.go             |   5 +-
 banyand/series/trace/service.go           |  57 +++++--
 banyand/series/trace/trace.go             |   6 +-
 banyand/series/trace/write.go             |   5 +-
 banyand/series/trace/write_test.go        |   6 +-
 pkg/partition/route.go                    |  13 +-
 pkg/query/logical/expr.go                 |  12 +-
 pkg/query/logical/plan_orderby.go         |   2 +-
 pkg/query/logical/schema.go               |  21 ++-
 21 files changed, 668 insertions(+), 64 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index 3049c0b..3f25016 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -74,10 +74,9 @@ header: # `header` section is configurations for source codes license header.
     - '**/*.json'
     - '**/*_mock.go'
     - '**/*_mock_test.go'
-    - '**/*_generated.go'
-    - '**/Trace_grpc.go'
     - '**/*.pb.go'
     - '**/*.textproto'
+    - '**/*.pem'
 
   comment: on-failure # on what condition license-eye will comment on the pull request, `on-failure`, `always`, `never`.
 
diff --git a/api/data/trace.go b/api/data/trace.go
index 5351990..6d59308 100644
--- a/api/data/trace.go
+++ b/api/data/trace.go
@@ -20,10 +20,17 @@ package data
 import (
        "github.com/apache/skywalking-banyandb/api/common"
        v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
 )
 
 var TraceKindVersion = common.KindVersion{Version: "v1", Kind: "data-trace"}
 
+var WriteEventKindVersion = common.KindVersion{
+       Version: "v1",
+       Kind:    "trace-write",
+}
+var TopicWriteEvent = bus.UniTopic(WriteEventKindVersion.String())
+
 type Trace struct {
        common.KindVersion
        Entities []Entity
@@ -36,6 +43,15 @@ type Entity struct {
 type EntityValue struct {
        *v1.EntityValue
 }
+type TraceWriteDate struct {
+       ShardID      uint
+       SeriesID     uint64
+       WriteRequest *v1.WriteRequest
+}
+type Write struct {
+       common.KindVersion
+       Payload *TraceWriteDate
+}
 
 func NewTrace() *Trace {
        return &Trace{KindVersion: TraceKindVersion}
diff --git a/api/event/discovery.go b/api/event/discovery.go
index 10476b8..7b55de8 100644
--- a/api/event/discovery.go
+++ b/api/event/discovery.go
@@ -28,12 +28,14 @@ var (
                Version: "v1",
                Kind:    "event-shard",
        }
-       TopicShardEvent        = bus.UniTopic(ShardEventKindVersion.String())
+       TopicShardEvent = bus.UniTopic(ShardEventKindVersion.String())
+
        SeriesEventKindVersion = common.KindVersion{
                Version: "v1",
                Kind:    "event-series",
        }
-       TopicSeriesEvent     = bus.UniTopic(SeriesEventKindVersion.String())
+       TopicSeriesEvent = bus.UniTopic(SeriesEventKindVersion.String())
+
        IndexRuleKindVersion = common.KindVersion{Version: "v1", Kind: "index-rule"}
        TopicIndexRule       = bus.UniTopic(IndexRuleKindVersion.String())
 )
diff --git a/api/proto/banyandb/v1/write.pb.go b/api/proto/banyandb/v1/write.pb.go
index 6808076..c2c1026 100644
--- a/api/proto/banyandb/v1/write.pb.go
+++ b/api/proto/banyandb/v1/write.pb.go
@@ -364,7 +364,7 @@ type EntityValue struct {
        // binary representation of segments, including tags, spans...
        DataBinary []byte `protobuf:"bytes,3,opt,name=data_binary,json=dataBinary,proto3" json:"data_binary,omitempty"`
        // support all of indexed fields in the fields.
-       // Pair only has value, as the value of PairValue match with the key
+       // Field only has value, as the value of value_type match with the key
        // by the index rules and index rule bindings of Metadata group.
        // indexed fields of multiple entities are compression in the fields.
        Fields []*Field `protobuf:"bytes,4,rep,name=fields,proto3" json:"fields,omitempty"`
diff --git a/api/proto/banyandb/v1/write.proto b/api/proto/banyandb/v1/write.proto
index 3d7ddc0..02e2d1a 100644
--- a/api/proto/banyandb/v1/write.proto
+++ b/api/proto/banyandb/v1/write.proto
@@ -62,7 +62,7 @@ message EntityValue {
   // binary representation of segments, including tags, spans...
   bytes data_binary = 3;
   // support all of indexed fields in the fields.
-  // Pair only has value, as the value of PairValue match with the key
+  // Field only has value, as the value of value_type match with the key
   // by the index rules and index rule bindings of Metadata group.
   // indexed fields of multiple entities are compression in the fields.
   repeated Field fields = 4;
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index d694895..1aa0b4f 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -61,7 +61,7 @@ func newStandaloneCmd() *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate index builder")
        }
-       traceSeries, err := trace.NewService(ctx, db, repo, idx)
+       traceSeries, err := trace.NewService(ctx, db, repo, idx, pipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate trace series")
        }
diff --git a/banyand/liaison/grpc/data/server_cert.pem b/banyand/liaison/grpc/data/server_cert.pem
new file mode 100644
index 0000000..d7e693f
--- /dev/null
+++ b/banyand/liaison/grpc/data/server_cert.pem
@@ -0,0 +1,23 @@
+-----BEGIN CERTIFICATE-----
+MIIDyzCCArOgAwIBAgIJAMtUjj8gb0gnMA0GCSqGSIb3DQEBCwUAMIGIMQswCQYD
+VQQGEwJDTjELMAkGA1UECAwCU0MxCzAJBgNVBAcMAkNEMRMwEQYDVQQKDApza3l3
+YWxraW5nMREwDwYDVQQLDAhiYW55YW5kYjEjMCEGCSqGSIb3DQEJARYUZmFueHVl
+MDgzMEBlbWFpbC5jb20xEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0yMTA4MTEwNjU0
+MjNaFw0yNjA4MTAwNjU0MjNaMIGIMQswCQYDVQQGEwJDTjELMAkGA1UECAwCU0Mx
+CzAJBgNVBAcMAkNEMRMwEQYDVQQKDApza3l3YWxraW5nMREwDwYDVQQLDAhiYW55
+YW5kYjEjMCEGCSqGSIb3DQEJARYUZmFueHVlMDgzMEBlbWFpbC5jb20xEjAQBgNV
+BAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMwN
+2N/JCT64EMySDMqwA4hJ2lDY6ro1BMgnfjFXZNg2dZagMYRwNao+D6HSl1y9Apr4
+EGC46hGuL92zrAUHEeMNZrEWBEB16UrnbyQeT4d1/Lq9tywGG0j2uvjCO5zMN43e
+WX8o7c9gZOPPQp/MKFFDZtxbj5+Ve2LWBdx1VWkIvJzITlLbmnASjBXzWs1kPrJY
+GiSrAASyb9nKNn5Lvw3bdNgOGLBQmd6jRKOkKHAEvFCHSKpQV4R2lEWAOb1NSfYr
+/YFz+GGc+HjZeSZt/4nIpMa6+jOs/2LQybz0jVtAprBE8lVRMSn0L+Ud9Gj7oJcJ
+hOJCoCTG+0vfGl0P3I0CAwEAAaM2MDQwMgYDVR0RBCswKYIJbG9jYWxob3N0ggtl
+eGFtcGxlLmNvbYIPd3d3LmV4YW1wbGUuY29tMA0GCSqGSIb3DQEBCwUAA4IBAQCJ
+L7mG1mjG+9zg5Hl4TbakQ/zxnz3VuAUN+8WEVcTY65Tc4LbabD56mf5PumDQaGzz
+mFhblsi2mXk4H+dbUCh3wEgpcbmCjZKa5Kuyypfcg8JgFzIS3PwNp/BCzxdP2uqA
+Lw1lHYczyWIaW+M7tVG4V9ewpo/DOX5xlkMw0kDJatYxCJ8CzSzFropNIeINkEX1
+dT/vDtH0qwWNUDGf5yr8dnjowKCRczmc1FY7hV+Q0SBZKhqHkTKrxgjT7eVNG5CR
+5xajFa96ut8UXTFd9+TXJNyPaYVotMD4WcLShc31yfwGR+Y3YpmH8uHzN3NRNwPJ
+wcIM5AFYEIf0ing6eIws
+-----END CERTIFICATE-----
diff --git a/banyand/liaison/grpc/data/server_key.pem b/banyand/liaison/grpc/data/server_key.pem
new file mode 100644
index 0000000..fc80ebe
--- /dev/null
+++ b/banyand/liaison/grpc/data/server_key.pem
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpAIBAAKCAQEAzA3Y38kJPrgQzJIMyrADiEnaUNjqujUEyCd+MVdk2DZ1lqAx
+hHA1qj4PodKXXL0CmvgQYLjqEa4v3bOsBQcR4w1msRYEQHXpSudvJB5Ph3X8ur23
+LAYbSPa6+MI7nMw3jd5Zfyjtz2Bk489Cn8woUUNm3FuPn5V7YtYF3HVVaQi8nMhO
+UtuacBKMFfNazWQ+slgaJKsABLJv2co2fku/Ddt02A4YsFCZ3qNEo6QocAS8UIdI
+qlBXhHaURYA5vU1J9iv9gXP4YZz4eNl5Jm3/icikxrr6M6z/YtDJvPSNW0CmsETy
+VVExKfQv5R30aPuglwmE4kKgJMb7S98aXQ/cjQIDAQABAoIBAQCL4VowXmHmCswJ
+UH1QXMSvIuFz1p9iMoIqq1gIfv50cTC+puYLAdjn8U9KAVEdk7w7e53OkDR1FlFd
+y5M6hxQt77vb3VngznO0k15PBjBCjhFH+lGc7jq6E9ksOgoffKcAq9HyJ56OMGg4
++pWTcaKZwni6ylF0dkZ1BH5UGGMKcmX+E6NW13tUgU2fRGttjGFvUbHflCNg2vJh
+3IEFUmDGoDvJt4tdK2C1sBv7+cbW1+XBnPa9Gja2MF+v6DIs6AIpnHKwg33Bk33Q
+FX9oNErRYdtT3A9dcvR0qP6uVyVBPRREiuyBFJ0LUGXsbWwIjoPQ8SBZv1LaB1sW
+lo37BX/JAoGBAPuFJ07YNxkCniItKybPzT8mKKUPWS5iJy5pe5hBYSuIKSXZXhBl
+ETwhO/zpilwN8khXWokSW6QU/JmLQnHyG9ox3o5B/eAhCVCBZMe7eJaHJhXifj2A
+1i6KzAePIykUi4i5XKy5XSgRjyfO7E3ofW7MyDvg/OLWYX66MMcDVAf/AoGBAM+w
+Q7twak11okLckJKdtkasy7g5bHhKvksnw2G/CKave1c/CULPnSYYkiffiinUvVoP
+eUM8ANnFY36p1Lz+tE7g7VN1LAzN10ZwCPEdWZ0sjCveasiQrpNCXNWl5IRyRkQT
+m70Cr78zX5ahMIrU0ArLVRLVFB2ZX3lpnOK7h7tzAoGAIPKjYI+wQAV4w49ZLL9h
+6pjMEDs/enT/HvRQbXR7DyHKChw8Vzd2F4NfAVVye3aUO2e+A2C1QnxBTrfQX27Q
+uTd5KPd6E0cgmjwpAIUNWeKgWZOO5+2doQErkv3sJDB9ys5FVpb9ngcW0qcni1ke
+PUp0HGvvlKNyqBAp3ZgRBO8CgYBdEwcnpxdco00WXbZEnn0jayjY5JMhzY0+LRG4
+al48JQRHcy55TIWGnxhQ2jMW0AoTpD+Zy/gtn/IYv49hK1wuxUpWTnpxOoYxQOAg
+/iA8+cvPlRuRypUR1Xm5HWEtofCvbYIr0Fpme2VpIc+ZSAn77Gexyt/669MHnDb8
+vUH01QKBgQC6f88E/9nOb+Xwf2ZXSpK3BQQixPdi+wRY+smqYomSH+Vvou/ny8Cu
+AExWLc1KIH39tvs3e7RarB+hVDTGoHDD86basJ2eaaZdJfGUU0GGA39yRayn9xWz
+3eGVnQLmbguB9R4XYyF5lkp6qzopqp0eGNYMSxtvIhIYroJfOkb6vA==
+-----END RSA PRIVATE KEY-----
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index 91a75d7..39451c5 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -19,64 +19,142 @@ package grpc
 
 import (
        "context"
+       "fmt"
+       "io"
+       "log"
        "net"
+       "path/filepath"
+       "runtime"
+       "strings"
+       "sync"
+       "time"
 
-       "google.golang.org/grpc"
+       "github.com/pkg/errors"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
 
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/api/data"
        "github.com/apache/skywalking-banyandb/api/event"
        v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+       apischema "github.com/apache/skywalking-banyandb/api/schema"
        "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/partition"
+       "github.com/apache/skywalking-banyandb/pkg/query/logical"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+var (
+       ErrSeriesEvents    = errors.New("no seriesEvent")
+       ErrShardEvents     = errors.New("no shardEvent")
+       ErrInvalidSeriesID = errors.New("invalid seriesID")
+       ErrServerCert      = errors.New("invalid server cert file")
+       ErrServerKey       = errors.New("invalid server key file")
+       ErrNoAddr          = errors.New("no address")
+)
+
 type Server struct {
-       addr       string
-       log        *logger.Logger
-       ser        *grpc.Server
-       pipeline   queue.Queue
-       repo       discovery.ServiceRepo
-       shardInfo  *shardInfo
-       seriesInfo *seriesInfo
+       addr           string
+       maxRecvMsgSize int
+       tlsVal         bool
+       certFile       string
+       keyFile        string
+       log            *logger.Logger
+       ser            *grpclib.Server
+       pipeline       queue.Queue
+       repo           discovery.ServiceRepo
+       shardInfo      *shardInfo
+       seriesInfo     *seriesInfo
+       v1.UnimplementedTraceServiceServer
 }
 
 type shardInfo struct {
-       log *logger.Logger
+       log        *logger.Logger
+       shardEvent *shardEvent
 }
 
 func (s *shardInfo) Rev(message bus.Message) (resp bus.Message) {
-       shardEvent, ok := message.Data().(*v1.ShardEvent)
+       event, ok := message.Data().(*v1.ShardEvent)
        if !ok {
                s.log.Warn().Msg("invalid event data type")
                return
        }
+       s.shardEvent.setShardEvents(event)
        s.log.Info().
-               Str("action", v1.Action_name[int32(shardEvent.Action)]).
-               Uint64("shardID", shardEvent.Shard.Id).
+               Str("action", v1.Action_name[int32(event.Action)]).
+               Uint64("shardID", event.Shard.Id).
                Msg("received a shard event")
        return
 }
 
+type shardEvent struct {
+       shardEventsMap map[string]*v1.ShardEvent
+       sync.RWMutex
+}
+
+func (s *shardEvent) setShardEvents(eventVal *v1.ShardEvent) {
+       s.RWMutex.Lock()
+       defer s.RWMutex.Unlock()
+       idx := eventVal.Shard.Series.GetName() + "-" + eventVal.Shard.Series.GetGroup()
+       if eventVal.Action == v1.Action_ACTION_PUT {
+               s.shardEventsMap[idx] = eventVal
+       } else if eventVal.Action == v1.Action_ACTION_DELETE {
+               delete(s.shardEventsMap, idx)
+       }
+}
+
+func (s *shardEvent) getShardEvent(idx string) *v1.ShardEvent {
+       s.RWMutex.RLock()
+       defer s.RWMutex.RUnlock()
+       return s.shardEventsMap[idx]
+}
+
 type seriesInfo struct {
-       log *logger.Logger
+       log         *logger.Logger
+       seriesEvent *seriesEvent
 }
 
 func (s *seriesInfo) Rev(message bus.Message) (resp bus.Message) {
-       seriesEvent, ok := message.Data().(*v1.SeriesEvent)
+       event, ok := message.Data().(*v1.SeriesEvent)
        if !ok {
                s.log.Warn().Msg("invalid event data type")
                return
        }
+       s.seriesEvent.setSeriesEvents(event)
        s.log.Info().
-               Str("action", v1.Action_name[int32(seriesEvent.Action)]).
-               Str("name", seriesEvent.Series.Name).
-               Str("group", seriesEvent.Series.Group).
+               Str("action", v1.Action_name[int32(event.Action)]).
+               Str("name", event.Series.Name).
+               Str("group", event.Series.Group).
                Msg("received a shard event")
        return
 }
 
+type seriesEvent struct {
+       seriesEventsMap map[string]*v1.SeriesEvent
+       sync.RWMutex
+}
+
+func (s *seriesEvent) setSeriesEvents(seriesEventVal *v1.SeriesEvent) {
+       s.RWMutex.Lock()
+       defer s.RWMutex.Unlock()
+       str := seriesEventVal.Series.GetName() + "-" + seriesEventVal.Series.GetGroup()
+       if seriesEventVal.Action == v1.Action_ACTION_PUT {
+               s.seriesEventsMap[str] = seriesEventVal
+       } else if seriesEventVal.Action == v1.Action_ACTION_DELETE {
+               delete(s.seriesEventsMap, str)
+       }
+}
+
+func (s *seriesEvent) getSeriesEvent(idx string) *v1.SeriesEvent {
+       s.RWMutex.RLock()
+       defer s.RWMutex.RUnlock()
+       return s.seriesEventsMap[idx]
+}
+
 func (s *Server) PreRun() error {
        s.log = logger.GetLogger("liaison-grpc")
        s.shardInfo.log = s.log
@@ -92,8 +170,8 @@ func NewServer(ctx context.Context, pipeline queue.Queue, repo discovery.Service
        return &Server{
                pipeline:   pipeline,
                repo:       repo,
-               shardInfo:  &shardInfo{},
-               seriesInfo: &seriesInfo{},
+               shardInfo:  &shardInfo{shardEvent: &shardEvent{shardEventsMap: map[string]*v1.ShardEvent{}}},
+               seriesInfo: &seriesInfo{seriesEvent: &seriesEvent{seriesEventsMap: map[string]*v1.SeriesEvent{}}},
        }
 }
 
@@ -102,12 +180,38 @@ func (s *Server) Name() string {
 }
 
 func (s *Server) FlagSet() *run.FlagSet {
+       size := 1024 * 1024 * 10
+       _, currentFile, _, _ := runtime.Caller(0)
+       basePath := filepath.Dir(currentFile)
+       serverCert := filepath.Join(basePath, "data/server_cert.pem")
+       serverKey := filepath.Join(basePath, "data/server_key.pem")
+
        fs := run.NewFlagSet("grpc")
-       fs.StringVarP(&s.addr, "addr", "", ":17912", "the address of banyand listens")
+       fs.IntVarP(&s.maxRecvMsgSize, "maxRecvMsgSize", "", size, "The size of max receiving message")
+       fs.BoolVarP(&s.tlsVal, "tlsVal", "", true, "Connection uses TLS if true, else plain TCP")
+       fs.StringVarP(&s.certFile, "certFile", "", serverCert, "The TLS cert file")
+       fs.StringVarP(&s.keyFile, "keyFile", "", serverKey, "The TLS key file")
+       fs.StringVarP(&s.addr, "addr", "", ":17912", "The address of banyand listens")
+
        return fs
 }
 
 func (s *Server) Validate() error {
+       if s.addr == "" {
+               return ErrNoAddr
+       }
+       if s.tlsVal {
+               if s.certFile == "" {
+                       return ErrServerCert
+               }
+               if s.keyFile == "" {
+                       return ErrServerKey
+               }
+               _, errTLS := credentials.NewServerTLSFromFile(s.certFile, s.keyFile)
+               if errTLS != nil {
+                       return errTLS
+               }
+       }
        return nil
 }
 
@@ -116,10 +220,17 @@ func (s *Server) Serve() error {
        if err != nil {
                s.log.Fatal().Err(err).Msg("Failed to listen")
        }
-
-       s.ser = grpc.NewServer()
-       // TODO: add server implementation here
-       v1.RegisterTraceServiceServer(s.ser, v1.UnimplementedTraceServiceServer{})
+       if errValidate := s.Validate(); errValidate != nil {
+               s.log.Fatal().Err(errValidate).Msg("Failed to validate data")
+       }
+       var opts []grpclib.ServerOption
+       if s.tlsVal {
+               creds, _ := credentials.NewServerTLSFromFile(s.certFile, s.keyFile)
+               opts = []grpclib.ServerOption{grpclib.Creds(creds)}
+       }
+       opts = append(opts, grpclib.MaxRecvMsgSize(s.maxRecvMsgSize))
+       s.ser = grpclib.NewServer(opts...)
+       v1.RegisterTraceServiceServer(s.ser, s)
 
        return s.ser.Serve(lis)
 }
@@ -128,3 +239,106 @@ func (s *Server) GracefulStop() {
        s.log.Info().Msg("stopping")
        s.ser.GracefulStop()
 }
+
+func (s *Server) computeSeriesID(writeEntity *v1.WriteRequest, mapIndexName string) (SeriesID []byte, err error) {
+       ana := logical.DefaultAnalyzer()
+       metadata := common.Metadata{
+               KindVersion: apischema.SeriesKindVersion,
+               Spec:        writeEntity.GetMetadata(),
+       }
+       schema, ruleError := ana.BuildTraceSchema(context.TODO(), metadata)
+       if ruleError != nil {
+               return nil, ruleError
+       }
+       seriesEventVal := s.seriesInfo.seriesEvent.getSeriesEvent(mapIndexName)
+       if seriesEventVal == nil {
+               return nil, ErrSeriesEvents
+       }
+       var str string
+       var arr []string
+       fieldRefs, errField := schema.CreateRef(seriesEventVal.FieldNamesCompositeSeriesId...)
+       if errField != nil {
+               return nil, errField
+       }
+       for _, ref := range fieldRefs {
+               field := writeEntity.GetEntity().GetFields()[ref.Spec.Idx]
+               switch v := field.GetValueType().(type) {
+               case *v1.Field_StrArray:
+                       for j := 0; j < len(v.StrArray.Value); j++ {
+                               arr = append(arr, v.StrArray.Value[j])
+                       }
+               case *v1.Field_IntArray:
+                       for t := 0; t < len(v.IntArray.Value); t++ {
+                               arr = append(arr, fmt.Sprint(v.IntArray.Value[t]))
+                       }
+               case *v1.Field_Int:
+                       arr = append(arr, fmt.Sprint(v.Int.Value))
+               case *v1.Field_Str:
+                       arr = append(arr, fmt.Sprint(v.Str.Value))
+               }
+       }
+       str = strings.Join(arr, "")
+       if str == "" {
+               return nil, ErrInvalidSeriesID
+       }
+       seriesID := []byte(str)
+
+       return seriesID, nil
+}
+
+func (s *Server) computeShardID(seriesID []byte, mapIndexName string) (shardID uint, err error) {
+       shardEventVal := s.shardInfo.shardEvent.getShardEvent(mapIndexName)
+       if shardEventVal == nil {
+               return 0, ErrShardEvents
+       }
+       shardNum := shardEventVal.GetShard().GetId()
+       if shardNum < 1 {
+               shardNum = 1
+       }
+       shardID, shardIDError := partition.ShardID(seriesID, uint32(shardNum))
+       if shardIDError != nil {
+               return 0, shardIDError
+       }
+
+       return shardID, nil
+}
+
+func (s *Server) Write(TraceWriteServer v1.TraceService_WriteServer) error {
+       for {
+               writeEntity, err := TraceWriteServer.Recv()
+               if err == io.EOF {
+                       return nil
+               }
+               if err != nil {
+                       return err
+               }
+               mapIndexName := writeEntity.GetMetadata().GetName() + "-" + writeEntity.GetMetadata().GetGroup()
+               seriesID, err := s.computeSeriesID(writeEntity, mapIndexName)
+               if err != nil {
+                       return err
+               }
+               shardID, err := s.computeShardID(seriesID, mapIndexName)
+               if err != nil {
+                       return err
+               }
+               mergeData := assemblyWriteData(shardID, writeEntity, convert.BytesToUint64(seriesID))
+               message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), mergeData)
+               _, errWritePub := s.pipeline.Publish(data.TopicWriteEvent, message)
+               if errWritePub != nil {
+                       return errWritePub
+               }
+               if errSend := TraceWriteServer.Send(&v1.WriteResponse{}); errSend != nil {
+                       return errSend
+               }
+       }
+}
+
+func (s *Server) Query(ctx context.Context, entityCriteria *v1.QueryRequest) (*v1.QueryResponse, error) {
+       log.Println("entityCriteria:", entityCriteria)
+
+       return &v1.QueryResponse{}, nil
+}
+
+func assemblyWriteData(shardID uint, writeEntity *v1.WriteRequest, seriesID uint64) data.TraceWriteDate {
+       return data.TraceWriteDate{ShardID: shardID, SeriesID: seriesID, WriteRequest: writeEntity}
+}
diff --git a/banyand/liaison/grpc/grpc_test.go b/banyand/liaison/grpc/grpc_test.go
new file mode 100644
index 0000000..5aea70f
--- /dev/null
+++ b/banyand/liaison/grpc/grpc_test.go
@@ -0,0 +1,250 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc_test
+
+import (
+       "context"
+       "io"
+       "os"
+       "path"
+       "path/filepath"
+       "runtime"
+       "testing"
+       "time"
+
+       googleUUID "github.com/google/uuid"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+
+       v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+       "github.com/apache/skywalking-banyandb/banyand/discovery"
+       "github.com/apache/skywalking-banyandb/banyand/index"
+       "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+       "github.com/apache/skywalking-banyandb/banyand/query"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/banyand/series/trace"
+       "github.com/apache/skywalking-banyandb/banyand/storage"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pb"
+)
+
+type testData struct {
+       certFile           string
+       serverHostOverride string
+       TLS                bool
+       addr               string
+}
+
+func setup(tester *require.Assertions) (*grpc.Server, *grpc.Server, func()) {
+       tester.NoError(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: "warn",
+       }))
+       // Init `Discovery` module
+       repo, err := discovery.NewServiceRepo(context.Background())
+       tester.NoError(err)
+       tester.NotNil(repo)
+       // Init `Queue` module
+       pipeline, err := queue.NewQueue(context.TODO(), repo)
+       tester.NoError(err)
+       // Init `Database` module
+       db, err := storage.NewDB(context.TODO(), repo)
+       tester.NoError(err)
+       uuid, err := googleUUID.NewUUID()
+       tester.NoError(err)
+       rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String())
+       tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
+       // Init `Index` module
+       indexSvc, err := index.NewService(context.TODO(), repo)
+       tester.NoError(err)
+       // Init `Trace` module
+       traceSvc, err := trace.NewService(context.TODO(), db, repo, indexSvc, pipeline)
+       tester.NoError(err)
+       // Init `Query` module
+       executor, err := query.NewExecutor(context.TODO(), repo, indexSvc, traceSvc, traceSvc)
+       tester.NoError(err)
+       // Init `liaison` module
+       tcp := grpc.NewServer(context.TODO(), pipeline, repo)
+       tester.NoError(tcp.FlagSet().Parse([]string{"--tlsVal=false", "--addr=:17912"}))
+       tcpTLS := grpc.NewServer(context.TODO(), pipeline, repo)
+       tester.NoError(tcpTLS.FlagSet().Parse([]string{"--tlsVal=true", "--addr=:17913"}))
+
+       err = indexSvc.PreRun()
+       tester.NoError(err)
+
+       err = traceSvc.PreRun()
+       tester.NoError(err)
+
+       err = db.PreRun()
+       tester.NoError(err)
+
+       err = executor.PreRun()
+       tester.NoError(err)
+
+       err = tcp.PreRun()
+       tester.NoError(err)
+       tester.NoError(tcpTLS.PreRun())
+
+       go func() {
+               tester.NoError(traceSvc.Serve())
+       }()
+
+       go func() {
+               tester.NoError(tcpTLS.Serve())
+       }()
+
+       go func() {
+               tester.NoError(tcp.Serve())
+       }()
+
+       ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
+       defer cancelFunc()
+
+       tester.True(indexSvc.Ready(ctx, index.MetaExists("default", "sw")))
+
+       return tcp, tcpTLS, func() {
+               db.GracefulStop()
+               _ = os.RemoveAll(rootPath)
+       }
+}
+
+func TestTraceService(t *testing.T) {
+       tester := require.New(t)
+       tcp, tcpTLS, gracefulStop := setup(tester)
+       defer gracefulStop()
+       _, currentFile, _, _ := runtime.Caller(0)
+       basePath := filepath.Dir(currentFile)
+       certFile := filepath.Join(basePath, "data/server_cert.pem")
+       testCases := []struct {
+               name    string
+               args    testData
+               wantErr bool
+       }{
+               {
+                       name: "isTLS",
+                       args: testData{
+                               TLS:                true,
+                               certFile:           certFile,
+                               serverHostOverride: "localhost",
+                               addr:               "localhost:17913",
+                       },
+               },
+               {
+                       name: "noTLS",
+                       args: testData{
+                               TLS:  false,
+                               addr: "localhost:17912",
+                       },
+               },
+       }
+       for _, tc := range testCases {
+               if tc.args.TLS {
+                       errValidate := tcpTLS.Validate()
+                       assert.NoError(t, errValidate)
+                       var opts []grpclib.DialOption
+                       creds, err := credentials.NewClientTLSFromFile(tc.args.certFile, tc.args.serverHostOverride)
+                       assert.NoError(t, err)
+                       opts = append(opts, grpclib.WithTransportCredentials(creds))
+                       linkService(t, tc.args.addr, opts)
+               } else {
+                       errValidate := tcp.Validate()
+                       assert.NoError(t, errValidate)
+                       var opts []grpclib.DialOption
+                       opts = append(opts, grpclib.WithInsecure())
+                       linkService(t, tc.args.addr, opts)
+               }
+       }
+}
+
+func linkService(t *testing.T, addr string, opts []grpclib.DialOption) {
+       conn, err := grpclib.Dial(addr, opts...)
+       assert.NoError(t, err)
+       defer conn.Close()
+       traceWrite(t, conn)
+       traceQuery(t, conn)
+}
+
+func traceWrite(t *testing.T, conn *grpclib.ClientConn) {
+       client := v1.NewTraceServiceClient(conn)
+       ctx := context.Background()
+       entityValue := pb.NewEntityValueBuilder().
+               EntityID("entityId").
+               DataBinary([]byte{12}).
+               Fields("trace_id-xxfff.111323",
+                       0,
+                       "webapp_id",
+                       "10.0.0.1_id",
+                       "/home_id",
+                       "webapp",
+                       "10.0.0.1",
+                       "/home",
+                       300,
+                       1622933202000000000).
+               Timestamp(time.Now()).
+               Build()
+       criteria := pb.NewWriteEntityBuilder().
+               EntityValue(entityValue).
+               Metadata("default", "sw").
+               Build()
+       stream, errorWrite := client.Write(ctx)
+       if errorWrite != nil {
+               t.Errorf("%v.Write(_) = _, %v", client, errorWrite)
+       }
+       waitc := make(chan struct{})
+       go func() {
+               for {
+                       writeResponse, errRecv := stream.Recv()
+                       if errRecv == io.EOF {
+                               // read done.
+                               close(waitc)
+                               return
+                       }
+                       assert.NoError(t, errRecv)
+                       assert.NotNil(t, writeResponse)
+               }
+       }()
+       if errSend := stream.Send(criteria); errSend != nil {
+               t.Errorf("Failed to send a note: %v", errSend)
+       }
+       if errorSend := stream.CloseSend(); errorSend != nil {
+               t.Errorf("Failed to send a note: %v", errorSend)
+       }
+       <-waitc
+}
+
+func traceQuery(t *testing.T, conn *grpclib.ClientConn) {
+       client := v1.NewTraceServiceClient(conn)
+       ctx := context.Background()
+       sT, eT := time.Now().Add(-3*time.Hour), time.Now()
+       criteria := pb.NewQueryRequestBuilder().
+               Limit(5).
+               Offset(10).
+               OrderBy("service_instance_id", v1.QueryOrder_SORT_DESC).
+               Metadata("default", "trace").
+               Projection("http.method", "service_id", "service_instance_id").
+               Fields("service_id", "=", "my_app", "http.method", "=", "GET").
+               TimeRange(sT, eT).
+               Build()
+       stream, errRev := client.Query(ctx, criteria)
+       if errRev != nil {
+               t.Errorf("Retrieve client failed: %v", errRev)
+       }
+       assert.NotNil(t, stream)
+}
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index dbea2ae..f3da0a2 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -33,6 +33,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/index"
        "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/series"
        "github.com/apache/skywalking-banyandb/banyand/series/trace"
        "github.com/apache/skywalking-banyandb/banyand/storage"
@@ -62,6 +63,9 @@ func setupServices(t *testing.T, tester *require.Assertions) 
(discovery.ServiceR
        repo, err := discovery.NewServiceRepo(context.Background())
        tester.NoError(err)
        tester.NotNil(repo)
+       // Init `Queue` module
+       pipeline, err := queue.NewQueue(context.TODO(), repo)
+       tester.NoError(err)
 
        // Init `Index` module
        indexSvc, err := index.NewService(context.TODO(), repo)
@@ -76,7 +80,7 @@ func setupServices(t *testing.T, tester *require.Assertions) 
(discovery.ServiceR
        tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
 
        // Init `Trace` module
-       traceSvc, err := trace.NewService(context.TODO(), db, repo, indexSvc)
+       traceSvc, err := trace.NewService(context.TODO(), db, repo, indexSvc, 
pipeline)
        tester.NoError(err)
 
        // Init `Query` module
@@ -84,7 +88,7 @@ func setupServices(t *testing.T, tester *require.Assertions) 
(discovery.ServiceR
        tester.NoError(err)
 
        // Init `Liaison` module
-       liaison := grpc.NewServer(context.TODO(), nil, repo)
+       liaison := grpc.NewServer(context.TODO(), pipeline, repo)
 
        // :PreRun:
        // 1) TraceSeries,
diff --git a/banyand/series/trace/common_test.go 
b/banyand/series/trace/common_test.go
index f8b40ae..8d13a7c 100644
--- a/banyand/series/trace/common_test.go
+++ b/banyand/series/trace/common_test.go
@@ -68,7 +68,7 @@ func setup(t *testing.T) (*traceSeries, func()) {
        ctrl := gomock.NewController(t)
        mockIndex := index.NewMockService(ctrl)
        mockIndex.EXPECT().Insert(gomock.Any(), gomock.Any(), 
gomock.Any()).AnyTimes()
-       svc, err := NewService(context.TODO(), db, nil, mockIndex)
+       svc, err := NewService(context.TODO(), db, nil, mockIndex, nil)
        assert.NoError(t, err)
        assert.NoError(t, svc.PreRun())
        assert.NoError(t, db.PreRun())
@@ -257,7 +257,7 @@ func setupTestData(t *testing.T, ts *traceSeries, 
seriesEntities []seriesEntity)
                        Timestamp(se.entity.t).
                        Fields(se.entity.items...).
                        Build()
-               shardID := partition.ShardID(seriesID, 2)
+               shardID, _ := partition.ShardID(seriesID, 2)
                got, err := ts.Write(common.SeriesID(convert.Hash(seriesID)), 
shardID, data.EntityValue{
                        EntityValue: ev,
                })
diff --git a/banyand/series/trace/query.go b/banyand/series/trace/query.go
index 64985f3..770c877 100644
--- a/banyand/series/trace/query.go
+++ b/banyand/series/trace/query.go
@@ -42,7 +42,10 @@ func (t *traceSeries) FetchTrace(traceID string, opt 
series.ScanOptions) (trace
                return trace, ErrInvalidTraceID
        }
        traceIDBytes := []byte(traceID)
-       traceIDShardID := partition.ShardID(traceIDBytes, t.shardNum)
+       traceIDShardID, shardIDError := partition.ShardID(traceIDBytes, 
t.shardNum)
+       if shardIDError != nil {
+               return trace, shardIDError
+       }
        bb, errTraceID := t.reader.TimeSeriesReader(traceIDShardID, traceIndex, 
0, 0).GetAll(traceIDBytes)
        if errTraceID != nil {
                return trace, errTraceID
diff --git a/banyand/series/trace/service.go b/banyand/series/trace/service.go
index 1e77d4b..d646447 100644
--- a/banyand/series/trace/service.go
+++ b/banyand/series/trace/service.go
@@ -23,10 +23,13 @@ import (
 
        "google.golang.org/protobuf/types/known/timestamppb"
 
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/api/data"
        "github.com/apache/skywalking-banyandb/api/event"
        v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
        "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/index"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/series"
        "github.com/apache/skywalking-banyandb/banyand/series/schema"
        "github.com/apache/skywalking-banyandb/banyand/storage"
@@ -38,20 +41,24 @@ import (
 var _ series.Service = (*service)(nil)
 
 type service struct {
-       db        storage.Database
-       schemaMap map[string]*traceSeries
-       l         *logger.Logger
-       repo      discovery.ServiceRepo
-       stopCh    chan struct{}
-       idx       index.Service
+       db            storage.Database
+       schemaMap     map[string]*traceSeries
+       l             *logger.Logger
+       repo          discovery.ServiceRepo
+       stopCh        chan struct{}
+       idx           index.Service
+       writeListener *writeCallback
+       pipeline      queue.Queue
 }
 
 //NewService returns a new service
-func NewService(_ context.Context, db storage.Database, repo 
discovery.ServiceRepo, idx index.Service) (series.Service, error) {
+func NewService(_ context.Context, db storage.Database, repo 
discovery.ServiceRepo, idx index.Service, pipeline queue.Queue) 
(series.Service, error) {
        return &service{
-               db:   db,
-               repo: repo,
-               idx:  idx,
+               db:            db,
+               repo:          repo,
+               idx:           idx,
+               pipeline:      pipeline,
+               writeListener: &writeCallback{},
        }, nil
 }
 
@@ -65,7 +72,9 @@ func (s *service) PreRun() error {
                return err
        }
        s.schemaMap = make(map[string]*traceSeries, len(schemas))
+       s.writeListener.schemaMap = s.schemaMap
        s.l = logger.GetLogger(s.Name())
+       s.writeListener.l = s.l
        for _, sa := range schemas {
                ts, errTS := newTraceSeries(sa, s.l, s.idx)
                if errTS != nil {
@@ -74,6 +83,7 @@ func (s *service) PreRun() error {
                s.db.Register(ts)
                id := formatTraceSeriesID(ts.name, ts.group)
                s.schemaMap[id] = ts
+               s.writeListener.schemaMap[id] = ts
                s.l.Info().Str("id", id).Msg("initialize Trace series")
        }
        return err
@@ -136,6 +146,10 @@ func (s *service) Serve() error {
                        return errPublishRules
                }
        }
+       errWrite := s.pipeline.Subscribe(data.TopicWriteEvent, s.writeListener)
+       if errWrite != nil {
+               return errWrite
+       }
        s.stopCh = make(chan struct{})
        <-s.stopCh
        return nil
@@ -146,3 +160,26 @@ func (s *service) GracefulStop() {
                close(s.stopCh)
        }
 }
+
+// writeCallback is the listener that the trace service subscribes to
+// data.TopicWriteEvent. It forwards every received write to the trace series
+// selected by the request metadata.
+//
+// l receives warnings about rejected or failed writes. schemaMap holds the
+// known trace series, keyed by formatTraceSeriesID(name, group).
+type writeCallback struct {
+       l         *logger.Logger
+       schemaMap map[string]*traceSeries
+}
+
+// Rev handles one message published on the write topic.
+//
+// The payload must be a data.TraceWriteDate; anything else is logged and
+// dropped. The target series is resolved from the metadata of the embedded
+// write request, and the entity is written with the series ID and shard ID
+// carried by the event. Failures are logged rather than returned: resp is
+// never assigned, so callers always get the zero bus.Message.
+func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
+       writeEvent, ok := message.Data().(data.TraceWriteDate)
+       if !ok {
+               w.l.Warn().Msg("invalid event data type")
+               return
+       }
+       entityValue := writeEvent.WriteRequest.GetEntity()
+       ts := writeEvent.WriteRequest.GetMetadata()
+       id := formatTraceSeriesID(ts.GetName(), ts.GetGroup())
+       // A request may name a series that was never registered. Calling Write
+       // on the resulting nil *traceSeries would panic the subscriber, so
+       // reject the event here instead.
+       target, found := w.schemaMap[id]
+       if !found || target == nil {
+               w.l.Warn().Str("id", id).Msg("trace series is not registered")
+               return
+       }
+       _, err := target.Write(common.SeriesID(writeEvent.SeriesID), writeEvent.ShardID, data.EntityValue{
+               EntityValue: entityValue,
+       })
+       if err != nil {
+               // The event is only emitted once Msg is called; without it the
+               // error would be dropped silently.
+               w.l.Warn().Err(err).Str("id", id).Msg("failed to write trace entity")
+       }
+       return
+}
diff --git a/banyand/series/trace/trace.go b/banyand/series/trace/trace.go
index 1c5f639..96af476 100644
--- a/banyand/series/trace/trace.go
+++ b/banyand/series/trace/trace.go
@@ -120,8 +120,10 @@ func (s *service) Write(traceSeriesMetadata 
common.Metadata, ts time.Time, serie
                Build()
 
        seriesIDBytes := []byte(seriesID)
-       shardID := partition.ShardID(seriesIDBytes, traceSeries.shardNum)
-
+       shardID, shardIDError := partition.ShardID(seriesIDBytes, 
traceSeries.shardNum)
+       if shardIDError != nil {
+               return err == nil, shardIDError
+       }
        _, err = 
traceSeries.Write(common.SeriesID(convert.Hash(seriesIDBytes)), shardID, 
data.EntityValue{
                EntityValue: ev,
        })
diff --git a/banyand/series/trace/write.go b/banyand/series/trace/write.go
index 115ba15..1b362d1 100644
--- a/banyand/series/trace/write.go
+++ b/banyand/series/trace/write.go
@@ -75,7 +75,10 @@ func (t *traceSeries) Write(seriesID common.SeriesID, 
shardID uint, entity data.
        if err = wp.Writer(shardID, chunkIDMapping).Put(chunkIDBytes, 
bydb_bytes.Join(stateBytes, seriesIDBytes, wallTsBytes)); err != nil {
                return 0, errors.Wrap(err, "failed to write chunkID index")
        }
-       traceIDShardID := partition.ShardID(traceID, t.shardNum)
+       traceIDShardID, shardIDError := partition.ShardID(traceID, t.shardNum)
+       if shardIDError != nil {
+               return 0, shardIDError
+       }
        if err = wp.TimeSeriesWriter(traceIDShardID, traceIndex).
                Put(traceID, 
bydb_bytes.Join(convert.Uint16ToBytes(uint16(shardID)), chunkIDBytes), 
entityTs); err != nil {
                return 0, errors.Wrap(err, "failed to Trace index")
diff --git a/banyand/series/trace/write_test.go 
b/banyand/series/trace/write_test.go
index 0ca51d8..a671cb2 100644
--- a/banyand/series/trace/write_test.go
+++ b/banyand/series/trace/write_test.go
@@ -180,13 +180,17 @@ func Test_traceSeries_Write(t *testing.T) {
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
                        seriesID := []byte(tt.args.seriesID)
+                       shardID, shardIDError := partition.ShardID(seriesID, 2)
+                       if shardIDError != nil {
+                               return
+                       }
                        ev := pb.NewEntityValueBuilder().
                                DataBinary(tt.args.entity.binary).
                                EntityID(tt.args.entity.id).
                                Fields(tt.args.entity.items...).
                                Timestamp(time.Now()).
                                Build()
-                       got, err := 
ts.Write(common.SeriesID(convert.Hash(seriesID)), partition.ShardID(seriesID, 
2), data.EntityValue{
+                       got, err := 
ts.Write(common.SeriesID(convert.Hash(seriesID)), shardID, data.EntityValue{
                                EntityValue: ev,
                        })
                        if (err != nil) != tt.wantErr {
diff --git a/pkg/partition/route.go b/pkg/partition/route.go
index 9b4d93c..bcb18af 100644
--- a/pkg/partition/route.go
+++ b/pkg/partition/route.go
@@ -17,9 +17,16 @@
 
 package partition
 
-import "github.com/apache/skywalking-banyandb/pkg/convert"
+import (
+       "github.com/pkg/errors"
 
-func ShardID(key []byte, shardNum uint32) uint {
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+func ShardID(key []byte, shardNum uint32) (uint, error) {
+       if shardNum < 1 {
+               return 0, errors.New("invalid shardNum")
+       }
        encodeKey := convert.Hash(key)
-       return uint(encodeKey % uint64(shardNum))
+       return uint(encodeKey % uint64(shardNum)), nil
 }
diff --git a/pkg/query/logical/expr.go b/pkg/query/logical/expr.go
index 03898da..caf0e92 100644
--- a/pkg/query/logical/expr.go
+++ b/pkg/query/logical/expr.go
@@ -44,21 +44,21 @@ type FieldRef struct {
        // name defines the key of the field
        name string
        // spec contains the index of the key in the schema, as well as the 
underlying FieldSpec
-       spec *fieldSpec
+       Spec *fieldSpec
 }
 
 func (f *FieldRef) Equal(expr Expr) bool {
        if other, ok := expr.(*FieldRef); ok {
-               return other.name == f.name && other.spec.spec.GetType() == 
f.spec.spec.GetType()
+               return other.name == f.name && other.Spec.spec.GetType() == 
f.Spec.spec.GetType()
        }
        return false
 }
 
 func (f *FieldRef) FieldType() apiv1.FieldSpec_FieldType {
-       if f.spec == nil {
+       if f.Spec == nil {
                panic("should be resolved first")
        }
-       return f.spec.spec.GetType()
+       return f.Spec.spec.GetType()
 }
 
 func (f *FieldRef) Resolve(s Schema) error {
@@ -66,12 +66,12 @@ func (f *FieldRef) Resolve(s Schema) error {
        if err != nil {
                return err
        }
-       f.spec = specs[0].spec
+       f.Spec = specs[0].Spec
        return nil
 }
 
 func (f *FieldRef) String() string {
-       return fmt.Sprintf("#%s<%s>", f.name, f.spec.spec.GetType().String())
+       return fmt.Sprintf("#%s<%s>", f.name, f.Spec.spec.GetType().String())
 }
 
 func NewFieldRef(fieldName string) *FieldRef {
diff --git a/pkg/query/logical/plan_orderby.go 
b/pkg/query/logical/plan_orderby.go
index 0c087d4..92cb50d 100644
--- a/pkg/query/logical/plan_orderby.go
+++ b/pkg/query/logical/plan_orderby.go
@@ -77,7 +77,7 @@ func (o *orderBy) Execute(ec executor.ExecutionContext) 
([]data.Entity, error) {
                return entities, nil
        }
 
-       sort.Slice(entities, sortMethod(entities, o.targetRef.spec.idx, o.sort))
+       sort.Slice(entities, sortMethod(entities, o.targetRef.Spec.Idx, o.sort))
 
        return entities, nil
 }
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 00542a5..6f33f3a 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -27,6 +27,7 @@ import (
 
 type Schema interface {
        IndexDefined(string) (bool, *apiv1.IndexObject)
+       FieldSubscript(string) (bool, int)
        FieldDefined(string) bool
        CreateRef(names ...string) ([]*FieldRef, error)
        Map(refs ...*FieldRef) Schema
@@ -37,12 +38,12 @@ type Schema interface {
 }
 
 type fieldSpec struct {
-       idx  int
+       Idx  int
        spec *apiv1.FieldSpec
 }
 
 func (fs *fieldSpec) Equal(other *fieldSpec) bool {
-       return fs.idx == other.idx && fs.spec.GetType() == other.spec.GetType() 
&& fs.spec.GetName() == other.spec.GetName()
+       return fs.Idx == other.Idx && fs.spec.GetType() == other.spec.GetType() 
&& fs.spec.GetName() == other.spec.GetName()
 }
 
 var _ Schema = (*schema)(nil)
@@ -74,6 +75,18 @@ func (s *schema) IndexDefined(field string) (bool, 
*apiv1.IndexObject) {
        return false, nil
 }
 
+// FieldSubscript searches the objects of the schema's index rule for field.
+// It returns true and the position of the first index object whose field
+// list contains field; note that this is the object's position, not the
+// field's position inside that object. When no object lists field it
+// returns false and -1.
+func (s *schema) FieldSubscript(field string) (bool, int) {
+       objects := s.indexRule.Spec.GetObjects()
+       for pos := range objects {
+               for _, candidate := range objects[pos].GetFields() {
+                       if candidate != field {
+                               continue
+                       }
+                       return true, pos
+               }
+       }
+       return false, -1
+}
+
 func (s *schema) Equal(s2 Schema) bool {
        if other, ok := s2.(*schema); ok {
                return cmp.Equal(other.fieldMap, s.fieldMap)
@@ -83,7 +96,7 @@ func (s *schema) Equal(s2 Schema) bool {
 
 func (s *schema) RegisterField(name string, i int, spec *apiv1.FieldSpec) {
        s.fieldMap[name] = &fieldSpec{
-               idx:  i,
+               Idx:  i,
                spec: spec,
        }
 }
@@ -117,7 +130,7 @@ func (s *schema) Map(refs ...*FieldRef) Schema {
                fieldMap:    make(map[string]*fieldSpec),
        }
        for _, ref := range refs {
-               newS.fieldMap[ref.name] = ref.spec
+               newS.fieldMap[ref.name] = ref.Spec
        }
        return newS
 }

Reply via email to