This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 9c899b876 feat: stream subsystem support kubernetes service discovery
(#8640)
9c899b876 is described below
commit 9c899b87648039e2c6c73b087d627aea2df90f55
Author: Ashing Zheng <[email protected]>
AuthorDate: Sat Jan 28 08:58:39 2023 +0800
feat: stream subsystem support kubernetes service discovery (#8640)
Fixes https://github.com/apache/apisix/issues/7779
---
apisix/cli/ngx_tpl.lua | 7 +
apisix/discovery/kubernetes/init.lua | 29 ++-
docs/en/latest/discovery/kubernetes.md | 6 +
docs/zh/latest/discovery/kubernetes.md | 6 +
t/APISIX.pm | 4 +
t/kubernetes/discovery/stream/kubernetes.t | 348 +++++++++++++++++++++++++++++
6 files changed, 396 insertions(+), 4 deletions(-)
diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua
index 95ac3b763..7083d4201 100644
--- a/apisix/cli/ngx_tpl.lua
+++ b/apisix/cli/ngx_tpl.lua
@@ -145,6 +145,13 @@ stream {
lua_shared_dict plugin-limit-conn-stream {*
stream.lua_shared_dict["plugin-limit-conn-stream"] *};
{% end %}
+ # for discovery shared dict
+ {% if discovery_shared_dicts then %}
+ {% for key, size in pairs(discovery_shared_dicts) do %}
+ lua_shared_dict {*key*}-stream {*size*};
+ {% end %}
+ {% end %}
+
resolver {% for _, dns_addr in ipairs(dns_resolver or {}) do %}
{*dns_addr*} {% end %} {% if dns_resolver_valid then %}
valid={*dns_resolver_valid*}{% end %} ipv6={% if enable_ipv6 then %}on{% else
%}off{% end %};
resolver_timeout {*resolver_timeout*};
diff --git a/apisix/discovery/kubernetes/init.lua
b/apisix/discovery/kubernetes/init.lua
index d7258a556..3f5f275d9 100644
--- a/apisix/discovery/kubernetes/init.lua
+++ b/apisix/discovery/kubernetes/init.lua
@@ -25,7 +25,8 @@ local os = os
local error = error
local pcall = pcall
local setmetatable = setmetatable
-local process = require("ngx.process")
+local is_http = ngx.config.subsystem == "http"
+local support_process, process = pcall(require, "ngx.process")
local core = require("apisix.core")
local util = require("apisix.cli.util")
local local_conf = require("apisix.core.config_local").local_conf()
@@ -331,9 +332,24 @@ local function start_fetch(handle)
ngx.timer.at(0, timer_runner)
end
+local function get_endpoint_dict(id)
+ local shm = "kubernetes"
+
+ if id and #id > 0 then
+ shm = shm .. "-" .. id
+ end
+
+ if not is_http then
+ shm = shm .. "-stream"
+ end
+
+ return ngx.shared[shm]
+end
+
local function single_mode_init(conf)
- local endpoint_dict = ngx.shared.kubernetes
+ local endpoint_dict = get_endpoint_dict()
+
if not endpoint_dict then
error("failed to get lua_shared_dict: ngx.shared.kubernetes, " ..
"please check your APISIX version")
@@ -407,7 +423,7 @@ local function multiple_mode_worker_init(confs)
error("duplicate id value")
end
- local endpoint_dict = ngx.shared["kubernetes-" .. id]
+ local endpoint_dict = get_endpoint_dict(id)
if not endpoint_dict then
error(string.format("failed to get lua_shared_dict:
ngx.shared.kubernetes-%s, ", id) ..
"please check your APISIX version")
@@ -433,7 +449,7 @@ local function multiple_mode_init(confs)
error("duplicate id value")
end
- local endpoint_dict = ngx.shared["kubernetes-" .. id]
+ local endpoint_dict = get_endpoint_dict(id)
if not endpoint_dict then
error(string.format("failed to get lua_shared_dict:
ngx.shared.kubernetes-%s, ", id) ..
"please check your APISIX version")
@@ -504,6 +520,11 @@ end
function _M.init_worker()
+ if not support_process then
+ core.log.error("kubernetes discovery not support in subsystem: ",
ngx.config.subsystem,
+ ", please check if your openresty version >= 1.19.9.1
or not")
+ return
+ end
local discovery_conf = local_conf.discovery.kubernetes
core.log.info("kubernetes discovery conf: ",
core.json.delay_encode(discovery_conf))
if #discovery_conf == 0 then
diff --git a/docs/en/latest/discovery/kubernetes.md
b/docs/en/latest/discovery/kubernetes.md
index 04e01f6ca..e80c73851 100644
--- a/docs/en/latest/discovery/kubernetes.md
+++ b/docs/en/latest/discovery/kubernetes.md
@@ -34,6 +34,12 @@ The [_Kubernetes_](https://kubernetes.io/) service discovery
[_List-Watch_](http
Discovery also provides a node query interface in accordance with the [_APISIX
Discovery
Specification_](https://github.com/apache/apisix/blob/master/docs/en/latest/discovery.md).
+:::note
+
+use kubernetes discovery in L4 require OpenResty version >= 1.19.9.1
+
+:::
+
## How To Use
Kubernetes service discovery both support single-cluster and multi-cluster
mode, applicable to the case where the service is distributed in a single or
multiple Kubernetes clusters.
diff --git a/docs/zh/latest/discovery/kubernetes.md
b/docs/zh/latest/discovery/kubernetes.md
index 173428820..5e6d3e040 100644
--- a/docs/zh/latest/discovery/kubernetes.md
+++ b/docs/zh/latest/discovery/kubernetes.md
@@ -34,6 +34,12 @@ Kubernetes 服务发现以
[_List-Watch_](https://kubernetes.io/docs/reference/u
同时遵循 [_APISIX Discovery
规范_](https://github.com/apache/apisix/blob/master/docs/zh/latest/discovery.md)
提供了节点查询接口。
+:::note
+
+在四层中使用 Kubernetes 服务发现要求 OpenResty 版本大于等于 1.19.9.1
+
+:::
+
## Kubernetes 服务发现的使用
目前 Kubernetes 服务发现支持单集群和多集群模式,分别适用于待发现的服务分布在单个或多个 Kubernetes 的场景。
diff --git a/t/APISIX.pm b/t/APISIX.pm
index 7fa11dd10..8e9e72d11 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -393,6 +393,10 @@ _EOC_
lua_shared_dict etcd-cluster-health-check-stream 10m;
lua_shared_dict worker-events-stream 10m;
+ lua_shared_dict kubernetes-stream 1m;
+ lua_shared_dict kubernetes-first-stream 1m;
+ lua_shared_dict kubernetes-second-stream 1m;
+
upstream apisix_backend {
server 127.0.0.1:1900;
balancer_by_lua_block {
diff --git a/t/kubernetes/discovery/stream/kubernetes.t
b/t/kubernetes/discovery/stream/kubernetes.t
new file mode 100644
index 000000000..3df431fb6
--- /dev/null
+++ b/t/kubernetes/discovery/stream/kubernetes.t
@@ -0,0 +1,348 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BEGIN {
+ our $token_file =
"/tmp/var/run/secrets/kubernetes.io/serviceaccount/token";
+ our $token_value = eval {`cat $token_file 2>/dev/null`};
+
+ our $yaml_config = <<_EOC_;
+apisix:
+ node_listen: 1984
+deployment:
+ role: data_plane
+ role_data_plane:
+ config_provider: yaml
+discovery:
+ kubernetes:
+ - id: first
+ service:
+ host: "127.0.0.1"
+ port: "6443"
+ client:
+ token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
+ - id: second
+ service:
+ schema: "http",
+ host: "127.0.0.1",
+ port: "6445"
+ client:
+ token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
+
+_EOC_
+
+}
+
+use t::APISIX;
+
+my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx';
+my $version = eval { `$nginx_binary -V 2>&1` };
+
+if ($version =~ m/\/1.19.3/) {
+ plan(skip_all => "require OpenResty version >= 1.19.9.1");
+} else {
+ plan('no_plan');
+}
+
+repeat_each(1);
+log_level('warn');
+no_root_location();
+no_shuffle();
+workers(4);
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
+routes: []
+#END
+_EOC_
+
+ $block->set_value("apisix_yaml", $apisix_yaml);
+
+ my $main_config = $block->main_config // <<_EOC_;
+env KUBERNETES_SERVICE_HOST=127.0.0.1;
+env KUBERNETES_SERVICE_PORT=6443;
+env KUBERNETES_CLIENT_TOKEN=$::token_value;
+env KUBERNETES_CLIENT_TOKEN_FILE=$::token_file;
+_EOC_
+
+ $block->set_value("main_config", $main_config);
+
+ my $config = $block->config // <<_EOC_;
+ location /operators {
+ content_by_lua_block {
+ local http = require("resty.http")
+ local core = require("apisix.core")
+ local ipairs = ipairs
+
+ ngx.req.read_body()
+ local request_body = ngx.req.get_body_data()
+ local operators = core.json.decode(request_body)
+
+ core.log.info("get body ", request_body)
+ core.log.info("get operators ", #operators)
+ for _, op in ipairs(operators) do
+ local method, path, body
+ local headers = {
+ ["Host"] = "127.0.0.1:6445"
+ }
+
+ if op.op == "replace_subsets" then
+ method = "PATCH"
+ path = "/api/v1/namespaces/" .. op.namespace ..
"/endpoints/" .. op.name
+ if #op.subsets == 0 then
+ body =
'[{"path":"/subsets","op":"replace","value":[]}]'
+ else
+ local t = { { op = "replace", path = "/subsets",
value = op.subsets } }
+ body = core.json.encode(t, true)
+ end
+ headers["Content-Type"] = "application/json-patch+json"
+ end
+
+ if op.op == "replace_labels" then
+ method = "PATCH"
+ path = "/api/v1/namespaces/" .. op.namespace ..
"/endpoints/" .. op.name
+ local t = { { op = "replace", path =
"/metadata/labels", value = op.labels } }
+ body = core.json.encode(t, true)
+ headers["Content-Type"] = "application/json-patch+json"
+ end
+
+ local httpc = http.new()
+ core.log.info("begin to connect ", "127.0.0.1:6445")
+ local ok, message = httpc:connect({
+ scheme = "http",
+ host = "127.0.0.1",
+ port = 6445,
+ })
+ if not ok then
+ core.log.error("connect 127.0.0.1:6445 failed, message
: ", message)
+ ngx.say("FAILED")
+ end
+ local res, err = httpc:request({
+ method = method,
+ path = path,
+ headers = headers,
+ body = body,
+ })
+ if err ~= nil then
+ core.log.err("operator k8s cluster error: ", err)
+ return 500
+ end
+ if res.status ~= 200 and res.status ~= 201 and res.status
~= 409 then
+ return res.status
+ end
+ end
+ ngx.say("DONE")
+ }
+ }
+
+_EOC_
+
+ $block->set_value("config", $config);
+
+ my $stream_config = $block->stream_config // <<_EOC_;
+ server {
+ listen 8125;
+ content_by_lua_block {
+ local core = require("apisix.core")
+ local d = require("apisix.discovery.kubernetes")
+
+ ngx.sleep(1)
+
+ local sock = ngx.req.socket()
+ local request_body = sock:receive()
+
+ core.log.info("get body ", request_body)
+
+ local response_body = "{"
+ local queries = core.json.decode(request_body)
+ for _,query in ipairs(queries) do
+ local nodes = d.nodes(query)
+ if nodes==nil or #nodes==0 then
+ response_body=response_body.." "..0
+ else
+ response_body=response_body.." "..#nodes
+ end
+ end
+ ngx.say(response_body.." }")
+ }
+ }
+
+_EOC_
+
+ $block->set_value("extra_stream_config", $stream_config);
+
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: create namespace and endpoints
+--- yaml_config eval: $::yaml_config
+--- request
+POST /operators
+[
+ {
+ "op": "replace_subsets",
+ "namespace": "ns-a",
+ "name": "ep",
+ "subsets": [
+ {
+ "addresses": [
+ {
+ "ip": "10.0.0.1"
+ },
+ {
+ "ip": "10.0.0.2"
+ }
+ ],
+ "ports": [
+ {
+ "name": "p1",
+ "port": 5001
+ }
+ ]
+ },
+ {
+ "addresses": [
+ {
+ "ip": "20.0.0.1"
+ },
+ {
+ "ip": "20.0.0.2"
+ }
+ ],
+ "ports": [
+ {
+ "name": "p2",
+ "port": 5002
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "op": "create_namespace",
+ "name": "ns-b"
+ },
+ {
+ "op": "replace_subsets",
+ "namespace": "ns-b",
+ "name": "ep",
+ "subsets": [
+ {
+ "addresses": [
+ {
+ "ip": "10.0.0.1"
+ },
+ {
+ "ip": "10.0.0.2"
+ }
+ ],
+ "ports": [
+ {
+ "name": "p1",
+ "port": 5001
+ }
+ ]
+ },
+ {
+ "addresses": [
+ {
+ "ip": "20.0.0.1"
+ },
+ {
+ "ip": "20.0.0.2"
+ }
+ ],
+ "ports": [
+ {
+ "name": "p2",
+ "port": 5002
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "op": "create_namespace",
+ "name": "ns-c"
+ },
+ {
+ "op": "replace_subsets",
+ "namespace": "ns-c",
+ "name": "ep",
+ "subsets": [
+ {
+ "addresses": [
+ {
+ "ip": "10.0.0.1"
+ },
+ {
+ "ip": "10.0.0.2"
+ }
+ ],
+ "ports": [
+ {
+ "port": 5001
+ }
+ ]
+ },
+ {
+ "addresses": [
+ {
+ "ip": "20.0.0.1"
+ },
+ {
+ "ip": "20.0.0.2"
+ }
+ ],
+ "ports": [
+ {
+ "port": 5002
+ }
+ ]
+ }
+ ]
+ }
+]
+--- more_headers
+Content-type: application/json
+
+
+
+=== TEST 2: use default parameters
+--- yaml_config eval: $::yaml_config
+--- apisix_yaml
+stream_routes:
+ -
+ id: 1
+ server_port: 1985
+ upstream_id: 1
+
+upstreams:
+ - nodes:
+ "127.0.0.1:8125": 1
+ type: roundrobin
+ id: 1
+
+#END
+--- stream_request
+["first/ns-a/ep:p1","first/ns-a/ep:p2","first/ns-b/ep:p1","second/ns-a/ep:p1","second/ns-a/ep:p2","second/ns-b/ep:p1"]
+--- stream_response eval
+qr{ 2 2 2 2 2 2 }