This is an automated email from the ASF dual-hosted git repository.
nic-6443 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new ec981038c fix: keep chash ring stable during health changes (#13532)
ec981038c is described below
commit ec981038cb97c679f8863835d6ff68094bf5ab77
Author: Nic <[email protected]>
AuthorDate: Tue Jun 16 21:18:52 2026 +0800
fix: keep chash ring stable during health changes (#13532)
---
apisix/balancer.lua | 115 ++++++++--
apisix/healthcheck_manager.lua | 15 +-
apisix/plugins/ai-proxy-multi.lua | 171 +++++++++++---
t/node/chash-healthcheck-stable-ring.t | 204 +++++++++++++++++
.../ai-proxy-multi-chash-healthcheck-stable-ring.t | 252 +++++++++++++++++++++
5 files changed, 705 insertions(+), 52 deletions(-)
diff --git a/apisix/balancer.lua b/apisix/balancer.lua
index 5b6b3b029..7a358dadf 100644
--- a/apisix/balancer.lua
+++ b/apisix/balancer.lua
@@ -34,6 +34,9 @@ local pickers = {}
local lrucache_server_picker = core.lrucache.new({
ttl = 300, count = 256
})
+local lrucache_health_status = core.lrucache.new({
+ ttl = 300, count = 256
+})
local lrucache_addr = core.lrucache.new({
ttl = 300, count = 1024 * 4
})
@@ -60,33 +63,63 @@ local function transform_node(new_nodes, node)
end
-local function fetch_health_nodes(upstream, checker)
+local function fetch_all_nodes(upstream)
local nodes = upstream.nodes
- if not checker then
- local new_nodes = core.table.new(0, #nodes)
- for _, node in ipairs(nodes) do
- new_nodes = transform_node(new_nodes, node)
- end
- return new_nodes
+ local new_nodes = core.table.new(0, #nodes)
+ for _, node in ipairs(nodes) do
+ new_nodes = transform_node(new_nodes, node)
end
+ return new_nodes
+end
+
+local function create_health_status(upstream, checker)
+ local nodes = upstream.nodes
local host = upstream.checks and upstream.checks.active and
upstream.checks.active.host
local port = upstream.checks and upstream.checks.active and
upstream.checks.active.port
- local up_nodes = core.table.new(0, #nodes)
+ local health_status = core.table.new(0, #nodes)
+ local has_healthy_node = false
+
for _, node in ipairs(nodes) do
local ok, err = healthcheck_manager.fetch_node_status(checker,
node.host, port or node.port,
host)
+ local addr = node.host .. ":" .. node.port
if ok then
- up_nodes = transform_node(up_nodes, node)
- elseif err then
- core.log.warn("failed to get health check target status, addr: ",
- node.host, ":", port or node.port, ", host: ", host, ", err: ", err)
+ health_status[addr] = true
+ has_healthy_node = true
+ else
+ health_status[addr] = false
+ if err then
+ core.log.warn("failed to get health check target status, addr: ",
+ node.host, ":", port or node.port, ", host: ", host, ", err: ", err)
+ end
end
end
- if core.table.nkeys(up_nodes) == 0 then
+ if not has_healthy_node then
core.log.warn("all upstream nodes is unhealthy, use default")
- for _, node in ipairs(nodes) do
+ return {all_unhealthy = true}
+ end
+
+ return {status = health_status}
+end
+
+
+-- Build the picker node set from the healthy subset, reusing create_health_status
+-- so the per-node health lookup lives in exactly one place.
+local function fetch_health_nodes(upstream, checker)
+ if not checker then
+ return fetch_all_nodes(upstream)
+ end
+
+ local health_status = create_health_status(upstream, checker)
+ if health_status.all_unhealthy then
+ return fetch_all_nodes(upstream)
+ end
+
+ local up_nodes = core.table.new(0, #upstream.nodes)
+ for _, node in ipairs(upstream.nodes) do
+ if health_status.status[node.host .. ":" .. node.port] then
up_nodes = transform_node(up_nodes, node)
end
end
@@ -95,6 +128,21 @@ local function fetch_health_nodes(upstream, checker)
end
+local function fetch_health_status(upstream, checker, key, version)
+ if not checker then
+ return nil
+ end
+
+ local health_status = lrucache_health_status(key, version .. "#" .. checker.status_ver,
+ create_health_status, upstream, checker)
+ if not health_status or health_status.all_unhealthy then
+ return nil
+ end
+
+ return health_status.status
+end
+
+
local function create_server_picker(upstream, checker)
local picker = pickers[upstream.type]
if not picker then
@@ -112,7 +160,12 @@ local function create_server_picker(upstream, checker)
end
end
- local up_nodes = fetch_health_nodes(upstream, checker)
+ local up_nodes
+ if upstream.type == "chash" then
+ up_nodes = fetch_all_nodes(upstream)
+ else
+ up_nodes = fetch_health_nodes(upstream, checker)
+ end
if #up_nodes._priority_index > 1 then
core.log.info("upstream nodes: ", core.json.delay_encode(up_nodes))
@@ -229,7 +282,12 @@ local function pick_server(route, ctx)
end
end
- if checker then
+ local health_status
+ if checker and up_conf.type == "chash" then
+ health_status = fetch_health_status(up_conf, checker, key, version)
+ end
+
+ if checker and up_conf.type ~= "chash" then
version = version .. "#" .. checker.status_ver
end
@@ -243,10 +301,29 @@ local function pick_server(route, ctx)
return nil, "failed to fetch server picker"
end
- local server, err = server_picker.get(ctx)
+ local server, err
+ for _ = 1, nodes_count do
+ server, err = server_picker.get(ctx)
+ if not server then
+ err = err or "no valid upstream node"
+ return nil, "failed to find valid upstream server, " .. err
+ end
+
+ if not health_status or health_status[server] then
+ break
+ end
+
+ ctx.balancer_server = server
+ if not server_picker.after_balance then
+ return nil, "failed to skip unhealthy upstream server: after_balance is unavailable"
+ end
+
+ server_picker.after_balance(ctx, true)
+ server = nil
+ end
+
if not server then
- err = err or "no valid upstream node"
- return nil, "failed to find valid upstream server, " .. err
+ return nil, "failed to find valid upstream server, all upstream servers tried"
end
ctx.balancer_server = server
diff --git a/apisix/healthcheck_manager.lua b/apisix/healthcheck_manager.lua
index 8133364ee..1ee49f64d 100644
--- a/apisix/healthcheck_manager.lua
+++ b/apisix/healthcheck_manager.lua
@@ -113,7 +113,20 @@ function _M.fetch_node_status(checker, ip, port, hostname)
return true
end
- return checker:get_target_status(ip, port, hostname)
+ local ok, err = checker:get_target_status(ip, port, hostname)
+ if err == "target not found" then
+ -- get_target_status reads a worker-local cache that resty.healthcheck fills
+ -- asynchronously (add_target only raises an event), so right after a checker
+ -- is created a target can be missing from this worker's view even though it
+ -- is registered in the shm and being probed. Treat it as unknown (usable)
+ -- rather than unhealthy, but still log it: a target that stays missing means
+ -- the cache never converged, a real bug worth surfacing rather than swallowing.
+ core.log.warn("health check target status not available yet, treat as unknown",
+ ", addr: ", ip, ":", port, ", host: ", hostname)
+ return true
+ end
+
+ return ok, err
end
diff --git a/apisix/plugins/ai-proxy-multi.lua b/apisix/plugins/ai-proxy-multi.lua
index 13a4b8e3e..4ba204ecc 100644
--- a/apisix/plugins/ai-proxy-multi.lua
+++ b/apisix/plugins/ai-proxy-multi.lua
@@ -45,6 +45,9 @@ local pickers = {}
local lrucache_server_picker = core.lrucache.new({
ttl = 300, count = 256
})
+local lrucache_health_status = core.lrucache.new({
+ ttl = 300, count = 256
+})
local plugin_name = "ai-proxy-multi"
local _M = {
@@ -346,26 +349,33 @@ local function get_checkers_status_ver(conf, checkers)
end
-local function fetch_health_instances(conf, checkers)
+local function fetch_all_instances(conf)
local instances = conf.instances
local new_instances = core.table.new(0, #instances)
- if not checkers then
- for _, ins in ipairs(conf.instances) do
- transform_instances(new_instances, ins)
- end
- return new_instances
+ for _, ins in ipairs(instances) do
+ transform_instances(new_instances, ins)
end
+ return new_instances
+end
+
+
+local function create_health_status(conf, checkers)
+ local instances = conf.instances
+ local health_status = core.table.new(0, #instances)
+ local healthy_dns_nodes = core.table.new(0, #instances)
+ local has_healthy_instance = false
+
for _, ins in ipairs(instances) do
local checker = checkers[ins.name]
if checker then
local host = ins.checks and ins.checks.active and
ins.checks.active.host
local port = ins.checks and ins.checks.active and
ins.checks.active.port
local healthy_nodes = {}
- ins._healthy_dns_nodes = nil
for _, node in ipairs(ins._dns_nodes or {}) do
- local ok, err = checker:get_target_status(node.host, port or node.port, host)
+ local ok, err = healthcheck_manager.fetch_node_status(checker,
+ node.host, port or node.port, host)
if ok then
healthy_nodes[#healthy_nodes + 1] = node
elseif err then
@@ -375,18 +385,63 @@ local function fetch_health_instances(conf, checkers)
end
if #healthy_nodes > 0 then
- ins._healthy_dns_nodes = healthy_nodes
- transform_instances(new_instances, ins)
+ healthy_dns_nodes[ins.name] = healthy_nodes
+ health_status[ins.name] = true
+ has_healthy_instance = true
+ else
+ health_status[ins.name] = false
end
else
- ins._healthy_dns_nodes = nil
- transform_instances(new_instances, ins)
+ health_status[ins.name] = true
+ has_healthy_instance = true
end
end
- if core.table.nkeys(new_instances) == 0 then
+ if not has_healthy_instance then
core.log.warn("all upstream nodes is unhealthy, use default")
- for _, ins in ipairs(instances) do
+ return {all_unhealthy = true}
+ end
+
+ return {
+ status = health_status,
+ healthy_dns_nodes = healthy_dns_nodes,
+ }
+end
+
+
+local function apply_health_status(conf, health_status)
+ if not health_status or health_status.all_unhealthy then
+ for _, ins in ipairs(conf.instances) do
+ ins._healthy_dns_nodes = nil
+ end
+
+ return nil
+ end
+
+ for _, ins in ipairs(conf.instances) do
+ ins._healthy_dns_nodes = health_status.healthy_dns_nodes[ins.name]
+ end
+
+ return health_status.status
+end
+
+
+-- Build the picker instance set from the healthy subset, reusing
+-- create_health_status/apply_health_status so the per-instance health lookup
+-- lives in exactly one place.
+local function fetch_health_instances(conf, checkers)
+ if not checkers then
+ return fetch_all_instances(conf)
+ end
+
+ local status = apply_health_status(conf, create_health_status(conf, checkers))
+ if not status then
+ return fetch_all_instances(conf)
+ end
+
+ local new_instances = core.table.new(0, #conf.instances)
+ for _, ins in ipairs(conf.instances) do
+ if status[ins.name] then
transform_instances(new_instances, ins)
end
end
@@ -395,6 +450,29 @@ local function fetch_health_instances(conf, checkers)
end
+local function get_health_status_ver(conf, checkers)
+ local parts = core.table.new(#conf.instances, 0)
+ for i, ins in ipairs(conf.instances) do
+ local checker = checkers[ins.name]
+ parts[i] = (ins._nodes_ver or 0) .. ":" .. (checker and checker.status_ver or "x")
+ end
+
+ return table_concat(parts, "-")
+end
+
+
+local function fetch_health_status(conf, checkers, key, version)
+ if not checkers then
+ return nil
+ end
+
+ local health_status = lrucache_health_status(key, version .. "#" ..
+ get_health_status_ver(conf, checkers),
+ create_health_status, conf, checkers)
+ return apply_health_status(conf, health_status)
+end
+
+
local function create_server_picker(conf, ups_tab, checkers)
local picker = pickers[conf.balancer.algorithm] -- nil check
if not picker then
@@ -402,7 +480,12 @@ local function create_server_picker(conf, ups_tab, checkers)
picker = pickers[conf.balancer.algorithm]
end
- local new_instances = fetch_health_instances(conf, checkers)
+ local new_instances
+ if conf.balancer.algorithm == "chash" then
+ new_instances = fetch_all_instances(conf)
+ else
+ new_instances = fetch_health_instances(conf, checkers)
+ end
core.log.info("fetch health instances: ",
core.json.delay_encode(new_instances))
if #new_instances._priority_index > 1 then
@@ -448,8 +531,13 @@ local function pick_target(ctx, conf, ups_tab)
end
end
- local version = plugin.conf_version(conf) .. "#" ..
- get_checkers_status_ver(conf, checkers)
+ local health_status
+ local version = plugin.conf_version(conf)
+ if conf.balancer.algorithm == "chash" then
+ health_status = fetch_health_status(conf, checkers, ctx.matched_route.key, version)
+ else
+ version = version .. "#" .. get_checkers_status_ver(conf, checkers)
+ end
local server_picker = ctx.server_picker
if not server_picker then
@@ -461,29 +549,48 @@ local function pick_target(ctx, conf, ups_tab)
end
ctx.server_picker = server_picker
- local instance_name, err = server_picker.get(ctx)
- if err then
- return nil, nil, err
+ local ai_rate_limiting
+ local check_rate_limiting = conf.fallback_strategy == "instance_health_and_rate_limiting" or
+ fallback_strategy_has(conf.fallback_strategy, "rate_limiting")
+ if check_rate_limiting then
+ ai_rate_limiting = require("apisix.plugins.ai-rate-limiting")
end
- ctx.balancer_server = instance_name
- if conf.fallback_strategy == "instance_health_and_rate_limiting" or -- for backwards compatible
- fallback_strategy_has(conf.fallback_strategy, "rate_limiting") then
- local ai_rate_limiting = require("apisix.plugins.ai-rate-limiting")
- for _ = 1, #conf.instances do
- if ai_rate_limiting.check_instance_status(nil, ctx, instance_name) then
+
+ local instance_name, err
+ for _ = 1, #conf.instances do
+ instance_name, err = server_picker.get(ctx)
+ if err then
+ return nil, nil, err
+ end
+
+ if not health_status or health_status[instance_name] then
+ if not check_rate_limiting or
+ ai_rate_limiting.check_instance_status(nil, ctx, instance_name) then
break
end
core.log.warn("ai instance: ", instance_name,
" is not available, try to pick another one")
- server_picker.after_balance(ctx, true)
- instance_name, err = server_picker.get(ctx)
- if err then
- return nil, nil, err
- end
- ctx.balancer_server = instance_name
+
+ else
+ core.log.warn("ai instance: ", instance_name,
+ " is unhealthy, try to pick another one")
end
+
+ ctx.balancer_server = instance_name
+ if not server_picker.after_balance then
+ return nil, nil, "failed to skip AI instance: after_balance is unavailable"
+ end
+
+ server_picker.after_balance(ctx, true)
+ instance_name = nil
end
+ if not instance_name then
+ return nil, nil, "all servers tried"
+ end
+
+ ctx.balancer_server = instance_name
+
local instance_conf = get_instance_conf(conf.instances, instance_name)
local nodes = instance_conf._healthy_dns_nodes or instance_conf._dns_nodes
use_node_for_request(instance_conf, pick_request_node(nodes))
diff --git a/t/node/chash-healthcheck-stable-ring.t b/t/node/chash-healthcheck-stable-ring.t
new file mode 100644
index 000000000..49eca4301
--- /dev/null
+++ b/t/node/chash-healthcheck-stable-ring.t
@@ -0,0 +1,204 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('warn');
+no_root_location();
+no_shuffle();
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ my $http_config = <<_EOC_;
+ server {
+ listen 127.0.0.1:16730;
+
+ location /server_port {
+ content_by_lua_block {
+ ngx.print("16730")
+ }
+ }
+
+ location /status {
+ return 500;
+ }
+ }
+
+ server {
+ listen 127.0.0.1:16731;
+
+ location /server_port {
+ content_by_lua_block {
+ ngx.print("16731")
+ }
+ }
+
+ location /status {
+ return 200;
+ }
+ }
+
+ server {
+ listen 127.0.0.1:16732;
+
+ location /server_port {
+ content_by_lua_block {
+ ngx.print("16732")
+ }
+ }
+
+ location /status {
+ return 200;
+ }
+ }
+_EOC_
+ $block->set_value("http_config", $http_config);
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: chash keeps healthy node mapping stable when health status changes
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local json = require("cjson.safe")
+ local http = require("resty.http")
+
+ local unhealthy_port = "16730"
+ local nodes = {
+ ["127.0.0.1:" .. unhealthy_port] = 3,
+ ["127.0.0.1:16731"] = 6,
+ ["127.0.0.1:16732"] = 10,
+ }
+
+ local function put_route(with_checks)
+ local upstream = {
+ type = "chash",
+ hash_on = "header",
+ key = "X-Sessionid",
+ nodes = nodes,
+ }
+
+ if with_checks then
+ upstream.checks = {
+ active = {
+ http_path = "/status",
+ healthy = {
+ interval = 1,
+ http_statuses = {200},
+ successes = 1,
+ },
+ unhealthy = {
+ interval = 1,
+ http_statuses = {500},
+ http_failures = 1,
+ tcp_failures = 1,
+ timeouts = 1,
+ },
+ },
+ }
+ end
+
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ json.encode({
+ uri = "/server_port",
+ upstream = upstream,
+ })
+ )
+ assert(code < 300, body)
+
+ return upstream
+ end
+
+ local function send(session_id)
+ local httpc = http.new()
+ local res, err = httpc:request_uri(
+ "http://127.0.0.1:" .. ngx.var.server_port .. "/server_port",
+ {
+ method = "GET",
+ keepalive = false,
+ headers = {
+ ["X-Sessionid"] = session_id,
+ },
+ }
+ )
+ assert(res, err)
+ assert(res.status == 200, res.status .. ": " .. res.body)
+ return res.body
+ end
+
+ put_route(false)
+
+ local baseline = {}
+ local baseline_unhealthy = 0
+ local healthy_total = 0
+ local total = 120
+ for i = 1, total do
+ local key = "session-" .. i
+ local port = send(key)
+ baseline[key] = port
+ if port == unhealthy_port then
+ baseline_unhealthy = baseline_unhealthy + 1
+ else
+ healthy_total = healthy_total + 1
+ end
+ end
+ assert(baseline_unhealthy > 0, "baseline did not hit unhealthy node")
+ assert(healthy_total > 0, "baseline did not hit healthy nodes")
+
+ put_route(true)
+ send("warmup")
+ ngx.sleep(3)
+
+ local healthy_moved = 0
+ local unhealthy_stayed = 0
+ for i = 1, total do
+ local key = "session-" .. i
+ local before = baseline[key]
+ local after = send(key)
+ if before == unhealthy_port then
+ if after == unhealthy_port then
+ unhealthy_stayed = unhealthy_stayed + 1
+ end
+ elseif after ~= before then
+ healthy_moved = healthy_moved + 1
+ end
+ end
+
+ assert(healthy_moved == 0,
+ "healthy-node keys moved after health change: " .. healthy_moved)
+ assert(unhealthy_stayed == 0,
+ "unhealthy-node keys still routed to unhealthy node: " .. unhealthy_stayed)
+
+ ngx.say("baseline_unhealthy=", baseline_unhealthy,
+ ", healthy_total=", healthy_total,
+ ", healthy_moved=", healthy_moved)
+ }
+ }
+--- request
+GET /t
+--- timeout: 30
+--- response_body eval
+qr/baseline_unhealthy=\d+, healthy_total=\d+, healthy_moved=0/
+--- no_error_log
+[error]
diff --git a/t/plugin/ai-proxy-multi-chash-healthcheck-stable-ring.t b/t/plugin/ai-proxy-multi-chash-healthcheck-stable-ring.t
new file mode 100644
index 000000000..147fda63b
--- /dev/null
+++ b/t/plugin/ai-proxy-multi-chash-healthcheck-stable-ring.t
@@ -0,0 +1,252 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('warn');
+no_long_string();
+no_shuffle();
+no_root_location();
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ if (!defined $block->request) {
+ $block->set_value("request", "GET /t");
+ }
+
+ my $user_yaml_config = <<_EOC_;
+plugins:
+ - ai-proxy-multi
+_EOC_
+ $block->set_value("extra_yaml_config", $user_yaml_config);
+
+ my $http_config = <<_EOC_;
+ server {
+ server_name gpu-a;
+ listen 127.0.0.1:16724;
+ default_type 'application/json';
+
+ location /v1/chat/completions {
+ content_by_lua_block {
+ ngx.say([[{"choices":[{"message":{"content":"gpu-a","role":"assistant"}}]}]])
+ }
+ }
+
+ location /status {
+ content_by_lua_block {
+ ngx.status = 500
+ ngx.say("fail")
+ }
+ }
+ }
+
+ server {
+ server_name gpu-b;
+ listen 127.0.0.1:16725;
+ default_type 'application/json';
+
+ location /v1/chat/completions {
+ content_by_lua_block {
+ ngx.say([[{"choices":[{"message":{"content":"gpu-b","role":"assistant"}}]}]])
+ }
+ }
+
+ location /status {
+ content_by_lua_block {
+ ngx.say("ok")
+ }
+ }
+ }
+
+ server {
+ server_name gpu-c;
+ listen 127.0.0.1:16726;
+ default_type 'application/json';
+
+ location /v1/chat/completions {
+ content_by_lua_block {
+ ngx.say([[{"choices":[{"message":{"content":"gpu-c","role":"assistant"}}]}]])
+ }
+ }
+
+ location /status {
+ content_by_lua_block {
+ ngx.say("ok")
+ }
+ }
+ }
+_EOC_
+ $block->set_value("http_config", $http_config);
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: ai-proxy-multi chash keeps healthy instance mapping stable when health status changes
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local json = require("cjson.safe")
+ local http = require("resty.http")
+
+ local checks = {
+ active = {
+ type = "http",
+ timeout = 1,
+ http_path = "/status",
+ healthy = {
+ interval = 1,
+ http_statuses = {200},
+ successes = 1,
+ },
+ unhealthy = {
+ interval = 1,
+ http_statuses = {500},
+ http_failures = 1,
+ tcp_failures = 1,
+ timeouts = 1,
+ },
+ },
+ }
+
+ local function instance(name, port, weight, with_checks)
+ local ins = {
+ name = name,
+ provider = "openai-compatible",
+ weight = weight,
+ auth = {
+ header = {
+ Authorization = "Bearer token",
+ },
+ },
+ options = {
+ model = name,
+ },
+ override = {
+ endpoint = "http://127.0.0.1:" .. port .. "/v1/chat/completions",
+ },
+ }
+ if with_checks then
+ ins.checks = checks
+ end
+ return ins
+ end
+
+ local function put_route(with_checks)
+ local route = {
+ uri = "/ai",
+ plugins = {
+ ["ai-proxy-multi"] = {
+ balancer = {
+ algorithm = "chash",
+ hash_on = "header",
+ key = "X-Sessionid",
+ },
+ instances = {
+ instance("gpu-a", 16724, 3, with_checks),
+ instance("gpu-b", 16725, 6, with_checks),
+ instance("gpu-c", 16726, 10, with_checks),
+ },
+ ssl_verify = false,
+ },
+ },
+ }
+
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ json.encode(route)
+ )
+ assert(code < 300, body)
+ end
+
+ local function send(session_id)
+ local httpc = http.new()
+ local res, err = httpc:request_uri(
+ "http://127.0.0.1:" .. ngx.var.server_port .. "/ai",
+ {
+ method = "POST",
+ body = json.encode({messages = {{role = "user", content = "hi"}}}),
+ keepalive = false,
+ headers = {
+ ["Content-Type"] = "application/json",
+ ["X-Sessionid"] = session_id,
+ },
+ }
+ )
+ assert(res, err)
+ assert(res.status == 200, res.status .. ": " .. res.body)
+ local body = assert(json.decode(res.body))
+ return body.choices[1].message.content
+ end
+
+ put_route(false)
+
+ local baseline = {}
+ local baseline_a = 0
+ local healthy_total = 0
+ local total = 120
+ for i = 1, total do
+ local key = "session-" .. i
+ local name = send(key)
+ baseline[key] = name
+ if name == "gpu-a" then
+ baseline_a = baseline_a + 1
+ else
+ healthy_total = healthy_total + 1
+ end
+ end
+ assert(baseline_a > 0, "baseline did not hit gpu-a")
+ assert(healthy_total > 0, "baseline did not hit healthy instances")
+
+ put_route(true)
+ send("warmup")
+ ngx.sleep(3)
+
+ local healthy_moved = 0
+ local unhealthy_stayed = 0
+ for i = 1, total do
+ local key = "session-" .. i
+ local before = baseline[key]
+ local after = send(key)
+ if before == "gpu-a" then
+ if after == "gpu-a" then
+ unhealthy_stayed = unhealthy_stayed + 1
+ end
+ elseif after ~= before then
+ healthy_moved = healthy_moved + 1
+ end
+ end
+
+ assert(healthy_moved == 0,
+ "healthy-instance keys moved after health change: " .. healthy_moved)
+ assert(unhealthy_stayed == 0,
+ "unhealthy-instance keys still routed to gpu-a: " .. unhealthy_stayed)
+
+ ngx.say("baseline_a=", baseline_a,
+ ", healthy_total=", healthy_total,
+ ", healthy_moved=", healthy_moved)
+ }
+ }
+--- timeout: 30
+--- response_body eval
+qr/baseline_a=\d+, healthy_total=\d+, healthy_moved=0/
+--- no_error_log
+[error]