This is an automated email from the ASF dual-hosted git repository.

baoyuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 2500db7a4 feat: allow fetching stream healthcheck data through control api (#12996)
2500db7a4 is described below

commit 2500db7a4ae0c047606e8233821f92d318fb589e
Author: Shreemaan Abhishek <[email protected]>
AuthorDate: Fri Feb 13 13:00:45 2026 +0545

    feat: allow fetching stream healthcheck data through control api (#12996)
---
 apisix/cli/config.lua                   |   3 +-
 apisix/cli/ngx_tpl.lua                  |   6 +-
 apisix/control/v1.lua                   |   5 ++
 apisix/healthcheck_manager.lua          |   4 -
 apisix/router.lua                       |   8 ++
 conf/config.yaml.example                |   3 +-
 t/APISIX.pm                             |   3 +-
 t/admin/stream-routes.t                 |   3 +
 t/cli/test_main.sh                      |   4 +-
 t/stream-node/control-api-healthcheck.t | 151 ++++++++++++++++++++++++++++++++
 10 files changed, 174 insertions(+), 16 deletions(-)

diff --git a/apisix/cli/config.lua b/apisix/cli/config.lua
index ee6a68825..956eef30c 100644
--- a/apisix/cli/config.lua
+++ b/apisix/cli/config.lua
@@ -105,6 +105,7 @@ local _M = {
         ["prometheus-cache"] = "10m",
         ["standalone-config"] = "10m",
         ["status-report"] = "1m",
+        ["upstream-healthcheck"] = "10m",
       }
     },
     stream = {
@@ -120,7 +121,6 @@ local _M = {
         ["plugin-limit-conn-stream"] = "10m",
         ["worker-events-stream"] = "10m",
         ["tars-stream"] = "1m",
-        ["upstream-healthcheck-stream"] = "10m",
       }
     },
     main_configuration_snippet = "",
@@ -162,7 +162,6 @@ local _M = {
         ["plugin-limit-count"] = "10m",
         ["prometheus-metrics"] = "10m",
         ["plugin-limit-conn"] = "10m",
-        ["upstream-healthcheck"] = "10m",
         ["worker-events"] = "10m",
         ["lrucache-lock"] = "10m",
         ["balancer-ewma"] = "10m",
diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua
index 15221e393..049282ad0 100644
--- a/apisix/cli/ngx_tpl.lua
+++ b/apisix/cli/ngx_tpl.lua
@@ -78,6 +78,7 @@ lua {
     lua_shared_dict status-report {* meta.lua_shared_dict["status-report"] *};
     {% end %}
     lua_shared_dict nacos 10m;
+    lua_shared_dict upstream-healthcheck {* meta.lua_shared_dict["upstream-healthcheck"] *};
 }
 
 {% if enabled_stream_plugins["prometheus"] and not enable_http then %}
@@ -149,10 +150,6 @@ stream {
     lua_shared_dict etcd-cluster-health-check-stream {* stream.lua_shared_dict["etcd-cluster-health-check-stream"] *};
     lua_shared_dict worker-events-stream {* stream.lua_shared_dict["worker-events-stream"] *};
 
-    {% if stream.lua_shared_dict["upstream-healthcheck-stream"] then %}
-    lua_shared_dict upstream-healthcheck-stream {* stream.lua_shared_dict["upstream-healthcheck-stream"] *};
-    {% end %}
-
     {% if enabled_discoveries["tars"] then %}
     lua_shared_dict tars-stream {* stream.lua_shared_dict["tars-stream"] *};
     {% end %}
@@ -283,7 +280,6 @@ http {
     {% end %}
 
     lua_shared_dict internal-status {* http.lua_shared_dict["internal-status"] *};
-    lua_shared_dict upstream-healthcheck {* http.lua_shared_dict["upstream-healthcheck"] *};
     lua_shared_dict worker-events {* http.lua_shared_dict["worker-events"] *};
     lua_shared_dict lrucache-lock {* http.lua_shared_dict["lrucache-lock"] *};
     lua_shared_dict balancer-ewma {* http.lua_shared_dict["balancer-ewma"] *};
diff --git a/apisix/control/v1.lua b/apisix/control/v1.lua
index f457eac0d..496b8b57d 100644
--- a/apisix/control/v1.lua
+++ b/apisix/control/v1.lua
@@ -18,6 +18,7 @@ local require = require
 local core = require("apisix.core")
 local plugin = require("apisix.plugin")
 local get_routes = require("apisix.router").http_routes
+local get_stream_routes = require("apisix.router").stream_routes
 local get_services = require("apisix.http.service").services
 local upstream_mod = require("apisix.upstream")
 local healthcheck_manager = require("apisix.healthcheck_manager")
@@ -179,6 +180,8 @@ local function _get_health_checkers()
     local infos = {}
     local routes = get_routes()
     iter_and_add_healthcheck_info(infos, routes)
+    local stream_routes = get_stream_routes()
+    iter_and_add_healthcheck_info(infos, stream_routes)
     local services = get_services()
     iter_and_add_healthcheck_info(infos, services)
     local upstreams = get_upstreams()
@@ -240,6 +243,8 @@ function _M.get_health_checker()
         values = get_services()
     elseif src_type == "upstreams" then
         values = get_upstreams()
+    elseif src_type == "stream_routes" then
+        values = get_stream_routes()
     else
         return 400, {error_msg = str_format("invalid src type %s", src_type)}
     end
diff --git a/apisix/healthcheck_manager.lua b/apisix/healthcheck_manager.lua
index bc27bdb8e..8133364ee 100644
--- a/apisix/healthcheck_manager.lua
+++ b/apisix/healthcheck_manager.lua
@@ -36,10 +36,6 @@ local waiting_pool = {}      -- resource_path -> resource_ver
 
 local DELAYED_CLEAR_TIMEOUT = 10
 local healthcheck_shdict_name = "upstream-healthcheck"
-local is_http = ngx.config.subsystem == "http"
-if not is_http then
-    healthcheck_shdict_name = healthcheck_shdict_name .. "-" .. ngx.config.subsystem
-end
 
 
 local function get_healthchecker_name(value)
diff --git a/apisix/router.lua b/apisix/router.lua
index 82270d1dd..6cdd07175 100644
--- a/apisix/router.lua
+++ b/apisix/router.lua
@@ -87,6 +87,14 @@ function _M.http_init_worker()
     router_ssl.init_worker()
     _M.router_ssl = router_ssl
 
+    -- Initialize stream router in HTTP workers only if stream mode is enabled
+    -- This allows the Control API (which runs in HTTP workers) to access stream routes
+    if conf and conf.apisix and conf.apisix.stream_proxy then
+        local router_stream = require("apisix.stream.router.ip_port")
+        router_stream.stream_init_worker(filter)
+        _M.router_stream = router_stream
+    end
+
     _M.api = require("apisix.api_router")
 end
 
diff --git a/conf/config.yaml.example b/conf/config.yaml.example
index 4d78ae5af..a32499710 100644
--- a/conf/config.yaml.example
+++ b/conf/config.yaml.example
@@ -189,6 +189,7 @@ nginx_config:                     # Config for render the template to generate n
                                   # Please resize when the `error.log` prompts that the data is full.
                                   # NOTE: Restart APISIX to take effect.
       standalone-config: 10m
+      upstream-healthcheck: 10m
 
   stream:
     enable_access_log: false                 # Enable stream proxy access logging.
@@ -202,7 +203,6 @@ nginx_config:                     # Config for render the template to generate n
       plugin-limit-conn-stream: 10m
       worker-events-stream: 10m
       tars-stream: 1m
-      upstream-healthcheck-stream: 10m
 
   # Add other custom Nginx configurations.
   # Users are responsible for validating the custom configurations
@@ -283,7 +283,6 @@ nginx_config:                     # Config for render the template to generate n
       plugin-limit-count: 10m
       prometheus-metrics: 10m     # In production, less than 50m is recommended
       plugin-limit-conn: 10m
-      upstream-healthcheck: 10m
       worker-events: 10m
       lrucache-lock: 10m
       balancer-ewma: 10m
diff --git a/t/APISIX.pm b/t/APISIX.pm
index a99dc30e8..5aa54a1af 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -294,6 +294,7 @@ lua {
     lua_shared_dict standalone-config 10m;
     lua_shared_dict status-report 1m;
     lua_shared_dict nacos 10m;
+    lua_shared_dict upstream-healthcheck 10m;
 }
 _EOC_
     }
@@ -416,7 +417,6 @@ _EOC_
     lua_shared_dict plugin-limit-conn-stream 10m;
     lua_shared_dict etcd-cluster-health-check-stream 10m;
     lua_shared_dict worker-events-stream 10m;
-    lua_shared_dict upstream-healthcheck-stream 10m;
 
     lua_shared_dict kubernetes-stream 1m;
     lua_shared_dict kubernetes-first-stream 1m;
@@ -592,7 +592,6 @@ _EOC_
     lua_shared_dict plugin-ai-rate-limiting 10m;
     lua_shared_dict plugin-ai-rate-limiting-reset-header 10m;
     lua_shared_dict internal-status 10m;
-    lua_shared_dict upstream-healthcheck 32m;
     lua_shared_dict worker-events 10m;
     lua_shared_dict lrucache-lock 10m;
     lua_shared_dict balancer-ewma 1m;
diff --git a/t/admin/stream-routes.t b/t/admin/stream-routes.t
index cc3f27ecb..7ce694ccd 100644
--- a/t/admin/stream-routes.t
+++ b/t/admin/stream-routes.t
@@ -599,6 +599,9 @@ xrpc:
                     ngx.say(body)
                 end
             end
+
+            -- Clean up the stream route to avoid interfering with subsequent tests
+            local code, body = t('/apisix/admin/stream_routes/1', ngx.HTTP_DELETE)
         }
     }
 --- request
diff --git a/t/cli/test_main.sh b/t/cli/test_main.sh
index 62c128c94..c4c3539c3 100755
--- a/t/cli/test_main.sh
+++ b/t/cli/test_main.sh
@@ -884,6 +884,9 @@ git checkout conf/config.yaml
 
 echo '
 nginx_config:
+  meta:
+    lua_shared_dict:
+      upstream-healthcheck: 20m
   http:
     lua_shared_dict:
       internal-status: 20m
@@ -891,7 +894,6 @@ nginx_config:
       plugin-limit-count: 20m
       prometheus-metrics: 20m
       plugin-limit-conn: 20m
-      upstream-healthcheck: 20m
       worker-events: 20m
       lrucache-lock: 20m
       balancer-ewma: 20m
diff --git a/t/stream-node/control-api-healthcheck.t b/t/stream-node/control-api-healthcheck.t
new file mode 100644
index 000000000..b67c5fb42
--- /dev/null
+++ b/t/stream-node/control-api-healthcheck.t
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('info');
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!$block->error_log && !$block->no_error_log) {
+        $block->set_value("no_error_log", "[error]\n[alert]");
+    }
+
+    my $config = ($block->config // "") . <<_EOC_;
+    location /hit {
+        content_by_lua_block {
+
+            local sock = ngx.socket.tcp()
+            local ok, err = sock:connect("127.0.0.1", 1985)
+            if not ok then
+                ngx.log(ngx.ERR, "failed to connect: ", err)
+                return ngx.exit(503)
+            end
+
+            local bytes, err = sock:send("mmm")
+            if not bytes then
+                ngx.log(ngx.ERR, "send stream request error: ", err)
+                return ngx.exit(503)
+            end
+
+            local data, err = sock:receive("*a")
+            if not data then
+                sock:close()
+                return ngx.exit(503)
+            end
+            ngx.print(data)
+        }
+    }
+
+_EOC_
+
+    $block->set_value("config", $config);
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: set stream route(id: 1)
+--- stream_enable
+--- config
+    location /test {
+        content_by_lua_block {
+            local core = require("apisix.core")
+            local dkjson = require("dkjson")
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "remote_addr": "127.0.0.1",
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1995": 1
+                        },
+                        "type": "roundrobin",
+                        "checks": {
+                            "active": {
+                                "timeout": 40,
+                                "type": "tcp",
+                                "unhealthy": {
+                                    "interval": 60,
+                                    "failures": 2
+                                },
+                                "healthy": {
+                                    "interval": 60,
+                                    "successes": 2
+                                },
+                                "concurrency": 2
+                            }
+                        },
+                        "retries": 3,
+                        "timeout": {
+                            "read": 40,
+                            "send": 40,
+                            "connect": 40
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                return
+            end
+
+            local stream = t("/hit", ngx.HTTP_GET)
+            if stream >= 300 then
+                ngx.status = stream
+                return
+            end
+
+            ngx.sleep(3)
+            local healthcheck, _, body = t("/v1/healthcheck", ngx.HTTP_GET)
+            if healthcheck >= 300 then
+                ngx.status = healthcheck
+                return
+            end
+
+            local healthcheck_data, err = core.json.decode(body)
+            if not healthcheck_data then
+                ngx.log(ngx.ERR, "failed to decode healthcheck data: ", err)
+                return ngx.exit(503)
+            end
+            ngx.say(dkjson.encode(healthcheck_data))
+
+            -- healthcheck of stream route
+            local healthcheck, _, body = t("/v1/healthcheck/stream_routes/1", ngx.HTTP_GET)
+            if healthcheck >= 300 then
+                ngx.status = healthcheck
+                return
+            end
+
+            local healthcheck_data, err = core.json.decode(body)
+            if not healthcheck_data then
+                ngx.log(ngx.ERR, "failed to decode healthcheck data: ", err)
+                return ngx.exit(503)
+            end
+            ngx.say(dkjson.encode(healthcheck_data))
+        }
+    }
+--- timeout: 5
+--- request
+GET /test
+--- response_body
+[{"name":"/apisix/stream_routes/1","nodes":[{"counter":{"http_failure":0,"success":0,"tcp_failure":0,"timeout_failure":0},"hostname":"127.0.0.1","ip":"127.0.0.1","port":1995,"status":"healthy"}],"type":"tcp"}]
+{"name":"/apisix/stream_routes/1","nodes":[{"counter":{"http_failure":0,"success":0,"tcp_failure":0,"timeout_failure":0},"hostname":"127.0.0.1","ip":"127.0.0.1","port":1995,"status":"healthy"}],"type":"tcp"}

Reply via email to