This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch feature-triple
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/feature-triple by this push:
     new 06e730246 feat:sort triple logic (#2483)
06e730246 is described below

commit 06e73024649503456b3cd2a2b626f6c6fc804942
Author: finalt <finalt1...@163.com>
AuthorDate: Sat Nov 4 19:50:49 2023 +0800

    feat:sort triple logic (#2483)
---
 protocol/triple/server.go | 68 ++++++++++++++++++++++++++++++++++-------------
 protocol/triple/triple.go |  1 +
 server/server.go          | 21 ++++++++++++---
 3 files changed, 68 insertions(+), 22 deletions(-)

diff --git a/protocol/triple/server.go b/protocol/triple/server.go
index 79465df2d..cc0aba2d2 100644
--- a/protocol/triple/server.go
+++ b/protocol/triple/server.go
@@ -49,11 +49,14 @@ import (
 // Server is TRIPLE server
 type Server struct {
        httpServer *http.Server
+       handler    *http.ServeMux
 }
 
 // NewServer creates a new TRIPLE server
 func NewServer() *Server {
-       return &Server{}
+       return &Server{
+               handler: http.NewServeMux(),
+       }
 }
 
 // Start TRIPLE server
@@ -69,19 +72,6 @@ func (s *Server) Start(invoker protocol.Invoker, info 
*server.ServiceInfo) {
        srv := &http.Server{
                Addr: addr,
        }
-
-       maxServerRecvMsgSize := constant.DefaultMaxServerRecvMsgSize
-       if recvMsgSize, convertErr := 
humanize.ParseBytes(URL.GetParam(constant.MaxServerRecvMsgSize, "")); 
convertErr == nil && recvMsgSize != 0 {
-               maxServerRecvMsgSize = int(recvMsgSize)
-       }
-       hanOpts = append(hanOpts, tri.WithReadMaxBytes(maxServerRecvMsgSize))
-
-       maxServerSendMsgSize := constant.DefaultMaxServerSendMsgSize
-       if sendMsgSize, convertErr := 
humanize.ParseBytes(URL.GetParam(constant.MaxServerSendMsgSize, "")); err == 
convertErr && sendMsgSize != 0 {
-               maxServerSendMsgSize = int(sendMsgSize)
-       }
-       hanOpts = append(hanOpts, tri.WithSendMaxBytes(maxServerSendMsgSize))
-
        serialization := URL.GetParam(constant.SerializationKey, 
constant.ProtobufSerialization)
        switch serialization {
        case constant.ProtobufSerialization:
@@ -89,7 +79,6 @@ func (s *Server) Start(invoker protocol.Invoker, info 
*server.ServiceInfo) {
        default:
                panic(fmt.Sprintf("Unsupported serialization: %s", 
serialization))
        }
-
        // todo: implement interceptor
        // If global trace instance was set, then server tracer instance
        // can be get. If not, will return NoopTracer.
@@ -116,14 +105,13 @@ func (s *Server) Start(invoker protocol.Invoker, info 
*server.ServiceInfo) {
        //      logger.Infof("Triple Server initialized the TLSConfig 
configuration")
        //}
        //srv.TLSConfig = cfg
-
-       // todo:// open tracing
-       hanOpts = append(hanOpts, tri.WithInterceptors())
        // todo:// move tls config to handleService
+
+       hanOpts = getHanOpts(URL)
        s.httpServer = srv
 
        go func() {
-               mux := http.NewServeMux()
+               mux := s.handler
                if info != nil {
                        handleServiceWithInfo(invoker, info, mux, hanOpts...)
                } else {
@@ -144,6 +132,48 @@ func (s *Server) Start(invoker protocol.Invoker, info 
*server.ServiceInfo) {
        }()
 }
 
+// RefreshService refreshes Triple Service
+func (s *Server) RefreshService(invoker protocol.Invoker, info 
*server.ServiceInfo) {
+       var (
+               URL     *common.URL
+               hanOpts []tri.HandlerOption
+       )
+       URL = invoker.GetURL()
+       serialization := URL.GetParam(constant.SerializationKey, 
constant.ProtobufSerialization)
+       switch serialization {
+       case constant.ProtobufSerialization:
+       case constant.JSONSerialization:
+       default:
+               panic(fmt.Sprintf("Unsupported serialization: %s", 
serialization))
+       }
+       hanOpts = getHanOpts(URL)
+       mux := s.handler
+       if info != nil {
+               handleServiceWithInfo(invoker, info, mux, hanOpts...)
+       } else {
+               compatHandleService(mux)
+       }
+}
+
+func getHanOpts(url *common.URL) (hanOpts []tri.HandlerOption) {
+       var err error
+       maxServerRecvMsgSize := constant.DefaultMaxServerRecvMsgSize
+       if recvMsgSize, convertErr := 
humanize.ParseBytes(url.GetParam(constant.MaxServerRecvMsgSize, "")); 
convertErr == nil && recvMsgSize != 0 {
+               maxServerRecvMsgSize = int(recvMsgSize)
+       }
+       hanOpts = append(hanOpts, tri.WithReadMaxBytes(maxServerRecvMsgSize))
+
+       maxServerSendMsgSize := constant.DefaultMaxServerSendMsgSize
+       if sendMsgSize, convertErr := 
humanize.ParseBytes(url.GetParam(constant.MaxServerSendMsgSize, "")); err == 
convertErr && sendMsgSize != 0 {
+               maxServerSendMsgSize = int(sendMsgSize)
+       }
+       hanOpts = append(hanOpts, tri.WithSendMaxBytes(maxServerSendMsgSize))
+
+       // todo:// open tracing
+       hanOpts = append(hanOpts, tri.WithInterceptors())
+       return hanOpts
+}
+
 // getSyncMapLen gets sync map len
 func getSyncMapLen(m *sync.Map) int {
        length := 0
diff --git a/protocol/triple/triple.go b/protocol/triple/triple.go
index cb64aa426..f5d523a1b 100644
--- a/protocol/triple/triple.go
+++ b/protocol/triple/triple.go
@@ -88,6 +88,7 @@ func (tp *TripleProtocol) openServer(invoker 
protocol.Invoker, info *server.Serv
        defer tp.serverLock.Unlock()
 
        if _, ok := tp.serverMap[url.Location]; ok {
+               tp.serverMap[url.Location].RefreshService(invoker, info)
                return
        }
 
diff --git a/server/server.go b/server/server.go
index 2786cb66e..e8add11f6 100644
--- a/server/server.go
+++ b/server/server.go
@@ -20,6 +20,7 @@ package server
 import (
        "context"
        "fmt"
+       "sync"
 )
 
 import (
@@ -38,6 +39,8 @@ type Server struct {
        info    *ServiceInfo
 
        cfg *ServerOptions
+
+       svcOptsMap sync.Map
 }
 
 // ServiceInfo is meta info of a service
@@ -145,14 +148,26 @@ func (s *Server) Register(handler interface{}, info 
*ServiceInfo, opts ...Servic
                return err
        }
        newSvcOpts.Implement(handler)
-       if err := newSvcOpts.ExportWithInfo(info); err != nil {
-               return err
-       }
+       s.svcOptsMap.Store(newSvcOpts, info)
 
        return nil
 }
 
+func (s *Server) exportServices() (err error) {
+       s.svcOptsMap.Range(func(newSvcOpts, info interface{}) bool {
+               err = 
newSvcOpts.(*ServiceOptions).ExportWithInfo(info.(*ServiceInfo))
+               if err != nil {
+                       return false
+               }
+               return true
+       })
+       return err
+}
+
 func (s *Server) Serve() error {
+       if err := s.exportServices(); err != nil {
+               return err
+       }
        metadata.ExportMetadataService()
        registry_exposed.RegisterServiceInstance(s.cfg.Application.Name, 
s.cfg.Application.Tag, s.cfg.Application.MetadataType)
        select {}

Reply via email to