shreemaan-abhishek commented on code in PR #13482:
URL: https://github.com/apache/apisix/pull/13482#discussion_r3411303891


##########
apisix/plugins/mcp/server_wrapper.lua:
##########
@@ -50,10 +67,38 @@ local function sse_handler(conf, ctx, opts)
         end
         server:start() -- this is a sync call that only returns when the 
client disconnects
     end
+end
+
+
+local function sse_handler(conf, ctx, opts)
+    local server = opts.server
+
+    -- bound the number of concurrent sessions a worker will keep open, so a
+    -- route cannot be driven to spawn an unbounded number of backend processes
+    local max_sessions = conf.max_sessions or DEFAULT_MAX_SESSIONS
+    if not session_limit.acquire(max_sessions) then
+        core.log.warn("mcp session limit reached (", max_sessions,
+                      "), rejecting new SSE connection")
+        return core.response.exit(429)
+    end
 
-    if opts.event_handler.on_disconnect then
+    local ok, code, body = pcall(run_session, conf, opts, server)
+
+    -- always release: tear down the backend process/broker state and free the
+    -- session slot regardless of how the loop ended
+    if opts.event_handler and opts.event_handler.on_disconnect then
         opts.event_handler.on_disconnect({ server = server })
-        server:close()
+    end
+    server:close()
+    session_limit.release()

Review Comment:
   Addressed: the teardown (on_disconnect + server:close) is now wrapped in its 
own pcall and session_limit.release() runs unconditionally afterwards, so a 
raising handler can no longer skip the release and pin the worker at the 
ceiling.



##########
apisix/plugins/mcp/session_limit.lua:
##########
@@ -0,0 +1,50 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- Tracks the number of live MCP SSE sessions handled by the current worker so
+-- that a single route cannot spawn an unbounded number of backend processes.
+local _M = {}
+
+
+local count = 0
+
+
+-- Try to reserve a slot for a new session. Returns true if the worker is below
+-- the configured ceiling, false otherwise. Every successful acquire() must be
+-- balanced by exactly one release().
+function _M.acquire(max)
+    if count >= max then
+        return false
+    end
+    count = count + 1
+    return true
+end

Review Comment:
   Addressed: acquire() now returns false for a nil or non-numeric ceiling 
instead of raising, so a misconfigured or future call site fails closed.



##########
apisix/plugins/mcp/server_wrapper.lua:
##########
@@ -50,10 +67,45 @@ local function sse_handler(conf, ctx, opts)
         end
         server:start() -- this is a sync call that only returns when the 
client disconnects
     end
+end
+
+
+local function sse_handler(conf, ctx, opts)
+    local server = opts.server
 
-    if opts.event_handler.on_disconnect then
-        opts.event_handler.on_disconnect({ server = server })
+    -- bound the number of concurrent sessions a worker will keep open, so a
+    -- route cannot be driven to spawn an unbounded number of backend processes
+    local max_sessions = conf.max_sessions or DEFAULT_MAX_SESSIONS
+    if not session_limit.acquire(max_sessions) then
+        core.log.warn("mcp session limit reached (", max_sessions,
+                      "), rejecting new SSE connection")
+        return core.response.exit(429)

Review Comment:
   Addressed: the rejection now returns a JSON body ({"error_msg": "too many 
concurrent MCP sessions"}) instead of a bare 429.



##########
t/plugin/mcp-bridge.t:
##########
@@ -57,3 +57,62 @@ property "command" is required
 property "command" validation failed: wrong type: expected string, got number
 done
 property "args" validation failed: wrong type: expected array, got string
+
+
+
+=== TEST 2: max_sessions schema validation
+--- config
+    location /t {
+        content_by_lua_block {
+            local test_cases = {

Review Comment:
   Added TEST 4, which drives the real access() path and asserts the handler 
returns 429 with the JSON body once the ceiling is reached. A fully concurrent 
held-connection end-to-end test is impractical under Test::Nginx because each 
SSE GET blocks for the life of the connection, so this covers the actual 
rejection decision instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to