Copilot commented on code in PR #769:
URL: https://github.com/apache/dubbo-go-pixiu/pull/769#discussion_r2418580733


##########
pkg/filter/mcp/mcpserver/transport/session_manager.go:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transport
+
+import (
+       "crypto/rand"
+       "encoding/hex"
+       "io"
+       "sync"
+       "time"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+const (
+       SessionTimeout    = 30 * time.Minute
+       CleanupInterval   = 5 * time.Minute
+       KeepaliveInterval = 30 * time.Second
+)
+
+// MCPSession represents an active MCP session
+type MCPSession struct {
+       ID           string
+       CreatedAt    time.Time
+       LastActivity time.Time
+       PipeWriter   *io.PipeWriter // Pipe writer for sending SSE messages
+       Done         chan struct{}
+}
+
+// SessionManager manages MCP sessions for SSE connections
+type SessionManager struct {
+       sessions map[string]*MCPSession
+       mu       sync.RWMutex
+       stopCh   chan struct{}
+       once     sync.Once
+}
+
+// NewSessionManager creates a new session manager
+func NewSessionManager() *SessionManager {
+       sm := &SessionManager{
+               sessions: make(map[string]*MCPSession),
+               stopCh:   make(chan struct{}),
+       }
+       go sm.startCleanupRoutine()
+       return sm
+}
+
+// EnsureSession gets existing session or creates new one
+func (sm *SessionManager) EnsureSession(sessionIDHeader string) (*MCPSession, 
bool) {
+       sm.mu.Lock()
+       defer sm.mu.Unlock()
+
+       // Try to get existing session
+       if sessionIDHeader != "" {
+               if session, exists := sm.sessions[sessionIDHeader]; exists {
+                       session.LastActivity = time.Now()
+                       return session, false // existing session
+               }
+       }
+
+       // Create new session
+       sessionID := sm.generateSessionID()
+       session := &MCPSession{
+               ID:           sessionID,
+               CreatedAt:    time.Now(),
+               LastActivity: time.Now(),
+               Done:         make(chan struct{}),
+       }
+
+       sm.sessions[sessionID] = session
+       logger.Infof("[dubbo-go-pixiu] mcp server created new session: %s", 
sessionID)
+       return session, true // new session
+}
+
+// Session retrieves a session by ID
+func (sm *SessionManager) Session(sessionID string) (*MCPSession, bool) {
+       sm.mu.RLock()
+       defer sm.mu.RUnlock()
+
+       session, exists := sm.sessions[sessionID]
+       if exists {
+               session.LastActivity = time.Now()
+       }
+       return session, exists
+}
+
+// RemoveSession removes a session and cleans up resources
+func (sm *SessionManager) RemoveSession(sessionID string) {
+       sm.mu.Lock()
+       defer sm.mu.Unlock()
+
+       if session, exists := sm.sessions[sessionID]; exists {
+               // Close Done channel to signal goroutines
+               close(session.Done)
+
+               // Close PipeWriter to end the SSE stream
+               if session.PipeWriter != nil {
+                       session.PipeWriter.Close()
+               }
+
+               delete(sm.sessions, sessionID)
+               logger.Infof("[dubbo-go-pixiu] mcp server removed session: %s", 
sessionID)
+       }
+}
+
+// Stop stops the session manager
+func (sm *SessionManager) Stop() {
+       sm.once.Do(func() {
+               close(sm.stopCh)
+
+               sm.mu.Lock()
+               for sessionID, session := range sm.sessions {
+                       close(session.Done)
+                       delete(sm.sessions, sessionID)
+               }
+               sm.mu.Unlock()
+       })
+}
+
+// generateSessionID generates a unique session ID
+func (sm *SessionManager) generateSessionID() string {
+       bytes := make([]byte, 16)
+       if _, err := rand.Read(bytes); err != nil {
+               // Fallback to timestamp-based ID
+               return hex.EncodeToString([]byte(time.Now().String()))

Review Comment:
   The fallback session ID generation uses timestamp which is predictable and 
could be a security risk. Consider using a more secure fallback like 
crypto/rand with a fixed seed or os.Urandom.



##########
pkg/adapter/mcpserver/common/util/url.go:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package util
+
+import (
+       "fmt"
+       "net"
+       "net/url"
+       "regexp"
+       "strconv"
+       "strings"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+// ParseResult holds the result of URL parsing
+type ParseResult struct {
+       Host         string
+       Port         int
+       UsedFallback bool
+       FallbackInfo string
+}
+
+// URLParseError represents a general URL parsing error
+type URLParseError struct {
+       URL    string
+       Reason string
+}
+
+func (e *URLParseError) Error() string {
+       return fmt.Sprintf("failed to parse URL '%s': %s", e.URL, e.Reason)
+}
+
+// ParseHostPortFromURL extracts host and port from a raw URL or host:port 
string.
+// It supports forms like:
+// - http://host:port/path?query
+// - https://host:port
+// - host:port
+// Returns ParseResult with host, port, and fallback information, or error for 
invalid formats.
+// Fallback ports: HTTP(80), HTTPS(443), others(8080)
+func ParseHostPortFromURL(raw string) (*ParseResult, error) {
+       if raw == "" {
+               return nil, &URLParseError{URL: raw, Reason: "empty URL"}
+       }
+
+       addr := strings.TrimSpace(raw)
+
+       if strings.Contains(addr, constant.ProtocolSlash) {
+               u, err := url.Parse(addr)
+               if err != nil {
+                       return nil, &URLParseError{URL: raw, Reason: 
fmt.Sprintf("invalid URL format: %v", err)}
+               }
+               if u.Host == "" {
+                       return nil, &URLParseError{URL: raw, Reason: "missing 
host in URL"}
+               }
+               return parseHostPortWithFallback(u.Host, u.Scheme, raw)
+       }
+
+       return parseHostPortWithFallback(addr, "", raw)
+}
+
+func parseHostPortWithFallback(hostport, scheme, originalURL string) 
(*ParseResult, error) {
+       host, portStr, err := net.SplitHostPort(hostport)
+       if err != nil {
+               // No port specified, use fallback based on scheme
+               host = strings.TrimSpace(hostport)
+               if host == "" {
+                       return nil, &URLParseError{URL: originalURL, Reason: 
"missing host"}
+               }
+
+               fallbackPort := getFallbackPort(scheme)
+               fallbackInfo := fmt.Sprintf("used fallback port %d", 
fallbackPort)
+               if scheme != "" {
+                       fallbackInfo = fmt.Sprintf("used fallback port %d for 
protocol %s", fallbackPort, scheme)
+               }
+
+               return &ParseResult{
+                       Host:         host,
+                       Port:         fallbackPort,
+                       UsedFallback: true,
+                       FallbackInfo: fallbackInfo,
+               }, nil
+       }
+
+       port, err := strconv.Atoi(portStr)
+       if err != nil || port <= 0 {
+               return nil, &URLParseError{URL: originalURL, Reason: 
fmt.Sprintf("invalid port '%s'", portStr)}
+       }
+
+       host = strings.TrimSpace(host)
+       if host == "" {
+               return nil, &URLParseError{URL: originalURL, Reason: "missing 
host"}
+       }
+
+       return &ParseResult{
+               Host:         host,
+               Port:         port,
+               UsedFallback: false,
+               FallbackInfo: "",
+       }, nil
+}
+
+func getFallbackPort(scheme string) int {
+       switch strings.ToLower(scheme) {
+       case "http":
+               return 80
+       case "https":
+               return 443
+       default:
+               return 8080 // Default for MCP servers
+       }
+}
+
+// goTmplArgRe is the regex pattern for Go template arguments
+var goTmplArgRe = regexp.MustCompile(`\{\{\.args\.(?P<name>[a-zA-Z0-9_\-]+)}}`)
+
+// ExtractPathFromURL extracts the path component from a URL string.
+// Supports MCP template parameter conversion: {{.args.name}} -> {name}
+func ExtractPathFromURL(raw string) string {
+       if raw == "" {
+               return constant.PathSlash
+       }
+       s := strings.TrimSpace(raw)
+
+       // Prefer url.Parse to extract the path
+       if i := strings.Index(s, constant.ProtocolSlash); i >= 0 {
+               if u, err := url.Parse(s); err == nil {
+                       path := u.Path
+                       if path == "" {
+                               path = constant.PathSlash
+                       }
+                       return ReplaceGoTemplateArgsInPath(path)
+               }
+               // Fallback: remove the scheme and process
+               s = s[i+3:]
+       }
+
+       // Handle host[:port]/path form without a scheme
+       slash := strings.IndexByte(s, '/')
+       if slash >= 0 {
+               // If the colon appears before the first slash, treat the 
portion after the slash as the path
+               colon := strings.IndexByte(s, ':')
+               if colon >= 0 && colon < slash {
+                       path := s[slash:]
+                       if path == "" {
+                               return constant.PathSlash
+                       }
+                       return ReplaceGoTemplateArgsInPath(path)
+               }
+               // Otherwise, it is a path or relative path
+               if s[0] != '/' {
+                       return ReplaceGoTemplateArgsInPath(constant.PathSlash + 
s[slash+1:])
+               }
+               return ReplaceGoTemplateArgsInPath(s[slash:])
+       }
+
+       // No slash found, return root path
+       return constant.PathSlash
+}
+
+// ReplaceGoTemplateArgsInPath converts Go template args {{.args.name}} to 
standard format {name}
+func ReplaceGoTemplateArgsInPath(path string) string {
+       if path == "" {
+               return constant.PathSlash
+       }
+       return goTmplArgRe.ReplaceAllString(path, `{$1}`)
+}
+
+// ValidateNacosAddresses validates comma-separated Nacos addresses
+func ValidateNacosAddresses(addresses string) error {
+       if strings.TrimSpace(addresses) == "" {
+               return fmt.Errorf("nacos addresses cannot be empty")
+       }

Review Comment:
   The function returns different error types (fmt.Errorf vs URLParseError) 
inconsistently. Consider using a custom error type or consistently using 
fmt.Errorf for all validation errors to provide a uniform error handling 
interface.



##########
pkg/filter/mcp/mcpserver/dynamic.go:
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mcpserver
+
+import (
+       "crypto/sha256"
+       "encoding/hex"
+       "encoding/json"
+       "fmt"
+       "sort"
+       "sync"
+       "time"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/filter/mcp/mcpserver/transport"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+const (
+       // DefaultDebounceTime default debounce interval
+       DefaultDebounceTime = 500 * time.Millisecond
+
+       // EmptyFingerprint fingerprint value for empty configuration
+       EmptyFingerprint = "00000000"
+)
+
+// ServerToolConfig tool configuration for a single server
+type ServerToolConfig struct {
+       Tools       []model.ToolConfig
+       Fingerprint string
+       LastApplied time.Time
+}
+
+// DynamicConsumer applies dynamic MCP configurations into the registry
+type DynamicConsumer struct {
+       registry       *ToolRegistry
+       sessionManager *transport.SessionManager
+
+       // Tool configuration management grouped by server
+       mu            sync.RWMutex
+       serverConfigs map[string]*ServerToolConfig // serverId -> server tool 
configuration
+       debounceTime  time.Duration
+}
+
+func NewDynamicConsumer(reg *ToolRegistry, sm *transport.SessionManager) 
*DynamicConsumer {
+       return &DynamicConsumer{
+               registry:       reg,
+               sessionManager: sm,
+               serverConfigs:  make(map[string]*ServerToolConfig),
+               debounceTime:   DefaultDebounceTime,
+       }
+}
+
+// ApplyMcpServerConfigByServer applies configuration by server ID
+func (d *DynamicConsumer) ApplyMcpServerConfigByServer(serverId string, cfg 
*model.McpServerConfig) error {
+       if cfg == nil {
+               return d.removeServerConfig(serverId)
+       }
+
+       d.mu.Lock()
+       defer d.mu.Unlock()
+
+       // 1. Calculate new configuration fingerprint
+       fingerprint := d.calculateFingerprint(cfg.Tools)
+
+       // 2. Check if the server's configuration really needs to be updated
+       if existingConfig, exists := d.serverConfigs[serverId]; exists {
+               if existingConfig.Fingerprint == fingerprint {
+                       logger.Debugf("[dubbo-go-pixiu] mcp server %s config 
unchanged (fp=%s), skipped", serverId, fingerprint)
+                       return nil
+               }
+       }
+
+       // 3. Debounce check (based on this server's configuration change time)
+       now := time.Now()
+       if existingConfig, exists := d.serverConfigs[serverId]; exists {
+               // Skip only if this server is within debounce time
+               if !existingConfig.LastApplied.IsZero() && 
now.Sub(existingConfig.LastApplied) < d.debounceTime {
+                       logger.Debugf("[dubbo-go-pixiu] mcp server %s debounce 
active (elapsed=%v), skipped", serverId, now.Sub(existingConfig.LastApplied))
+                       return nil
+               }
+       }
+
+       // 4. Fully replace the server's tool configuration
+       oldConfig := d.serverConfigs[serverId]
+       serverConfig := &ServerToolConfig{
+               Tools:       make([]model.ToolConfig, len(cfg.Tools)),
+               Fingerprint: fingerprint,
+               LastApplied: now,
+       }
+       copy(serverConfig.Tools, cfg.Tools)
+       d.serverConfigs[serverId] = serverConfig
+
+       // 5. Recalculate merged tools from all servers and apply to registry
+       mergedTools := d.calculateCurrentMergedTools()
+       if err := d.applyMergedConfig(mergedTools); err != nil {
+               // Rollback
+               if oldConfig != nil {
+                       d.serverConfigs[serverId] = oldConfig
+               } else {
+                       delete(d.serverConfigs, serverId)
+               }
+               return err
+       }
+
+       logger.Infof("[dubbo-go-pixiu] mcp server %s config applied: %d tools, 
total servers: %d, merged tools: %d",
+               serverId, len(cfg.Tools), len(d.serverConfigs), 
len(mergedTools))
+
+       // Notify all connected clients about tools list change
+       d.notifyToolsListChanged()
+
+       return nil
+}
+
+// calculateFingerprint calculates a robust fingerprint for the configuration 
using SHA256
+func (d *DynamicConsumer) calculateFingerprint(tools []model.ToolConfig) 
string {
+       if len(tools) == 0 {
+               return EmptyFingerprint
+       }
+
+       // Create a sorted list of tools for consistent hashing
+       sortedTools := make([]model.ToolConfig, len(tools))
+       copy(sortedTools, tools)
+       sort.Slice(sortedTools, func(i, j int) bool {
+               if sortedTools[i].Name != sortedTools[j].Name {
+                       return sortedTools[i].Name < sortedTools[j].Name
+               }
+               return sortedTools[i].Cluster < sortedTools[j].Cluster
+       })
+
+       // Build hash input string
+       hash := sha256.New()
+       for _, tool := range sortedTools {
+               hash.Write([]byte(fmt.Sprintf("name:%s;cluster:%s;args:%d;", 
tool.Name, tool.Cluster, len(tool.Args))))
+       }
+
+       // Return first 8 characters of hex encoded hash
+       fullHash := hex.EncodeToString(hash.Sum(nil))
+       return fullHash[:8]
+}
+
+// SetDebounceTime dynamically adjusts debounce time
+func (d *DynamicConsumer) SetDebounceTime(duration time.Duration) {
+       d.mu.Lock()
+       defer d.mu.Unlock()
+
+       if duration >= 0 {
+               d.debounceTime = duration
+               logger.Infof("[dubbo-go-pixiu] mcp dynamic debounce time 
updated to %v", duration)
+       }
+}
+
+// GetDebounceInfo returns debounce state information (for 
debugging/monitoring)
+func (d *DynamicConsumer) GetDebounceInfo() map[string]interface{} {
+       d.mu.RLock()
+       defer d.mu.RUnlock()
+
+       return map[string]interface{}{
+               "debounce_time": d.debounceTime.String(),
+               "server_count":  len(d.serverConfigs),
+       }
+}
+
+// ResetDebounceState resets debounce state (mainly for testing)
+func (d *DynamicConsumer) ResetDebounceState() {
+       d.mu.Lock()
+       defer d.mu.Unlock()
+
+       // Clear all server configurations
+       d.serverConfigs = make(map[string]*ServerToolConfig)
+       logger.Debugf("[dubbo-go-pixiu] mcp dynamic debounce state reset")
+}
+
+// applyMergedConfig applies merged configuration to the registry
+func (d *DynamicConsumer) applyMergedConfig(tools []model.ToolConfig) error {
+       d.registry.ReplaceAllTools(tools)
+       return nil
+}
+
+// removeServerConfig removes server configuration
+func (d *DynamicConsumer) removeServerConfig(serverId string) error {
+       d.mu.Lock()
+       defer d.mu.Unlock()
+
+       if _, exists := d.serverConfigs[serverId]; !exists {
+               return nil // Already does not exist
+       }
+
+       delete(d.serverConfigs, serverId)
+
+       // Recalculate and apply merged configuration
+       mergedTools := d.calculateCurrentMergedTools()
+       if err := d.applyMergedConfig(mergedTools); err != nil {
+               return err
+       }
+
+       logger.Infof("[dubbo-go-pixiu] mcp server %s config removed, remaining 
servers: %d",
+               serverId, len(d.serverConfigs))
+
+       return nil
+}
+
+// calculateCurrentMergedTools calculates merged tools from all current servers
+func (d *DynamicConsumer) calculateCurrentMergedTools() []model.ToolConfig {
+       var allTools []model.ToolConfig
+
+       // Simply accumulate tools from all servers
+       for _, config := range d.serverConfigs {
+               allTools = append(allTools, config.Tools...)
+       }
+
+       return allTools
+}
+
+// notifyToolsListChanged sends notifications/tools/list_changed to all 
connected clients
+func (d *DynamicConsumer) notifyToolsListChanged() {
+       if d.sessionManager == nil {
+               logger.Debugf("[dubbo-go-pixiu] mcp server session manager not 
available, skip tools list_changed notification")
+               return
+       }
+
+       // Get all active sessions
+       sessionIDs := d.sessionManager.AllSessionIDs()
+       if len(sessionIDs) == 0 {
+               logger.Debugf("[dubbo-go-pixiu] mcp server no active sessions, 
skip tools list_changed notification")
+               return
+       }
+
+       // Send notification to each session
+       successCount := 0
+       for _, sessionID := range sessionIDs {
+               if err := d.sendToolsListChangedNotification(sessionID); err != 
nil {
+                       logger.Warnf("[dubbo-go-pixiu] mcp server failed to 
send tools list_changed to session %s: %v", sessionID, err)
+               } else {
+                       successCount++
+               }
+       }
+
+       logger.Infof("[dubbo-go-pixiu] mcp server sent tools/list_changed 
notification to %d/%d sessions", successCount, len(sessionIDs))
+}
+
+// sendToolsListChangedNotification sends notification to a specific session
+func (d *DynamicConsumer) sendToolsListChangedNotification(sessionID string) 
error {
+       session, exists := d.sessionManager.Session(sessionID)
+       if !exists {
+               return fmt.Errorf("session not found")
+       }
+
+       if session.PipeWriter == nil {
+               return fmt.Errorf("SSE pipe not established")
+       }
+
+       // Build tools/list_changed notification (no params needed)
+       notification := map[string]any{
+               "jsonrpc": "2.0",
+               "method":  "notifications/tools/list_changed",
+       }
+
+       messageJSON, err := json.Marshal(notification)
+       if err != nil {
+               return fmt.Errorf("failed to marshal notification: %w", err)
+       }
+
+       sseData := fmt.Sprintf("data: %s\n\n", string(messageJSON))

Review Comment:
   The SSE message formatting is duplicated here and in the SSEHandler. This 
should use the existing FormatSSEMessage method from SSEHandler to maintain 
consistency and reduce code duplication.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to