AlinsRan commented on code in PR #13606:
URL: https://github.com/apache/apisix/pull/13606#discussion_r3478841506


##########
apisix/plugins/ai-lakera-guard.lua:
##########
@@ -206,4 +221,76 @@ function _M.access(conf, ctx)
 end
 
 
+function _M.lua_body_filter(conf, ctx, headers, body)
+    if conf.direction ~= "output" and conf.direction ~= "both" then
+        return
+    end
+
+    if ngx.status >= 400 then
+        return
+    end
+
+    -- Non-streaming: ai-proxy hands us the fully-assembled completion text.
+    if ctx.var.request_type == "ai_chat" then
+        local text = ctx.var.llm_response_text
+        if not text or text == "" then
+            return
+        end
+        return moderate_response(ctx, conf, text)
+    end
+
+    if ctx.var.request_type == "ai_stream" then
+        -- alert (shadow) mode non-blocking
+        if conf.action == "alert" then
+            if ctx.var.llm_request_done then
+                local text = ctx.var.llm_response_text
+                if text and text ~= "" then
+                    moderate_response(ctx, conf, text)
+                end
+            end
+            return
+        end
+
+        -- block mode
+        local buffer = ctx.lakera_response_buffer
+        if not buffer then
+            buffer = {}
+            ctx.lakera_response_buffer = buffer
+        end
+        buffer[#buffer + 1] = body or ""
+
+        if not ctx.var.llm_request_done then
+            -- Withhold this chunk until end-of-stream, replacing it with an 
SSE
+            -- keep-alive comment. Not "" (nginx treats an empty body as 
nothing
+            -- to flush) and not nil (which would let the original chunk reach
+            -- the client) -- the keep-alive holds the content back while 
keeping
+            -- the connection open.
+            return nil, ":\n\n"
+        end
+
+        local text = ctx.var.llm_response_text
+        if not text or text == "" then
+            if conf.fail_open then
+                core.log.warn("ai-lakera-guard: streamed response ended 
without ",
+                              "an assembled completion (no upstream usage 
event?); ",
+                              "fail_open=true, releasing unscanned")
+                return nil, concat(buffer)
+            end
+            core.log.error("ai-lakera-guard: streamed response ended without ",
+                           "an assembled completion (no upstream usage 
event?); ",
+                           "fail_open=false, blocking response")
+            return ngx.OK, deny_message(ctx, conf, 
conf.response_failure_message)
+        end
+
+        local code, message = moderate_response(ctx, conf, text)
+        if code then
+            return ngx.OK, message
+        end
+
+        -- Clean: release the buffered stream verbatim, preserving SSE framing.
+        return nil, concat(buffer)

Review Comment:
   **Block-mode streaming can duplicate or drop the response when a protocol 
converter is active** (e.g. Anthropic client → OpenAI upstream via 
`ai-proxy-multi`). The same-protocol path tested here is fine, so CI stays 
green.
   
   **Root cause:** release is gated only on `llm_request_done`. In 
`ai-providers/base.lua` that flag is set *before* the dispatch loop, and with a 
converter the loop calls `lua_body_filter` **once per converted chunk**. On 
`[DONE]`, `anthropic-messages-to-openai-chat` emits **two** chunks 
(`message_delta` + `message_stop`) — both dispatched with `llm_request_done == 
true`.
   
   **Two symptoms:**
   - *Upstream without `include_usage`:* both terminal chunks hit `return nil, 
concat(buffer)` → **response sent twice + Lakera scanned twice**.
   - *Upstream with `include_usage`:* the converter defers the terminal events 
to the usage chunk (type `usage`, doesn't set `llm_request_done`), then 
`[DONE]` converts to nothing → **buffer never released, clean response 
dropped** (client gets only keep-alives).
   
   **Fix:** gate scan+release behind a one-shot flag (e.g. 
`ctx.lakera_response_released`) so it runs exactly once, and ensure the buffer 
is flushed at EOF even when the final dispatch produced no chunk (an 
end-of-stream filter pass in `base.lua`, like the abort path this PR adds). A 
block-mode `ai-proxy-multi` regression test (Anthropic client + OpenAI 
streaming upstream) would lock it down.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to