AlinsRan commented on code in PR #13606:
URL: https://github.com/apache/apisix/pull/13606#discussion_r3478841506


##########
apisix/plugins/ai-lakera-guard.lua:
##########
@@ -206,4 +221,76 @@ function _M.access(conf, ctx)
 end
 
 
+function _M.lua_body_filter(conf, ctx, headers, body)
+    if conf.direction ~= "output" and conf.direction ~= "both" then
+        return
+    end
+
+    if ngx.status >= 400 then
+        return
+    end
+
+    -- Non-streaming: ai-proxy hands us the fully-assembled completion text.
+    if ctx.var.request_type == "ai_chat" then
+        local text = ctx.var.llm_response_text
+        if not text or text == "" then
+            return
+        end
+        return moderate_response(ctx, conf, text)
+    end
+
+    if ctx.var.request_type == "ai_stream" then
+        -- alert (shadow) mode non-blocking
+        if conf.action == "alert" then
+            if ctx.var.llm_request_done then
+                local text = ctx.var.llm_response_text
+                if text and text ~= "" then
+                    moderate_response(ctx, conf, text)
+                end
+            end
+            return
+        end
+
+        -- block mode
+        local buffer = ctx.lakera_response_buffer
+        if not buffer then
+            buffer = {}
+            ctx.lakera_response_buffer = buffer
+        end
+        buffer[#buffer + 1] = body or ""
+
+        if not ctx.var.llm_request_done then
+            -- Withhold this chunk until end-of-stream, replacing it with an 
SSE
+            -- keep-alive comment. Not "" (nginx treats an empty body as 
nothing
+            -- to flush) and not nil (which would let the original chunk reach
+            -- the client) -- the keep-alive holds the content back while 
keeping
+            -- the connection open.
+            return nil, ":\n\n"
+        end
+
+        local text = ctx.var.llm_response_text
+        if not text or text == "" then
+            if conf.fail_open then
+                core.log.warn("ai-lakera-guard: streamed response ended 
without ",
+                              "an assembled completion (no upstream usage 
event?); ",
+                              "fail_open=true, releasing unscanned")
+                return nil, concat(buffer)
+            end
+            core.log.error("ai-lakera-guard: streamed response ended without ",
+                           "an assembled completion (no upstream usage 
event?); ",
+                           "fail_open=false, blocking response")
+            return ngx.OK, deny_message(ctx, conf, 
conf.response_failure_message)
+        end
+
+        local code, message = moderate_response(ctx, conf, text)
+        if code then
+            return ngx.OK, message
+        end
+
+        -- Clean: release the buffered stream verbatim, preserving SSE framing.
+        return nil, concat(buffer)

Review Comment:
   **Block-mode streaming can duplicate (or strand) the response when a 
protocol converter is active**
   
   The release logic here keys "stream finished, flush the buffer" solely on 
`ctx.var.llm_request_done`. That holds for the same-protocol path exercised by 
the tests in this PR, where `parse_streaming_response` dispatches one 
`lua_body_filter` call per upstream chunk and `[DONE]` is the last one. But 
when a converter is active (cross-protocol, e.g. Anthropic client → OpenAI 
upstream via `anthropic-messages-to-openai-chat`, reachable through 
`ai-proxy-multi`), the assumption breaks:
   
   - In `apisix/plugins/ai-providers/base.lua`, `ctx.var.llm_request_done` is 
set while *parsing* the events of an upstream chunk (on `parsed.type == 
"done"`), i.e. **before** the dispatch loop; the dispatch loop then calls 
`lua_body_filter` **once per converted chunk**.
   - On `[DONE]`, `anthropic-messages-to-openai-chat.convert_sse_events` 
flushes **two** events in a single call — `message_delta` + `message_stop`. So 
the terminal upstream chunk yields 2 converted chunks, both dispatched with 
`llm_request_done == true`.
   
   **Variant A (upstream without `include_usage`):** the 1st converted chunk 
scans and `return nil, concat(buffer)` releases the whole buffer; the 2nd 
converted chunk does it **again** → the response body is delivered twice and 
Lakera is scanned twice.
   
   **Variant B (upstream with `include_usage`):** the converter defers 
`message_delta`/`message_stop` to the trailing usage-only chunk, whose 
`parsed.type == "usage"` does **not** set `llm_request_done`, so those events 
get buffered; then `[DONE]`'s `convert_sse_events` returns `nil` (nothing to 
dispatch) → the buffer is **never** released and the normal EOF path doesn't 
run a filter pass → the client receives only keep-alives and the clean response 
is dropped.
   
   The same-protocol path is unaffected, which is why CI is green.
   
   Suggested fix: gate the scan+release behind a one-shot flag (e.g. 
`ctx.lakera_response_released`) so it fires exactly once, and cover the 
"`llm_request_done` already set but this isn't the final dispatch" and "buffer 
still unreleased at EOF" cases (the latter may need an end-of-stream filter 
pass in `base.lua`, mirroring the abort path added in this PR). A regression 
test with `ai-proxy-multi` (Anthropic client + OpenAI streaming upstream) in 
block mode would lock this down.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to