nic-6443 commented on code in PR #13578: URL: https://github.com/apache/apisix/pull/13578#discussion_r3457420041
########## apisix/plugins/ai-cache.lua: ########## @@ -0,0 +1,199 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +local core = require("apisix.core") +local schema = require("apisix.plugins.ai-cache.schema") +local key_mod = require("apisix.plugins.ai-cache.key") +local redis_util = require("apisix.utils.redis") + +local ngx = ngx +local ngx_null = ngx.null +local ipairs = ipairs +local concat = table.concat + +local CACHE_STATUS_HEADER = "X-AI-Cache-Status" +local CACHE_AGE_HEADER = "X-AI-Cache-Age" +local DEFAULT_TTL = 3600 +local DEFAULT_MAX_BODY = 1048576 + +local _M = { + version = 0.1, + priority = 1035, + name = "ai-cache", + schema = schema, +} + + +function _M.check_schema(conf) + return core.schema.check(schema, conf) +end + + +local function release(conf, red) + local ok, err = red:set_keepalive(conf.redis_keepalive_timeout or 10000, + conf.redis_keepalive_pool or 100) + if not ok then + core.log.warn("ai-cache: failed to set redis keepalive: ", err) + end +end + + +local function serve_hit(conf, ctx, cached) + ctx.ai_cache_status = "HIT" + if conf.cache_headers ~= false then + core.response.set_header(CACHE_STATUS_HEADER, "HIT") + local age = ngx.time() - (cached.created_at or ngx.time()) + 
core.response.set_header(CACHE_AGE_HEADER, age < 0 and 0 or age) + end + core.response.set_header("Content-Type", "application/json") + return core.response.exit(200, cached.body) +end + + +function _M.access(conf, ctx) + -- Streaming responses are not cached in PR-1 (SSE replay is a later + -- increment). ai-proxy (higher priority) has already classified the + -- request, so bypass before doing any work. + if ctx.var.request_type == "ai_stream" then + ctx.ai_cache_status = "BYPASS" + return + end + + if conf.bypass_on then + for _, rule in ipairs(conf.bypass_on) do + if core.request.header(ctx, rule.header) == rule.equals then + ctx.ai_cache_status = "BYPASS" + return + end + end + end + + local body, err = core.request.get_json_request_body_table() + if not body then + core.log.warn("ai-cache: cannot read request body, bypassing: ", err) + ctx.ai_cache_status = "BYPASS" + return + end + + ctx.ai_cache_key = "ai-cache:l1:" .. key_mod.scope(conf, ctx) + .. ":" .. key_mod.fingerprint(ctx, body) + + local red + red, err = redis_util.new(conf) + if not red then + -- fail-open: never let a cache-backend outage break the request. + core.log.warn("ai-cache: redis unavailable, fail-open as MISS: ", err) + ctx.ai_cache_status = "MISS" + return + end + + local res + res, err = red:get(ctx.ai_cache_key) + release(conf, red) Review Comment: `release()` runs unconditionally here, before the `if err` check just below, so a connection that just errored/timed out on `get` still gets handed back to the keepalive pool. lua-resty-redis recommends not reusing a connection after a failure. The limit-count helper handles this by returning on error and only calling `set_keepalive` on the success path (see `apisix/plugins/limit-count/util.lua`). Suggest releasing only when there's no error (and `red:close()` otherwise), so one transient timeout doesn't poison pooled connections for subsequent requests. 
########## apisix/plugins/ai-cache/schema.lua: ########## @@ -0,0 +1,95 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +local core = require("apisix.core") +local redis_schema = require("apisix.utils.redis-schema") + +local policy_to_additional_properties = core.table.deepcopy(redis_schema.schema) + +local _M = { + type = "object", + properties = { + layers = { Review Comment: `layers` is validated and documented (defaults to `["exact"]`), but nothing in the plugin ever reads `conf.layers` — exact caching always runs, regardless of this setting. Since `exact` is the only allowed value today, maybe drop the knob until the semantic/L2 layer actually lands, or at least note that it's currently inert? As is, it reads like a working toggle even though changing it has no effect. ########## apisix/plugins/ai-cache.lua: ########## @@ -0,0 +1,199 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License.
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +local core = require("apisix.core") +local schema = require("apisix.plugins.ai-cache.schema") +local key_mod = require("apisix.plugins.ai-cache.key") +local redis_util = require("apisix.utils.redis") + +local ngx = ngx +local ngx_null = ngx.null +local ipairs = ipairs +local concat = table.concat + +local CACHE_STATUS_HEADER = "X-AI-Cache-Status" +local CACHE_AGE_HEADER = "X-AI-Cache-Age" +local DEFAULT_TTL = 3600 +local DEFAULT_MAX_BODY = 1048576 + +local _M = { + version = 0.1, + priority = 1035, + name = "ai-cache", + schema = schema, +} + + +function _M.check_schema(conf) + return core.schema.check(schema, conf) +end + + +local function release(conf, red) + local ok, err = red:set_keepalive(conf.redis_keepalive_timeout or 10000, + conf.redis_keepalive_pool or 100) + if not ok then + core.log.warn("ai-cache: failed to set redis keepalive: ", err) + end +end + + +local function serve_hit(conf, ctx, cached) + ctx.ai_cache_status = "HIT" + if conf.cache_headers ~= false then + core.response.set_header(CACHE_STATUS_HEADER, "HIT") + local age = ngx.time() - (cached.created_at or ngx.time()) + core.response.set_header(CACHE_AGE_HEADER, age < 0 and 0 or age) + end + core.response.set_header("Content-Type", "application/json") + return core.response.exit(200, cached.body) +end + + +function _M.access(conf, ctx) + -- Streaming responses are not cached in PR-1 (SSE replay is a later + -- increment). ai-proxy (higher priority) has already classified the + -- request, so bypass before doing any work. 
+ if ctx.var.request_type == "ai_stream" then + ctx.ai_cache_status = "BYPASS" + return + end + + if conf.bypass_on then + for _, rule in ipairs(conf.bypass_on) do + if core.request.header(ctx, rule.header) == rule.equals then + ctx.ai_cache_status = "BYPASS" + return + end + end + end + + local body, err = core.request.get_json_request_body_table() + if not body then + core.log.warn("ai-cache: cannot read request body, bypassing: ", err) + ctx.ai_cache_status = "BYPASS" + return + end + + ctx.ai_cache_key = "ai-cache:l1:" .. key_mod.scope(conf, ctx) + .. ":" .. key_mod.fingerprint(ctx, body) + + local red + red, err = redis_util.new(conf) + if not red then + -- fail-open: never let a cache-backend outage break the request. + core.log.warn("ai-cache: redis unavailable, fail-open as MISS: ", err) + ctx.ai_cache_status = "MISS" + return + end + + local res + res, err = red:get(ctx.ai_cache_key) + release(conf, red) + if err then + core.log.warn("ai-cache: redis get failed, fail-open as MISS: ", err) + ctx.ai_cache_status = "MISS" + return + end + + if res ~= nil and res ~= ngx_null then + local cached = core.json.decode(res) + if cached and cached.body then + return serve_hit(conf, ctx, cached) + end + core.log.warn("ai-cache: discarding malformed cache entry for ", ctx.ai_cache_key) + end + + ctx.ai_cache_status = "MISS" +end + + +function _M.header_filter(conf, ctx) + if ctx.ai_cache_status and conf.cache_headers ~= false then + core.response.set_header(CACHE_STATUS_HEADER, ctx.ai_cache_status) + end +end + + +function _M.body_filter(conf, ctx) + -- only a MISS gets written back; HIT exited in access, BYPASS opts out. 
+ if ctx.ai_cache_status ~= "MISS" or ctx.ai_cache_oversized then + return + end + local chunk = ngx.arg[1] + if chunk and #chunk > 0 then + local buf = ctx.ai_cache_buf + if not buf then + buf = { n = 0, bytes = 0 } + ctx.ai_cache_buf = buf + end + local n = buf.n + 1 + buf.n = n + buf[n] = chunk + buf.bytes = buf.bytes + #chunk + if buf.bytes > (conf.max_cache_body_size or DEFAULT_MAX_BODY) then + ctx.ai_cache_buf = nil + ctx.ai_cache_oversized = true + end + end +end + + +-- The response-capturing phases (body_filter / log) run in contexts where +-- cosockets are disabled, so the Redis write is deferred to a 0-delay timer +-- (timers run in a light thread where cosockets are allowed). +local function write_to_cache(premature, conf, cache_key, response_body) + if premature then + return + end + local red, err = redis_util.new(conf) + if not red then + core.log.warn("ai-cache: redis unavailable on write: ", err) + return + end + local envelope = core.json.encode({ body = response_body, created_at = ngx.time() }) + local ttl = (conf.exact and conf.exact.ttl) or DEFAULT_TTL + local ok + ok, err = red:set(cache_key, envelope, "EX", ttl) + if not ok then + core.log.warn("ai-cache: redis set failed: ", err) + end + release(conf, red) Review Comment: Same keepalive-on-error issue as the `get` path above: `release()` is called regardless of whether `red:set` failed. Better to skip the keepalive (close instead) when `err` is set. ########## apisix/plugins/ai-cache/key.lua: ########## @@ -0,0 +1,108 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +local core = require("apisix.core") +local protocols = require("apisix.plugins.ai-protocols") +local rapidjson = require("rapidjson") +local sha256 = require("resty.sha256") +local to_hex = require("resty.string").to_hex + +local ipairs = ipairs +local pairs = pairs +local type = type +local getmetatable = getmetatable +local concat = table.concat + +local rapidjson_null = rapidjson.null +local ENCODE_OPTS = { sort_keys = true } + +local _M = {} + + +local function hex_digest(s) + local hash = sha256:new() + hash:update(s) + return to_hex(hash:final()) +end + + +local function to_rapidjson_value(data) + if data == core.json.null then + return rapidjson_null + end + if type(data) ~= "table" then + return data + end + if getmetatable(data) == core.json.array_mt then + local arr = {} + for i, v in ipairs(data) do + arr[i] = to_rapidjson_value(v) + end + return rapidjson.array(arr) + end + local obj = {} + for k, v in pairs(data) do + obj[k] = to_rapidjson_value(v) + end + return obj +end + + +local function canonical_encode(value) + return rapidjson.encode(to_rapidjson_value(value), ENCODE_OPTS) +end + + +function _M.fingerprint(ctx, body) + local params = {} + for k, v in pairs(body) do + if k ~= "messages" and k ~= "model" and k ~= "stream" then + params[k] = v + end + end + + local repr = canonical_encode({ + protocol = ctx.ai_client_protocol or "", + model = ctx.var.request_llm_model or body.model or "", + messages = protocols.get_messages(body, ctx) or {}, + params = params, + }) + return hex_digest(repr) +end + + +function _M.scope(conf, ctx) 
+ local ck = conf.cache_key + local inc_vars = ck and ck.include_vars + if not (ck and ck.include_consumer) and (not inc_vars or #inc_vars == 0) then + return "shared" Review Comment: The default scope is just the literal `shared`, with no route in it. So if two routes both enable ai-cache against the same Redis and see the same protocol + requested model + messages, they compute the same key and serve each other's responses, even when they proxy to completely different upstreams (e.g. one route -> OpenAI `gpt-4o`, another -> a self-hosted `gpt-4o`). The docs `:::note` only warns about server-side model rewrite via `options.model`, but this collision happens with no rewrite at all. Would it make sense to fold `route_id` into the default scope, so cross-route sharing is opt-in rather than the default footgun? What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
