hanahmily commented on a change in pull request #30:
URL: https://github.com/apache/skywalking-banyandb/pull/30#discussion_r685573347



##########
File path: banyand/liaison/grpc/grpc.go
##########
@@ -102,12 +135,50 @@ func (s *Server) Name() string {
 }
 
 func (s *Server) FlagSet() *run.FlagSet {
+       size := 1024 * 1024 * 8
+       _, currentFile, _, _ := runtime.Caller(0)
+       basePath := filepath.Dir(currentFile)
+       serverCert := filepath.Join(basePath, "testdata/server_cert.pem")
+       serverKey := filepath.Join(basePath, "testdata/server_key.pem")
+
        fs := run.NewFlagSet("grpc")
-       fs.StringVarP(&s.addr, "addr", "", ":17912", "the address of banyand 
listens")
+       fs.Int("maxRecvMsgSize", size, "The size of max receiving message")

Review comment:
       Please follow the `addr` flag as the pattern for setting up flags:
   
   ```
   fs.StringVarP(&s.addr, "addr", "", ":17912", "The address of banyand 
listens")
   ```

##########
File path: banyand/liaison/grpc/testdata/server_cert.pem
##########
@@ -0,0 +1,32 @@
+-----BEGIN CERTIFICATE-----

Review comment:
       As @kezhenxu94 mentioned, if you copied this file from 
https://github.com/grpc/grpc-go/blob/master/examples/data/x509/server_cert.pem, 
please add the corresponding notices and license to banyandb. But these files 
look like they are only for test purposes, so how about generating them yourself?

##########
File path: banyand/liaison/grpc/grpc.go
##########
@@ -52,6 +77,9 @@ func (s *shardInfo) Rev(message bus.Message) (resp 
bus.Message) {
                s.log.Warn().Msg("invalid event data type")
                return
        }
+       s.RWMutex.Lock()
+       defer s.RWMutex.Unlock()
+       s.shardEvent = shardEvent
        s.log.Info().

Review comment:
        * Add two separate mutexes to protect `shardEvent` and `seriesEvent`, 
so that updates to one do not block the other.
    * Please parse the `action` of `shardEvent` and `seriesEvent` instead of 
accepting it directly.
   

##########
File path: banyand/liaison/grpc/grpc.go
##########
@@ -128,3 +210,92 @@ func (s *Server) GracefulStop() {
        s.log.Info().Msg("stopping")
        s.ser.GracefulStop()
 }
+
+func assemblyWriteData(shardID uint, writeEntity *v1.WriteRequest, seriesID 
uint64) data.TraceWriteDate {
+       return data.TraceWriteDate{ShardID: shardID, SeriesID: seriesID, 
WriteRequest: writeEntity}
+}
+
+func (s *Server) Write(TraceWriteServer v1.TraceService_WriteServer) error {
+       for {
+               writeEntity, err := TraceWriteServer.Recv()
+               if err == io.EOF {
+                       return nil
+               }
+               if err != nil {
+                       return err
+               }
+
+               ana := logical.DefaultAnalyzer()
+               metadata := common.Metadata{
+                       KindVersion: apischema.SeriesKindVersion,
+                       Spec:        writeEntity.GetMetadata(),
+               }
+               schema, ruleError := ana.BuildTraceSchema(context.TODO(), 
metadata)
+               if ruleError != nil {
+                       return ruleError
+               }
+               s.seriesInfo.RWMutex.RLock()
+               if s.seriesInfo.seriesEvent == nil {
+                       s.seriesInfo.RWMutex.RUnlock()

Review comment:
       How about adding a `s.getSeriesInfo` here like 
   
   ```go
   func (s *Server) getSeriesInfo() {
       s.RWMutex.RLock()
       defer s.RWMutex.RUnlock()
       return s.xxxx
   }
   ```
   
   The above method saves us from having to track where to unlock the RWMutex.

##########
File path: banyand/liaison/grpc/grpc.go
##########
@@ -102,12 +135,50 @@ func (s *Server) Name() string {
 }
 
 func (s *Server) FlagSet() *run.FlagSet {
+       size := 1024 * 1024 * 8
+       _, currentFile, _, _ := runtime.Caller(0)
+       basePath := filepath.Dir(currentFile)
+       serverCert := filepath.Join(basePath, "testdata/server_cert.pem")
+       serverKey := filepath.Join(basePath, "testdata/server_key.pem")
+
        fs := run.NewFlagSet("grpc")
-       fs.StringVarP(&s.addr, "addr", "", ":17912", "the address of banyand 
listens")
+       fs.Int("maxRecvMsgSize", size, "The size of max receiving message")
+       fs.Bool("tls", true, "Connection uses TLS if true, else plain TCP")
+       fs.String("certFile", serverCert, "The TLS cert file")
+       fs.String("keyFile", serverKey, "The TLS key file")
+       fs.String("serverHostOverride", "x.test.example.com", "The server name 
used to verify the hostname returned by the TLS handshake")
+       fs.StringVarP(&s.addr, "addr", "", ":17912", "The address of banyand 
listens")
+
        return fs
 }
 
 func (s *Server) Validate() error {
+       _, err := s.FlagSet().GetInt("maxRecvMsgSize")

Review comment:
       You must NOT call `s.FlagSet()` here; it is called by the module 
lifecycle framework.
   
   Let's use `addr` to show how to validate flags:
   
   ```go
   func (s *Server) Validate() error {
         if s.addr == "" {
               return errNoAddr
         }
   }
   
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to