Copilot commented on code in PR #688:
URL: https://github.com/apache/dubbo-go-pixiu/pull/688#discussion_r2173630996


##########
pkg/listener/grpc/grpc_listener.go:
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+       "context"
+       "crypto/tls"
+       "net"
+       "sync"
+       "time"
+)
+
+import (
+       "github.com/pkg/errors"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/keepalive"
+       "google.golang.org/grpc/reflection"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/common/codec/grpc/passthrough"
+       "github.com/apache/dubbo-go-pixiu/pkg/config"
+       "github.com/apache/dubbo-go-pixiu/pkg/filterchain"
+       "github.com/apache/dubbo-go-pixiu/pkg/listener"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+// Register the gRPC listener service factory
+func init() {
+       listener.SetListenerServiceFactory(model.ProtocolTypeGRPC, newGrpcListenerService)
+}
+
+// Constants for gRPC listener
+const (
+       defaultTLSTimeout      = 20 * time.Second
+       defaultGracePeriod     = 5 * time.Second
+       defaultMinKeepalive    = 30 * time.Second
+       defaultStartupWait     = 100 * time.Millisecond
+       defaultShutdownTimeout = 5 * time.Second
+)
+
+// GrpcListenerService implements the ListenerService interface for gRPC
+type GrpcListenerService struct {
+       listener.BaseListenerService
+       server          *grpc.Server
+       listener        net.Listener
+       gShutdownConfig *listener.ListenerGracefulShutdownConfig
+       closeOnce       sync.Once
+}
+
+// newGrpcListenerService creates a new gRPC listener service
+func newGrpcListenerService(lc *model.Listener, bs *model.Bootstrap) (listener.ListenerService, error) {
+       // Create network filter chain
+       fc := filterchain.CreateNetworkFilterChain(lc.FilterChain)
+
+       // Initialize service with base configuration
+       ls := &GrpcListenerService{
+               BaseListenerService: listener.BaseListenerService{
+                       Config:      lc,
+                       FilterChain: fc,
+               },
+               gShutdownConfig: &listener.ListenerGracefulShutdownConfig{},
+       }
+
+       // Parse gRPC specific configuration
+       grpcConfig := model.MapInGrpcStruct(lc.Config)
+
+       // Build server options with a proxy handler for unknown services
+       opts := buildGrpcServerOptions(grpcConfig, ls)
+
+       // Create and configure gRPC server
+       server := grpc.NewServer(opts...)
+       registerProxyServices(server)
+       ls.server = server
+
+       ls.logConfiguration()
+
+       return ls, nil
+}
+
+// Start initializes and starts the gRPC server
+func (ls *GrpcListenerService) Start() error {
+       address := ls.Config.Address.SocketAddress.GetAddress()
+
+       // Create network listener
+       listener, err := net.Listen("tcp", address)
+       if err != nil {
+               return errors.Wrapf(err, "failed to listen on %s", address)
+       }
+       ls.listener = listener
+
+       ls.logConfiguration()
+
+       // Start server in a goroutine
+       go ls.serveGrpc(listener)
+
+       // The listener is ready as soon as net.Listen succeeds and the goroutine is running.
+       // We don't need a fixed sleep here as it's unreliable.
+       logger.Infof("gRPC listener successfully started at %s", address)
+
+       return nil
+}
+
+// serveGrpc runs the gRPC server on the provided listener
+func (ls *GrpcListenerService) serveGrpc(listener net.Listener) {
+       logger.Info("gRPC server starting to serve...")
+       if err := ls.server.Serve(listener); err != nil {
+               logger.Errorf("gRPC server serve error: %v", err)
+       } else {
+               logger.Info("gRPC server stopped gracefully")
+       }
+}
+
+// proxyStreamHandler handles all unknown gRPC streams and forwards them through the filter chain.
+// This is the core of the gRPC proxy functionality.
+func (ls *GrpcListenerService) proxyStreamHandler(srv any, ss grpc.ServerStream) error {
+       start := time.Now()
+
+       // The full method name is available in the stream's context.
+       fullMethod, ok := grpc.MethodFromServerStream(ss)
+       if !ok {
+               return errors.New("could not determine method from stream")
+       }
+
+       // This log is a bit too verbose, as the filter chain will provide more detailed logs.
+       // logger.Debugf("gRPC proxy stream request: %s", fullMethod)
+
+       // Check if server is shutting down
+       if ls.gShutdownConfig.RejectRequest {
+               logger.Warnf("Rejecting gRPC stream request %s during shutdown", fullMethod)
+               return errors.New("server is shutting down")
+       }
+
+       // Track active request count
+       ls.gShutdownConfig.ActiveCount++
+       defer func() {
+               ls.gShutdownConfig.ActiveCount--
+       }()
+
+       // Since we don't have StreamInfo here, we must rely on the filter chain to get it if needed.
+       // For a pure proxy, we just need to forward the stream.
+       stream := &RPCStreamImpl{ServerStream: ss}
+
+       // The filter chain needs RPCStreamInfo. Let's create a basic one.
+       // We can't know IsClientStream/IsServerStream without parsing the descriptor,
+       // but the grpc-proxy filter doesn't rely on it. It re-infers this.
+       // We pass the full method name which is the most critical piece of information.
+       streamInfo := &model.RPCStreamInfo{
+               FullMethod: fullMethod,
+       }
+
+       // Process stream through filter chain
+       err := ls.FilterChain.OnStreamRPC(stream, streamInfo)
+
+       // Log request completion
+       duration := time.Since(start)
+       if err != nil {
+               logger.Errorf("gRPC stream request %s failed: %v (took %v)", fullMethod, err, duration)
+       } else {
+               logger.Debugf("gRPC stream for %s completed in %v", fullMethod, duration)
+       }
+
+       return err
+}
+
+// logConfiguration logs the current gRPC server configuration
+func (ls *GrpcListenerService) logConfiguration() {
+       if grpcConfig, ok := ls.Config.Config.(model.GrpcConfig); ok {

Review Comment:
   The configuration is returned as a pointer (from MapInGrpcStruct), so the 
type assertion should check for *model.GrpcConfig instead of model.GrpcConfig 
to ensure the logConfiguration code block executes as intended.
   ```suggestion
        if grpcConfig, ok := ls.Config.Config.(*model.GrpcConfig); ok {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org
For additional commands, e-mail: notifications-h...@dubbo.apache.org

Reply via email to