This is an automated email from the ASF dual-hosted git repository.
baoyuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 46176f249 fix: correct handling of endpointSlices in Kubernetes
service discovery (#12634)
46176f249 is described below
commit 46176f249e1fbf599040b8861e0b701cfc8e3ef2
Author: aie <[email protected]>
AuthorDate: Wed Jan 28 14:12:01 2026 +0800
fix: correct handling of endpointSlices in Kubernetes service discovery
(#12634)
---
apisix/discovery/kubernetes/init.lua | 232 +++++++++----
t/kubernetes/configs/endpointslices.yaml | 12 +-
t/kubernetes/discovery/kubernetes3.t | 33 +-
t/kubernetes/discovery/kubernetes4.t | 546 +++++++++++++++++++++++++++++++
4 files changed, 742 insertions(+), 81 deletions(-)
diff --git a/apisix/discovery/kubernetes/init.lua
b/apisix/discovery/kubernetes/init.lua
index 1b85181b6..594aaf2a0 100644
--- a/apisix/discovery/kubernetes/init.lua
+++ b/apisix/discovery/kubernetes/init.lua
@@ -16,6 +16,7 @@
--
local ngx = ngx
+local unpack = unpack
local ipairs = ipairs
local pairs = pairs
local string = string
@@ -25,7 +26,6 @@ local os = os
local error = error
local pcall = pcall
local setmetatable = setmetatable
-local type = type
local is_http = ngx.config.subsystem == "http"
local process = require("ngx.process")
local core = require("apisix.core")
@@ -42,6 +42,7 @@ local endpoint_lrucache = core.lrucache.new({
})
local endpoint_buffer = {}
+local kubernetes_service_name_label = "kubernetes.io/service-name"
local function sort_nodes_cmp(left, right)
if left.host ~= right.host then
@@ -51,69 +52,133 @@ local function sort_nodes_cmp(left, right)
return left.port < right.port
end
-local function on_endpoint_slices_modified(handle, endpoint, operate)
- if handle.namespace_selector and
- not handle:namespace_selector(endpoint.metadata.namespace) then
- return
+-- Record one EndpointSlice's contribution in the per-key slice cache.
+-- `handle.endpoint_slices_cache[endpoint_key]` is a table indexed by slice
+-- name; it is created on first use. `slice` is stored under `slice_name`, so
+-- passing `slice = nil` removes that slice's entry (the per-key table itself
+-- is kept, possibly empty).
+local function update_endpoint_slices_cache(handle, endpoint_key, slice,
slice_name)
+    if not handle.endpoint_slices_cache[endpoint_key] then
+        handle.endpoint_slices_cache[endpoint_key] = {}
     end
+    local endpoint_slices = handle.endpoint_slices_cache[endpoint_key]
+    endpoint_slices[slice_name] = slice
+end
- core.log.debug(core.json.delay_encode(endpoint))
- core.table.clear(endpoint_buffer)
-
- local endpointslices = endpoint.endpoints
- if type(endpointslices) == "table" then
- for _, endpointslice in ipairs(endpointslices) do
- if endpointslice.addresses then
- local addresses = endpointslice.addresses
- for _, port in ipairs(endpoint.ports or {}) do
- local port_name
- if port.name then
- port_name = port.name
- elseif port.targetPort then
- port_name = tostring(port.targetPort)
- else
- port_name = tostring(port.port)
- end
-
- if endpointslice.conditions and
endpointslice.conditions.ready then
- local nodes = endpoint_buffer[port_name]
- if nodes == nil then
- nodes = core.table.new(0, #endpointslices *
#addresses)
- endpoint_buffer[port_name] = nodes
- end
-
- for _, address in ipairs(addresses) do
- core.table.insert(nodes, {
- host = address.ip,
- port = port.port,
- weight = handle.default_weight
- })
- end
- end
- end
+-- Merge every cached slice stored under `endpoint_key` into one
+-- port-name -> nodes map.
+-- Each cached slice maps a port name to an array of nodes; the arrays of all
+-- slices are appended per port name. Returns an empty table when nothing is
+-- cached for the key. The merged arrays are not sorted here.
+local function get_endpoints_from_cache(handle, endpoint_key)
+    local endpoint_slices = handle.endpoint_slices_cache[endpoint_key] or {}
+    local endpoints = {}
+    for _, endpoint_slice in pairs(endpoint_slices) do
+        for port, targets in pairs(endpoint_slice) do
+            if not endpoints[port] then
+                -- nodes are appended as a sequence, so reserve array slots
+                -- (first argument) rather than hash slots (second argument)
+                endpoints[port] = core.table.new(#targets, 0)
             end
+            core.table.insert_tail(endpoints[port], unpack(targets))
         end
     end
-    for _, ports in pairs(endpoint_buffer) do
-        for _, nodes in pairs(ports) do
-            core.table.sort(nodes, sort_nodes_cmp)
-        end
-    end
-    local endpoint_key = endpoint.metadata.namespace .. "/" ..
endpoint.metadata.name
-    local endpoint_content = core.json.encode(endpoint_buffer, true)
-    local endpoint_version = ngx.crc32_long(endpoint_content)
+    return endpoints
+end
+-- Publish `endpoints` for `endpoint_key` into `handle.endpoint_dict`.
+-- The endpoints are JSON-encoded and stored under `endpoint_key`; the CRC32 of
+-- that JSON is stored under `endpoint_key .. "#version"`.
+-- Returns true on success, or false plus an error message when either write
+-- fails. Logging the failure is left to the caller.
+local function update_endpoint_dict(handle, endpoints, endpoint_key)
+    local endpoint_content = core.json.encode(endpoints, true)
+    local endpoint_version = ngx.crc32_long(endpoint_content)
     local _, err
     _, err = handle.endpoint_dict:safe_set(endpoint_key .. "#version",
endpoint_version)
     if err then
-        core.log.error("set endpoint version into discovery DICT failed, ",
err)
-        return
+        return false, "set endpoint version into discovery DICT failed, " ..
err
     end
     _, err = handle.endpoint_dict:safe_set(endpoint_key, endpoint_content)
     if err then
-        core.log.error("set endpoint into discovery DICT failed, ", err)
         handle.endpoint_dict:delete(endpoint_key .. "#version")
+        -- the version key written above was just removed, so a version is
+        -- never left behind without its content
+        return false, "set endpoint into discovery DICT failed, " .. err
+    end
+
+    return true
+end
+
+-- Check that an EndpointSlice object carries the fields the handlers rely on:
+-- metadata, metadata.name, metadata.namespace and the
+-- "kubernetes.io/service-name" label.
+-- Returns true when all are present, otherwise false plus a message that
+-- names the first missing field and embeds the JSON-encoded slice.
+local function validate_endpoint_slice(endpoint_slice)
+    -- builds the failure pair; the slice is only serialized when a check fails
+    local function reject(reason)
+        return false, reason .. core.json.encode(endpoint_slice)
+    end
+
+    local metadata = endpoint_slice.metadata
+    if not metadata then
+        return reject("endpoint_slice has no metadata, endpointSlice: ")
+    end
+    if not metadata.name then
+        return reject("endpoint_slice has no metadata.name, endpointSlice: ")
+    end
+    if not metadata.namespace then
+        return reject("endpoint_slice has no metadata.namespace, endpointSlice: ")
+    end
+
+    local labels = metadata.labels
+    if not (labels and labels[kubernetes_service_name_label]) then
+        return reject("endpoint_slice has no service-name, endpointSlice: ")
+    end
+
+    return true
+end
+
+local function on_endpoint_slices_modified(handle, endpoint_slice, operate)
+ local ok, err = validate_endpoint_slice(endpoint_slice)
+ if not ok then
+ core.log.error("endpoint_slice validation fail: ", err)
+ return
+ end
+ if handle.namespace_selector and
+ not handle:namespace_selector(endpoint_slice.metadata.namespace)
then
+ return
+ end
+
+ core.log.debug("get endpoint_slice: ",
core.json.delay_encode(endpoint_slice))
+ --record nodes to every port in service
+ local port_to_nodes = {}
+
+ local slice_endpoints = endpoint_slice.endpoints
+ if not slice_endpoints or slice_endpoints == ngx.null then
+ slice_endpoints = {}
+ end
+
+ for _, endpoint in ipairs(slice_endpoints) do
+ if endpoint.addresses
+ and endpoint.conditions
+ and endpoint.conditions.ready then
+ local addresses = endpoint.addresses
+ for _, port in ipairs(endpoint_slice.ports or {}) do
+ local port_name
+ if port.name then
+ port_name = port.name
+ elseif port.targetPort then
+ port_name = tostring(port.targetPort)
+ else
+ port_name = tostring(port.port)
+ end
+
+ local nodes = port_to_nodes[port_name]
+ if nodes == nil then
+ nodes = core.table.new(0, #slice_endpoints * #addresses)
+ port_to_nodes[port_name] = nodes
+ end
+
+ for _, ip in ipairs(addresses) do
+ core.table.insert(nodes, {
+ host = ip,
+ port = port.port,
+ weight = handle.default_weight
+ })
+ end
+ end
+ end
+ end
+
+ local endpoint_key = endpoint_slice.metadata.namespace
+ .. "/" ..
endpoint_slice.metadata.labels[kubernetes_service_name_label]
+ update_endpoint_slices_cache(handle, endpoint_key, port_to_nodes,
endpoint_slice.metadata.name)
+
+ local cached_endpoints = get_endpoints_from_cache(handle, endpoint_key)
+ for _, nodes in pairs(cached_endpoints) do
+ core.table.sort(nodes, sort_nodes_cmp)
+ end
+
+ local ok, err = update_endpoint_dict(handle, cached_endpoints,
endpoint_key)
+ if not ok then
+ core.log.error("failed to update endpoint dict for endpoint: ",
endpoint_key,
+ ", err: ", err)
return
end
if operate == "list" then
@@ -122,6 +187,36 @@ local function on_endpoint_slices_modified(handle,
endpoint, operate)
end
end
+-- Handler for a deleted EndpointSlice.
+-- Drops the slice from the cache entry keyed by
+-- "<namespace>/<service-name label>", rebuilds the merged and sorted node
+-- lists from the slices that remain, and writes the result to the endpoint
+-- dict. Slices that fail validation or are rejected by the namespace selector
+-- are ignored.
+local function on_endpoint_slices_deleted(handle, endpoint_slice)
+    local ok, err = validate_endpoint_slice(endpoint_slice)
+    if not ok then
+        core.log.error("endpoint_slice validation fail: ", err)
+        return
+    end
+
+    local metadata = endpoint_slice.metadata
+    if handle.namespace_selector
+       and not handle:namespace_selector(metadata.namespace) then
+        return
+    end
+
+    core.log.debug("delete endpoint_slice: ", core.json.delay_encode(endpoint_slice))
+
+    local service_name = metadata.labels[kubernetes_service_name_label]
+    local endpoint_key = metadata.namespace .. "/" .. service_name
+    -- storing nil removes this slice's entry from the cache
+    update_endpoint_slices_cache(handle, endpoint_key, nil, metadata.name)
+
+    local remaining = get_endpoints_from_cache(handle, endpoint_key)
+    for _, nodes in pairs(remaining) do
+        core.table.sort(nodes, sort_nodes_cmp)
+    end
+
+    ok, err = update_endpoint_dict(handle, remaining, endpoint_key)
+    if ok then
+        return
+    end
+    core.log.error("failed to update endpoint dict for endpoint: ", endpoint_key,
+                   ", err: ", err)
+end
+
local function on_endpoint_modified(handle, endpoint, operate)
if handle.namespace_selector and
not handle:namespace_selector(endpoint.metadata.namespace) then
@@ -162,26 +257,16 @@ local function on_endpoint_modified(handle, endpoint,
operate)
end
end
- for _, ports in pairs(endpoint_buffer) do
- for _, nodes in pairs(ports) do
- core.table.sort(nodes, sort_nodes_cmp)
- end
+
+ for _, nodes in pairs(endpoint_buffer) do
+ core.table.sort(nodes, sort_nodes_cmp)
end
local endpoint_key = endpoint.metadata.namespace .. "/" ..
endpoint.metadata.name
- local endpoint_content = core.json.encode(endpoint_buffer, true)
- local endpoint_version = ngx.crc32_long(endpoint_content)
-
- local _, err
- _, err = handle.endpoint_dict:safe_set(endpoint_key .. "#version",
endpoint_version)
- if err then
- core.log.error("set endpoint version into discovery DICT failed, ",
err)
- return
- end
- _, err = handle.endpoint_dict:safe_set(endpoint_key, endpoint_content)
- if err then
- core.log.error("set endpoint into discovery DICT failed, ", err)
- handle.endpoint_dict:delete(endpoint_key .. "#version")
+ local ok, err = update_endpoint_dict(handle, endpoint_buffer, endpoint_key)
+ if not ok then
+ core.log.error("failed to update endpoint dict for endpoint: ",
endpoint_key,
+ ", err: ", err)
return
end
if operate == "list" then
@@ -207,6 +292,9 @@ end
+-- Installed as the informer's `pre_list` hook. Resets the bookkeeping kept on
+-- `handle`: current_keys_hash starts empty, existing_keys snapshots every key
+-- currently in endpoint_dict (get_keys(0) asks for all keys), and, when this
+-- informer keeps an EndpointSlice cache, that cache is replaced by an empty
+-- table.
 local function pre_list(handle)
     handle.current_keys_hash = {}
     handle.existing_keys = handle.endpoint_dict:get_keys(0)
+    if handle.endpoint_slices_cache then
+        handle.endpoint_slices_cache = {}
+    end
 end
@@ -499,11 +587,14 @@ local function single_mode_init(conf)
if conf.watch_endpoint_slices then
endpoints_informer.on_added = on_endpoint_slices_modified
endpoints_informer.on_modified = on_endpoint_slices_modified
+ endpoints_informer.on_deleted = on_endpoint_slices_deleted
+ endpoints_informer.endpoint_slices_cache = {}
else
endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
+ endpoints_informer.on_deleted = on_endpoint_deleted
end
- endpoints_informer.on_deleted = on_endpoint_deleted
+
endpoints_informer.pre_list = pre_list
endpoints_informer.post_list = post_list
@@ -605,11 +696,14 @@ local function multiple_mode_init(confs)
if conf.watch_endpoint_slices then
endpoints_informer.on_added = on_endpoint_slices_modified
endpoints_informer.on_modified = on_endpoint_slices_modified
+ endpoints_informer.on_deleted = on_endpoint_slices_deleted
+ endpoints_informer.endpoint_slices_cache = {}
else
endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
+ endpoints_informer.on_deleted = on_endpoint_deleted
end
- endpoints_informer.on_deleted = on_endpoint_deleted
+
endpoints_informer.pre_list = pre_list
endpoints_informer.post_list = post_list
diff --git a/t/kubernetes/configs/endpointslices.yaml
b/t/kubernetes/configs/endpointslices.yaml
index d22851233..6b13198f6 100644
--- a/t/kubernetes/configs/endpointslices.yaml
+++ b/t/kubernetes/configs/endpointslices.yaml
@@ -24,8 +24,10 @@ metadata:
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
- name: epslice
+ name: service-a-epslice1
namespace: ns-a
+ labels:
+ "kubernetes.io/service-name": service-a
addressType: IPv4
endpoints: [ ]
---
@@ -39,8 +41,10 @@ metadata:
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
- name: epslice
+ name: service-a-epslice1
namespace: ns-b
+ labels:
+ "kubernetes.io/service-name": service-a
addressType: IPv4
endpoints: [ ]
---
@@ -54,8 +58,10 @@ metadata:
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
- name: epslice
+ name: service-a-epslice1
namespace: ns-c
+ labels:
+ "kubernetes.io/service-name": service-a
addressType: IPv4
endpoints: [ ]
---
diff --git a/t/kubernetes/discovery/kubernetes3.t
b/t/kubernetes/discovery/kubernetes3.t
index 99b3dff37..47b902e18 100644
--- a/t/kubernetes/discovery/kubernetes3.t
+++ b/t/kubernetes/discovery/kubernetes3.t
@@ -279,7 +279,12 @@ POST /operators
{
"op": "replace_endpointslices",
"namespace": "ns-a",
- "name": "epslice",
+ "name": "service-a-epslice1",
+ "metadata": {
+ "labels": {
+ "kubernetes.io/service-name": "service-a"
+ }
+ },
"endpoints": [
{
"addresses": [
@@ -320,7 +325,12 @@ POST /operators
{
"op": "replace_endpointslices",
"namespace": "ns-b",
- "name": "epslice",
+ "name": "service-a-epslice1",
+ "metadata": {
+ "labels": {
+ "kubernetes.io/service-name": "service-a"
+ }
+ },
"endpoints": [
{
"addresses": [
@@ -361,7 +371,12 @@ POST /operators
{
"op": "replace_endpointslices",
"namespace": "ns-c",
- "name": "epslice",
+ "name": "service-a-epslice1",
+ "metadata": {
+ "labels": {
+ "kubernetes.io/service-name": "service-a"
+ }
+ },
"endpoints": [
{
"addresses": [
@@ -408,8 +423,8 @@ Content-type: application/json
--- request
GET /queries
[
-
"first/ns-a/epslice:p1","first/ns-a/epslice:p1","first/ns-b/epslice:p2","first/ns-b/epslice:p2","first/ns-c/epslice:p3","first/ns-c/epslice:p3",
-
"second/ns-a/epslice:p1","second/ns-a/epslice:p1","second/ns-b/epslice:p2","second/ns-b/epslice:p2","second/ns-c/epslice:p3","second/ns-c/epslice:p3"
+
"first/ns-a/service-a:p1","first/ns-a/service-a:p1","first/ns-b/service-a:p2","first/ns-b/service-a:p2","first/ns-c/service-a:p3","first/ns-c/service-a:p3",
+
"second/ns-a/service-a:p1","second/ns-a/service-a:p1","second/ns-b/service-a:p2","second/ns-b/service-a:p2","second/ns-c/service-a:p3","second/ns-c/service-a:p3"
]
--- more_headers
Content-type: application/json
@@ -447,8 +462,8 @@ discovery:
--- request
GET /queries
[
-
"first/ns-a/epslice:p1","first/ns-a/epslice:p1","first/ns-b/epslice:p2","first/ns-b/epslice:p2","first/ns-c/epslice:p3","first/ns-c/epslice:p3",
-
"second/ns-a/epslice:p1","second/ns-a/epslice:p1","second/ns-b/epslice:p2","second/ns-b/epslice:p2","second/ns-c/epslice:p3","second/ns-c/epslice:p3"
+
"first/ns-a/service-a:p1","first/ns-a/service-a:p1","first/ns-b/service-a:p2","first/ns-b/service-a:p2","first/ns-c/service-a:p3","first/ns-c/service-a:p3",
+
"second/ns-a/service-a:p1","second/ns-a/service-a:p1","second/ns-b/service-a:p2","second/ns-b/service-a:p2","second/ns-c/service-a:p3","second/ns-c/service-a:p3"
]
--- more_headers
Content-type: application/json
@@ -487,8 +502,8 @@ discovery:
--- request
GET /queries
[
-
"first/ns-a/epslice:p1","first/ns-a/epslice:p1","first/ns-b/epslice:p2","first/ns-b/epslice:p2","first/ns-c/epslice:p3","first/ns-c/epslice:p3",
-
"second/ns-a/epslice:p1","second/ns-a/epslice:p1","second/ns-b/epslice:p2","second/ns-b/epslice:p2","second/ns-c/epslice:p3","second/ns-c/epslice:p3"
+
"first/ns-a/service-a:p1","first/ns-a/service-a:p1","first/ns-b/service-a:p2","first/ns-b/service-a:p2","first/ns-c/service-a:p3","first/ns-c/service-a:p3",
+
"second/ns-a/service-a:p1","second/ns-a/service-a:p1","second/ns-b/service-a:p2","second/ns-b/service-a:p2","second/ns-c/service-a:p3","second/ns-c/service-a:p3"
]
--- more_headers
Content-type: application/json
diff --git a/t/kubernetes/discovery/kubernetes4.t
b/t/kubernetes/discovery/kubernetes4.t
new file mode 100644
index 000000000..3214523ac
--- /dev/null
+++ b/t/kubernetes/discovery/kubernetes4.t
@@ -0,0 +1,546 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BEGIN {
+ our $token_file =
"/tmp/var/run/secrets/kubernetes.io/serviceaccount/token";
+ our $token_value = eval {`cat $token_file 2>/dev/null`};
+
+ our $yaml_config = <<_EOC_;
+apisix:
+ node_listen: 1984
+deployment:
+ role: data_plane
+ role_data_plane:
+ config_provider: yaml
+discovery:
+ kubernetes:
+ - id: first
+ service:
+ host: "127.0.0.1"
+ port: "6443"
+ client:
+ token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
+ watch_endpoint_slices: true
+ - id: second
+ service:
+ schema: "http"
+ host: "127.0.0.1"
+ port: "6445"
+ client:
+ token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
+ watch_endpoint_slices: true
+
+_EOC_
+
+ our $single_yaml_config = <<_EOC_;
+apisix:
+ node_listen: 1984
+deployment:
+ role: data_plane
+ role_data_plane:
+ config_provider: yaml
+discovery:
+ kubernetes:
+ service:
+ host: "127.0.0.1"
+ port: "6443"
+ client:
+ token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
+ watch_endpoint_slices: true
+_EOC_
+
+ our $create_ns_a_epslice2 = <<_EOC_;
+[
+ {
+ "op": "create_endpointslices",
+ "namespace": "ns-a",
+ "apiVersion": "discovery.k8s.io/v1",
+ "kind": "EndpointSlice",
+ "metadata": {
+ "name": "service-a-epslice2",
+ "labels": {
+ "kubernetes.io/service-name": "service-a"
+ }
+ },
+ "addressType": "IPv4",
+ "endpoints": [
+ {
+ "addresses": [
+ "10.0.0.4"
+ ],
+ "conditions": {
+ "ready": true,
+ "serving": true,
+ "terminating": false
+ },
+ "nodeName": "service-a-node4"
+ },
+ {
+ "addresses": [
+ "10.0.0.5"
+ ],
+ "conditions": {
+ "ready": true,
+ "serving": true,
+ "terminating": false
+ },
+ "nodeName": "service-a-node5"
+ }
+ ],
+ "ports": [
+ {
+ "name": "p1",
+ "port": 5001
+ }
+ ]
+ }
+]
+_EOC_
+
+ our $scale_in_ns_a_epslice1 = <<_EOC_;
+[
+ {
+ "op": "replace_endpointslices",
+ "name": "service-a-epslice1",
+ "namespace": "ns-a",
+ "apiVersion": "discovery.k8s.io/v1",
+ "kind": "EndpointSlice",
+ "metadata": {
+ "name": "service-a-epslice1",
+ "labels": {
+ "kubernetes.io/service-name": "service-a"
+ }
+ },
+ "addressType": "IPv4",
+ "endpoints": [
+ {
+ "addresses": [
+ "10.0.0.1"
+ ],
+ "conditions": {
+ "ready": true,
+ "serving": true,
+ "terminating": false
+ },
+ "nodeName": "service-a-node1"
+ },
+ {
+ "addresses": [
+ "10.0.0.2"
+ ],
+ "conditions": {
+ "ready": true,
+ "serving": true,
+ "terminating": false
+ },
+ "nodeName": "service-a-node2"
+ }
+ ],
+ "ports": [
+ {
+ "name": "p1",
+ "port": 5001
+ }
+ ]
+ }
+]
+_EOC_
+
+ our $scale_up_ns_a_epslice1 = <<_EOC_;
+[
+ {
+ "op": "replace_endpointslices",
+ "name": "service-a-epslice1",
+ "namespace": "ns-a",
+ "apiVersion": "discovery.k8s.io/v1",
+ "kind": "EndpointSlice",
+ "metadata": {
+ "name": "service-a-epslice1",
+ "labels": {
+ "kubernetes.io/service-name": "service-a"
+ }
+ },
+ "addressType": "IPv4",
+ "endpoints": [
+ {
+ "addresses": [
+ "10.0.0.1"
+ ],
+ "conditions": {
+ "ready": true,
+ "serving": true,
+ "terminating": false
+ },
+ "nodeName": "service-a-node1"
+ },
+ {
+ "addresses": [
+ "10.0.0.2"
+ ],
+ "conditions": {
+ "ready": true,
+ "serving": true,
+ "terminating": false
+ },
+ "nodeName": "service-a-node2"
+ },
+ {
+ "addresses": [
+ "10.0.0.3"
+ ],
+ "conditions": {
+ "ready": true,
+ "serving": true,
+ "terminating": false
+ },
+ "nodeName": "service-a-node3"
+ }
+ ],
+ "ports": [
+ {
+ "name": "p1",
+ "port": 5001
+ }
+ ]
+ }
+]
+_EOC_
+
+}
+
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('warn');
+no_root_location();
+no_shuffle();
+workers(4);
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
+routes: []
+#END
+_EOC_
+
+ $block->set_value("apisix_yaml", $apisix_yaml);
+
+ my $main_config = $block->main_config // <<_EOC_;
+env KUBERNETES_SERVICE_HOST=127.0.0.1;
+env KUBERNETES_SERVICE_PORT=6443;
+env KUBERNETES_CLIENT_TOKEN=$::token_value;
+env KUBERNETES_CLIENT_TOKEN_FILE=$::token_file;
+_EOC_
+
+ $block->set_value("main_config", $main_config);
+
+ my $config = $block->config // <<_EOC_;
+    location /queries {
+        content_by_lua_block {
+          -- Test helper: the request body is a JSON array of service names;
+          -- the reply is "{ n1 n2 ... }" with the node count found for each
+          -- name (0 when discovery returns nil or an empty list).
+          local core = require("apisix.core")
+          local discovery = require("apisix.discovery.kubernetes")
+
+          ngx.sleep(1)
+
+          ngx.req.read_body()
+          local service_names = core.json.decode(ngx.req.get_body_data())
+
+          local parts = { "{" }
+          for _, service_name in ipairs(service_names) do
+            local nodes = discovery.nodes(service_name)
+            local count = 0
+            if nodes ~= nil and #nodes ~= 0 then
+                count = #nodes
+            end
+            parts[#parts + 1] = " " .. count
+          end
+          parts[#parts + 1] = " }"
+          ngx.say(table.concat(parts))
+        }
+    }
+
+    location /operators {
+        content_by_lua_block {
+            -- Test helper: the request body is a JSON array of EndpointSlice
+            -- operations (replace / create / delete). Each one is sent to the
+            -- API endpoint at 127.0.0.1:6445; "DONE" is printed only when
+            -- every operation answered 200, 201 or 409.
+            local http = require("resty.http")
+            local core = require("apisix.core")
+            local ipairs = ipairs
+
+            ngx.req.read_body()
+            local request_body = ngx.req.get_body_data()
+            local operators = core.json.decode(request_body)
+
+            core.log.info("get body ", request_body)
+            core.log.info("get operators ", #operators)
+            for _, op in ipairs(operators) do
+                local method, path, body
+                local headers = {
+                    ["Host"] = "127.0.0.1:6445"
+                }
+
+                if op.op == "replace_endpointslices" then
+                    method = "PATCH"
+                    path = "/apis/discovery.k8s.io/v1/namespaces/" .. op.namespace
+                           .. "/endpointslices/" .. op.name
+                    if #op.endpoints == 0 then
+                        body = '[{"path":"/endpoints","op":"replace","value":[]}]'
+                    else
+                        local t = {
+                            { op = "replace", path = "/endpoints", value = op.endpoints },
+                            { op = "replace", path = "/ports", value = op.ports },
+                        }
+                        body = core.json.encode(t, true)
+                    end
+                    headers["Content-Type"] = "application/json-patch+json"
+
+                elseif op.op == "create_endpointslices" then
+                    method = "POST"
+                    path = "/apis/discovery.k8s.io/v1/namespaces/" .. op.namespace
+                           .. "/endpointslices"
+                    -- strip the helper-only fields so the rest of the object
+                    -- is posted as the EndpointSlice manifest
+                    op.op = nil
+                    op.namespace = nil
+                    body = core.json.encode(op, true)
+
+                elseif op.op == "delete_endpointslices" then
+                    method = "DELETE"
+                    path = "/apis/discovery.k8s.io/v1/namespaces/" .. op.namespace
+                           .. "/endpointslices/" .. op.name
+                end
+
+
+                local httpc = http.new()
+                core.log.info("begin to connect ", "127.0.0.1:6445")
+                local ok, message = httpc:connect({
+                    scheme = "http",
+                    host = "127.0.0.1",
+                    port = 6445,
+                })
+                if not ok then
+                    core.log.error("connect 127.0.0.1:6445 failed, message : ", message)
+                    ngx.say("FAILED")
+                    -- stop here: issuing the request on an unconnected
+                    -- client can only fail again
+                    return
+                end
+                local res, err = httpc:request({
+                    method = method,
+                    path = path,
+                    headers = headers,
+                    body = body,
+                })
+                if err ~= nil then
+                    -- the log method is "error"; "err" is not a known level
+                    core.log.error("operator k8s cluster error: ", err)
+                    -- NOTE(review): the returned number is presumably not sent
+                    -- as the HTTP status; the missing DONE line is what makes
+                    -- the test fail. Confirm before relying on it.
+                    return 500
+                end
+
+                ngx.sleep(1)
+
+                if res.status ~= 200 and res.status ~= 201 and res.status ~= 409 then
+                    return res.status
+                end
+            end
+            ngx.say("DONE")
+        }
+    }
+
+_EOC_
+
+ $block->set_value("config", $config);
+
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: endpointSlice1 update
+--- yaml_config eval: $::yaml_config
+--- request eval
+[
+"POST /operators
+$::scale_up_ns_a_epslice1",
+]
+--- more_headers
+Content-type: application/json
+--- response_body
+DONE
+
+
+
+=== TEST 2: test multi-k8s watching endpointSlices
+--- yaml_config eval: $::yaml_config
+--- request
+GET /queries
+[
+ "first/ns-a/service-a:p1"
+]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+qr{ 3 }
+
+
+
+=== TEST 3: test single-k8s watching endpointSlices
+--- yaml_config eval: $::single_yaml_config
+--- request
+GET /queries
+[
+ "ns-a/service-a:p1"
+]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+qr{ 3 }
+
+
+
+=== TEST 4: endpointSlice2 create and delete for multi-k8s mode
+--- yaml_config eval: $::yaml_config
+--- request eval
+[
+
+"POST /operators
+$::create_ns_a_epslice2",
+
+"GET /queries
+[
+ \"first/ns-a/service-a:p1\"
+]",
+
+"POST /operators
+[
+ {
+ \"op\": \"delete_endpointslices\",
+ \"namespace\": \"ns-a\",
+ \"name\": \"service-a-epslice2\"
+ }
+]",
+
+"GET /queries
+[
+ \"first/ns-a/service-a:p1\"
+]",
+
+]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+[
+ "DONE\n",
+ "{ 5 }\n",
+ "DONE\n",
+ "{ 3 }\n",
+]
+
+
+
+=== TEST 5: endpointSlice2 create and delete for single-k8s mode
+--- yaml_config eval: $::single_yaml_config
+--- request eval
+[
+
+"POST /operators
+$::create_ns_a_epslice2",
+
+"GET /queries
+[
+ \"ns-a/service-a:p1\"
+]",
+
+"POST /operators
+[
+ {
+ \"op\": \"delete_endpointslices\",
+ \"namespace\": \"ns-a\",
+ \"name\": \"service-a-epslice2\"
+ }
+]",
+
+"GET /queries
+[
+ \"ns-a/service-a:p1\"
+]",
+
+]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+[
+ "DONE\n",
+ "{ 5 }\n",
+ "DONE\n",
+ "{ 3 }\n",
+]
+
+
+
+=== TEST 6: endpointSlice scale for multi-k8s mode
+--- yaml_config eval: $::yaml_config
+--- request eval
+[
+
+"POST /operators
+$::scale_in_ns_a_epslice1",
+
+"GET /queries
+[
+ \"first/ns-a/service-a:p1\"
+]",
+
+"POST /operators
+$::scale_up_ns_a_epslice1",
+
+"GET /queries
+[
+ \"first/ns-a/service-a:p1\"
+]",
+
+]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+[
+ "DONE\n",
+ "{ 2 }\n",
+ "DONE\n",
+ "{ 3 }\n",
+]
+
+
+
+=== TEST 7: endpointSlice scale for single-k8s mode
+--- yaml_config eval: $::single_yaml_config
+--- request eval
+[
+
+"POST /operators
+$::scale_in_ns_a_epslice1",
+
+"GET /queries
+[
+ \"ns-a/service-a:p1\"
+]",
+
+"POST /operators
+$::scale_up_ns_a_epslice1",
+
+"GET /queries
+[
+ \"ns-a/service-a:p1\"
+]",
+
+]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+[
+ "DONE\n",
+ "{ 2 }\n",
+ "DONE\n",
+ "{ 3 }\n",
+]