Copilot commented on code in PR #688: URL: https://github.com/apache/dubbo-go-pixiu/pull/688#discussion_r2173630996
########## pkg/listener/grpc/grpc_listener.go: ########## @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package grpc + +import ( + "context" + "crypto/tls" + "net" + "sync" + "time" +) + +import ( + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/reflection" +) + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/common/codec/grpc/passthrough" + "github.com/apache/dubbo-go-pixiu/pkg/config" + "github.com/apache/dubbo-go-pixiu/pkg/filterchain" + "github.com/apache/dubbo-go-pixiu/pkg/listener" + "github.com/apache/dubbo-go-pixiu/pkg/logger" + "github.com/apache/dubbo-go-pixiu/pkg/model" +) + +// Register the gRPC listener service factory +func init() { + listener.SetListenerServiceFactory(model.ProtocolTypeGRPC, newGrpcListenerService) +} + +// Constants for gRPC listener +const ( + defaultTLSTimeout = 20 * time.Second + defaultGracePeriod = 5 * time.Second + defaultMinKeepalive = 30 * time.Second + defaultStartupWait = 100 * time.Millisecond + defaultShutdownTimeout = 5 * time.Second +) + +// GrpcListenerService implements the ListenerService interface for gRPC +type GrpcListenerService struct { + listener.BaseListenerService + server *grpc.Server + listener net.Listener + gShutdownConfig *listener.ListenerGracefulShutdownConfig + closeOnce sync.Once +} + +// newGrpcListenerService creates a new gRPC listener service +func newGrpcListenerService(lc *model.Listener, bs *model.Bootstrap) (listener.ListenerService, error) { + // Create network filter chain + fc := filterchain.CreateNetworkFilterChain(lc.FilterChain) + + // Initialize service with base configuration + ls := &GrpcListenerService{ + BaseListenerService: listener.BaseListenerService{ + Config: lc, + FilterChain: fc, + }, + gShutdownConfig: &listener.ListenerGracefulShutdownConfig{}, + } + + // Parse gRPC specific configuration + grpcConfig := model.MapInGrpcStruct(lc.Config) + + // Build server options with a proxy handler for unknown services + opts := buildGrpcServerOptions(grpcConfig, ls) + + // Create and configure gRPC server + server := grpc.NewServer(opts...) + registerProxyServices(server) + ls.server = server + + ls.logConfiguration() + + return ls, nil +} + +// Start initializes and starts the gRPC server +func (ls *GrpcListenerService) Start() error { + address := ls.Config.Address.SocketAddress.GetAddress() + + // Create network listener + listener, err := net.Listen("tcp", address) + if err != nil { + return errors.Wrapf(err, "failed to listen on %s", address) + } + ls.listener = listener + + ls.logConfiguration() + + // Start server in a goroutine + go ls.serveGrpc(listener) + + // The listener is ready as soon as net.Listen succeeds and the goroutine is running. + // We don't need a fixed sleep here as it's unreliable. + logger.Infof("gRPC listener successfully started at %s", address) + + return nil +} + +// serveGrpc runs the gRPC server on the provided listener +func (ls *GrpcListenerService) serveGrpc(listener net.Listener) { + logger.Info("gRPC server starting to serve...") + if err := ls.server.Serve(listener); err != nil { + logger.Errorf("gRPC server serve error: %v", err) + } else { + logger.Info("gRPC server stopped gracefully") + } +} + +// proxyStreamHandler handles all unknown gRPC streams and forwards them through the filter chain. +// This is the core of the gRPC proxy functionality. +func (ls *GrpcListenerService) proxyStreamHandler(srv any, ss grpc.ServerStream) error { + start := time.Now() + + // The full method name is available in the stream's context. + fullMethod, ok := grpc.MethodFromServerStream(ss) + if !ok { + return errors.New("could not determine method from stream") + } + + // This log is a bit too verbose, as the filter chain will provide more detailed logs. + // logger.Debugf("gRPC proxy stream request: %s", fullMethod) + + // Check if server is shutting down + if ls.gShutdownConfig.RejectRequest { + logger.Warnf("Rejecting gRPC stream request %s during shutdown", fullMethod) + return errors.New("server is shutting down") + } + + // Track active request count + ls.gShutdownConfig.ActiveCount++ + defer func() { + ls.gShutdownConfig.ActiveCount-- + }() + + // Since we don't have StreamInfo here, we must rely on the filter chain to get it if needed. + // For a pure proxy, we just need to forward the stream. + stream := &RPCStreamImpl{ServerStream: ss} + + // The filter chain needs RPCStreamInfo. Let's create a basic one. + // We can't know IsClientStream/IsServerStream without parsing the descriptor, + // but the grpc-proxy filter doesn't rely on it. It re-infers this. + // We pass the full method name which is the most critical piece of information. + streamInfo := &model.RPCStreamInfo{ + FullMethod: fullMethod, + } + + // Process stream through filter chain + err := ls.FilterChain.OnStreamRPC(stream, streamInfo) + + // Log request completion + duration := time.Since(start) + if err != nil { + logger.Errorf("gRPC stream request %s failed: %v (took %v)", fullMethod, err, duration) + } else { + logger.Debugf("gRPC stream for %s completed in %v", fullMethod, duration) + } + + return err +} + +// logConfiguration logs the current gRPC server configuration +func (ls *GrpcListenerService) logConfiguration() { + if grpcConfig, ok := ls.Config.Config.(model.GrpcConfig); ok { Review Comment: The configuration is returned as a pointer (from MapInGrpcStruct), so the type assertion should check for *model.GrpcConfig instead of model.GrpcConfig to ensure the logConfiguration code block executes as intended. ```suggestion if grpcConfig, ok := ls.Config.Config.(*model.GrpcConfig); ok { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org For additional commands, e-mail: notifications-h...@dubbo.apache.org