This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c899b876 feat: stream subsystem support kubernetes service discovery 
(#8640)
9c899b876 is described below

commit 9c899b87648039e2c6c73b087d627aea2df90f55
Author: Ashing Zheng <[email protected]>
AuthorDate: Sat Jan 28 08:58:39 2023 +0800

    feat: stream subsystem support kubernetes service discovery (#8640)
    
    Fixes https://github.com/apache/apisix/issues/7779
---
 apisix/cli/ngx_tpl.lua                     |   7 +
 apisix/discovery/kubernetes/init.lua       |  29 ++-
 docs/en/latest/discovery/kubernetes.md     |   6 +
 docs/zh/latest/discovery/kubernetes.md     |   6 +
 t/APISIX.pm                                |   4 +
 t/kubernetes/discovery/stream/kubernetes.t | 348 +++++++++++++++++++++++++++++
 6 files changed, 396 insertions(+), 4 deletions(-)

diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua
index 95ac3b763..7083d4201 100644
--- a/apisix/cli/ngx_tpl.lua
+++ b/apisix/cli/ngx_tpl.lua
@@ -145,6 +145,13 @@ stream {
     lua_shared_dict plugin-limit-conn-stream {* 
stream.lua_shared_dict["plugin-limit-conn-stream"] *};
     {% end %}
 
+    # for discovery shared dict
+    {% if discovery_shared_dicts then %}
+    {% for key, size in pairs(discovery_shared_dicts) do %}
+    lua_shared_dict {*key*}-stream {*size*};
+    {% end %}
+    {% end %}
+
     resolver {% for _, dns_addr in ipairs(dns_resolver or {}) do %} 
{*dns_addr*} {% end %} {% if dns_resolver_valid then %} 
valid={*dns_resolver_valid*}{% end %} ipv6={% if enable_ipv6 then %}on{% else 
%}off{% end %};
     resolver_timeout {*resolver_timeout*};
 
diff --git a/apisix/discovery/kubernetes/init.lua 
b/apisix/discovery/kubernetes/init.lua
index d7258a556..3f5f275d9 100644
--- a/apisix/discovery/kubernetes/init.lua
+++ b/apisix/discovery/kubernetes/init.lua
@@ -25,7 +25,8 @@ local os = os
 local error = error
 local pcall = pcall
 local setmetatable = setmetatable
-local process = require("ngx.process")
+local is_http = ngx.config.subsystem == "http"
+local support_process, process = pcall(require, "ngx.process")
 local core = require("apisix.core")
 local util = require("apisix.cli.util")
 local local_conf = require("apisix.core.config_local").local_conf()
@@ -331,9 +332,24 @@ local function start_fetch(handle)
     ngx.timer.at(0, timer_runner)
 end
 
+-- Look up the lua_shared_dict used as the endpoint dict.
+-- Its name is "kubernetes", followed by "-<id>" when a non-empty id is
+-- given and by "-stream" when not running in the http subsystem.
+local function get_endpoint_dict(id)
+    local has_id = id and #id > 0
+    local dict_name = has_id and ("kubernetes-" .. id) or "kubernetes"
+
+    if is_http then
+        return ngx.shared[dict_name]
+    end
+
+    return ngx.shared[dict_name .. "-stream"]
+end
+
 
 local function single_mode_init(conf)
-    local endpoint_dict = ngx.shared.kubernetes
+    local endpoint_dict = get_endpoint_dict()
+
     if not endpoint_dict then
         error("failed to get lua_shared_dict: ngx.shared.kubernetes, " ..
                 "please check your APISIX version")
@@ -407,7 +423,7 @@ local function multiple_mode_worker_init(confs)
             error("duplicate id value")
         end
 
-        local endpoint_dict = ngx.shared["kubernetes-" .. id]
+        local endpoint_dict = get_endpoint_dict(id)
         if not endpoint_dict then
             error(string.format("failed to get lua_shared_dict: 
ngx.shared.kubernetes-%s, ", id) ..
                     "please check your APISIX version")
@@ -433,7 +449,7 @@ local function multiple_mode_init(confs)
             error("duplicate id value")
         end
 
-        local endpoint_dict = ngx.shared["kubernetes-" .. id]
+        local endpoint_dict = get_endpoint_dict(id)
         if not endpoint_dict then
             error(string.format("failed to get lua_shared_dict: 
ngx.shared.kubernetes-%s, ", id) ..
                     "please check your APISIX version")
@@ -504,6 +520,11 @@ end
 
 
 function _M.init_worker()
+    if not support_process then
+        core.log.error("kubernetes discovery not support in subsystem: ", 
ngx.config.subsystem,
+                       ", please check if your openresty version >= 1.19.9.1 
or not")
+        return
+    end
     local discovery_conf = local_conf.discovery.kubernetes
     core.log.info("kubernetes discovery conf: ", 
core.json.delay_encode(discovery_conf))
     if #discovery_conf == 0 then
diff --git a/docs/en/latest/discovery/kubernetes.md 
b/docs/en/latest/discovery/kubernetes.md
index 04e01f6ca..e80c73851 100644
--- a/docs/en/latest/discovery/kubernetes.md
+++ b/docs/en/latest/discovery/kubernetes.md
@@ -34,6 +34,12 @@ The [_Kubernetes_](https://kubernetes.io/) service discovery 
[_List-Watch_](http
 
 Discovery also provides a node query interface in accordance with the [_APISIX 
Discovery 
Specification_](https://github.com/apache/apisix/blob/master/docs/en/latest/discovery.md).
 
+:::note
+
+Using Kubernetes service discovery at L4 requires OpenResty 1.19.9.1 or later.
+
+:::
+
 ## How To Use
 
 Kubernetes service discovery both support single-cluster and multi-cluster 
mode, applicable to the case where the service is distributed in a single or 
multiple Kubernetes clusters.
diff --git a/docs/zh/latest/discovery/kubernetes.md 
b/docs/zh/latest/discovery/kubernetes.md
index 173428820..5e6d3e040 100644
--- a/docs/zh/latest/discovery/kubernetes.md
+++ b/docs/zh/latest/discovery/kubernetes.md
@@ -34,6 +34,12 @@ Kubernetes 服务发现以 
[_List-Watch_](https://kubernetes.io/docs/reference/u
 
 同时遵循 [_APISIX Discovery 
规范_](https://github.com/apache/apisix/blob/master/docs/zh/latest/discovery.md) 
提供了节点查询接口。
 
+:::note
+
+在四层中使用 Kubernetes 服务发现要求 OpenResty 版本大于等于 1.19.9.1
+
+:::
+
 ## Kubernetes 服务发现的使用
 
 目前 Kubernetes 服务发现支持单集群和多集群模式,分别适用于待发现的服务分布在单个或多个 Kubernetes 的场景。
diff --git a/t/APISIX.pm b/t/APISIX.pm
index 7fa11dd10..8e9e72d11 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -393,6 +393,10 @@ _EOC_
     lua_shared_dict etcd-cluster-health-check-stream 10m;
     lua_shared_dict worker-events-stream 10m;
 
+    lua_shared_dict kubernetes-stream 1m;
+    lua_shared_dict kubernetes-first-stream 1m;
+    lua_shared_dict kubernetes-second-stream 1m;
+
     upstream apisix_backend {
         server 127.0.0.1:1900;
         balancer_by_lua_block {
diff --git a/t/kubernetes/discovery/stream/kubernetes.t 
b/t/kubernetes/discovery/stream/kubernetes.t
new file mode 100644
index 000000000..3df431fb6
--- /dev/null
+++ b/t/kubernetes/discovery/stream/kubernetes.t
@@ -0,0 +1,348 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BEGIN { # package globals, read later as $::token_file, $::token_value, $::yaml_config
+    our $token_file = 
"/tmp/var/run/secrets/kubernetes.io/serviceaccount/token";
+    our $token_value = eval {`cat $token_file 2>/dev/null`}; # empty if unreadable
+
+    our $yaml_config = <<_EOC_;
+apisix:
+  node_listen: 1984
+deployment:
+  role: data_plane
+  role_data_plane:
+    config_provider: yaml
+discovery:
+  kubernetes:
+    - id: first
+      service:
+        host: "127.0.0.1"
+        port: "6443"
+      client:
+        token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
+    - id: second
+      service:
+        schema: "http"
+        host: "127.0.0.1"
+        port: "6445"
+      client:
+        token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
+
+_EOC_
+
+}
+
+use t::APISIX;
+
+my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx';
+my $version = eval { `$nginx_binary -V 2>&1` }; # version banner of the binary
+
+if (($version // '') =~ m/\/1\.19\.3/) { # literal dots: match only a 1.19.3 build
+    plan(skip_all => "require OpenResty version >= 1.19.9.1");
+} else {
+    plan('no_plan');
+}
+
+repeat_each(1);
+log_level('warn');
+no_root_location();
+no_shuffle();
+workers(4);
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
+routes: []
+#END
+_EOC_
+
+    $block->set_value("apisix_yaml", $apisix_yaml);
+
+    my $main_config = $block->main_config // <<_EOC_;
+env KUBERNETES_SERVICE_HOST=127.0.0.1;
+env KUBERNETES_SERVICE_PORT=6443;
+env KUBERNETES_CLIENT_TOKEN=$::token_value;
+env KUBERNETES_CLIENT_TOKEN_FILE=$::token_file;
+_EOC_
+
+    $block->set_value("main_config", $main_config);
+
+    my $config = $block->config // <<_EOC_;
+        location /operators {
+            content_by_lua_block {
+                local http = require("resty.http")
+                local core = require("apisix.core")
+                local ipairs = ipairs
+
+                ngx.req.read_body()
+                local request_body = ngx.req.get_body_data()
+                local operators = core.json.decode(request_body) -- list of ops
+
+                core.log.info("get body ", request_body)
+                core.log.info("get operators ", #operators)
+                for _, op in ipairs(operators) do -- one HTTP request per op
+                    local method, path, body -- stay nil for any other op.op
+                    local headers = {
+                        ["Host"] = "127.0.0.1:6445"
+                    }
+
+                    if op.op == "replace_subsets" then
+                        method = "PATCH"
+                        path = "/api/v1/namespaces/" .. op.namespace .. 
"/endpoints/" .. op.name
+                        if #op.subsets == 0 then
+                            body = 
'[{"path":"/subsets","op":"replace","value":[]}]'
+                        else
+                            local t = { { op = "replace", path = "/subsets", 
value = op.subsets } }
+                            body = core.json.encode(t, true)
+                        end
+                        headers["Content-Type"] = "application/json-patch+json"
+                    end
+
+                    if op.op == "replace_labels" then
+                        method = "PATCH"
+                        path = "/api/v1/namespaces/" .. op.namespace .. 
"/endpoints/" .. op.name
+                        local t = { { op = "replace", path = 
"/metadata/labels", value = op.labels } }
+                        body = core.json.encode(t, true)
+                        headers["Content-Type"] = "application/json-patch+json"
+                    end
+
+                    local httpc = http.new()
+                    core.log.info("begin to connect ", "127.0.0.1:6445")
+                    local ok, message = httpc:connect({
+                        scheme = "http",
+                        host = "127.0.0.1",
+                        port = 6445,
+                    })
+                    if not ok then
+                        core.log.error("connect 127.0.0.1:6445 failed, message 
: ", message)
+                        return ngx.say("FAILED") -- stop: client is not connected
+                    end
+                    local res, err = httpc:request({
+                        method = method,
+                        path = path,
+                        headers = headers,
+                        body = body,
+                    })
+                    if err ~= nil then
+                        core.log.error("operator k8s cluster error: ", err)
+                        return 500 -- NOTE(review): presumably not sent as HTTP status; confirm
+                    end
+                    if res.status ~= 200 and res.status ~= 201 and res.status 
~= 409 then
+                        return res.status
+                    end
+                end
+                ngx.say("DONE")
+            }
+        }
+
+_EOC_
+
+    $block->set_value("config", $config);
+
+    my $stream_config = $block->stream_config // <<_EOC_;
+        server {
+            listen 8125;
+            content_by_lua_block {
+                local core = require("apisix.core")
+                local discovery = require("apisix.discovery.kubernetes")
+
+                ngx.sleep(1)
+
+                -- the request carries a JSON array of discovery queries
+                local line = ngx.req.socket():receive()
+                core.log.info("get body ", line)
+
+                -- reply with the node count of every query, 0 when there is none
+                local parts = { "{" }
+                for _, query in ipairs(core.json.decode(line)) do
+                    local nodes = discovery.nodes(query)
+                    if nodes ~= nil and #nodes ~= 0 then
+                        parts[#parts + 1] = #nodes
+                    else
+                        parts[#parts + 1] = 0
+                    end
+                end
+                parts[#parts + 1] = "}"
+                ngx.say(table.concat(parts, " "))
+            }
+        }
+
+_EOC_
+
+  $block->set_value("extra_stream_config", $stream_config);
+
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: create namespace and endpoints
+--- yaml_config eval: $::yaml_config
+--- request
+POST /operators
+[
+  {
+    "op": "replace_subsets",
+    "namespace": "ns-a",
+    "name": "ep",
+    "subsets": [
+      {
+        "addresses": [
+          {
+            "ip": "10.0.0.1"
+          },
+          {
+            "ip": "10.0.0.2"
+          }
+        ],
+        "ports": [
+          {
+            "name": "p1",
+            "port": 5001
+          }
+        ]
+      },
+      {
+        "addresses": [
+          {
+            "ip": "20.0.0.1"
+          },
+          {
+            "ip": "20.0.0.2"
+          }
+        ],
+        "ports": [
+          {
+            "name": "p2",
+            "port": 5002
+          }
+        ]
+      }
+    ]
+  },
+  {
+    "op": "create_namespace",
+    "name": "ns-b"
+  },
+  {
+    "op": "replace_subsets",
+    "namespace": "ns-b",
+    "name": "ep",
+    "subsets": [
+      {
+        "addresses": [
+          {
+            "ip": "10.0.0.1"
+          },
+          {
+            "ip": "10.0.0.2"
+          }
+        ],
+        "ports": [
+          {
+            "name": "p1",
+            "port": 5001
+          }
+        ]
+      },
+      {
+        "addresses": [
+          {
+            "ip": "20.0.0.1"
+          },
+          {
+            "ip": "20.0.0.2"
+          }
+        ],
+        "ports": [
+          {
+            "name": "p2",
+            "port": 5002
+          }
+        ]
+      }
+    ]
+  },
+  {
+    "op": "create_namespace",
+    "name": "ns-c"
+  },
+  {
+    "op": "replace_subsets",
+    "namespace": "ns-c",
+    "name": "ep",
+    "subsets": [
+      {
+        "addresses": [
+          {
+            "ip": "10.0.0.1"
+          },
+          {
+            "ip": "10.0.0.2"
+          }
+        ],
+        "ports": [
+          {
+            "port": 5001
+          }
+        ]
+      },
+      {
+        "addresses": [
+          {
+            "ip": "20.0.0.1"
+          },
+          {
+            "ip": "20.0.0.2"
+          }
+        ],
+        "ports": [
+          {
+            "port": 5002
+          }
+        ]
+      }
+    ]
+  }
+]
+--- more_headers
+Content-type: application/json
+
+
+
+=== TEST 2: use default parameters
+--- yaml_config eval: $::yaml_config
+--- apisix_yaml
+stream_routes:
+  -
+    id: 1
+    server_port: 1985
+    upstream_id: 1
+
+upstreams:
+  - nodes:
+      "127.0.0.1:8125": 1
+    type: roundrobin
+    id: 1
+
+#END
+--- stream_request
+["first/ns-a/ep:p1","first/ns-a/ep:p2","first/ns-b/ep:p1","second/ns-a/ep:p1","second/ns-a/ep:p2","second/ns-b/ep:p1"]
+--- stream_response eval
+qr{ 2 2 2 2 2 2 }

Reply via email to