This is an automated email from the ASF dual-hosted git repository.

baoyuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 46176f249 fix: correct handling of endpointSlices in Kubernetes service discovery (#12634)
46176f249 is described below

commit 46176f249e1fbf599040b8861e0b701cfc8e3ef2
Author: aie <[email protected]>
AuthorDate: Wed Jan 28 14:12:01 2026 +0800

    fix: correct handling of endpointSlices in Kubernetes service discovery (#12634)
---
 apisix/discovery/kubernetes/init.lua     | 232 +++++++++----
 t/kubernetes/configs/endpointslices.yaml |  12 +-
 t/kubernetes/discovery/kubernetes3.t     |  33 +-
 t/kubernetes/discovery/kubernetes4.t     | 546 +++++++++++++++++++++++++++++++
 4 files changed, 742 insertions(+), 81 deletions(-)

diff --git a/apisix/discovery/kubernetes/init.lua b/apisix/discovery/kubernetes/init.lua
index 1b85181b6..594aaf2a0 100644
--- a/apisix/discovery/kubernetes/init.lua
+++ b/apisix/discovery/kubernetes/init.lua
@@ -16,6 +16,7 @@
 --
 
 local ngx = ngx
+local unpack = unpack
 local ipairs = ipairs
 local pairs = pairs
 local string = string
@@ -25,7 +26,6 @@ local os = os
 local error = error
 local pcall = pcall
 local setmetatable = setmetatable
-local type = type
 local is_http = ngx.config.subsystem == "http"
 local process = require("ngx.process")
 local core = require("apisix.core")
@@ -42,6 +42,7 @@ local endpoint_lrucache = core.lrucache.new({
 })
 
 local endpoint_buffer = {}
+local kubernetes_service_name_label = "kubernetes.io/service-name"
 
 local function sort_nodes_cmp(left, right)
     if left.host ~= right.host then
@@ -51,69 +52,133 @@ local function sort_nodes_cmp(left, right)
     return left.port < right.port
 end
 
-local function on_endpoint_slices_modified(handle, endpoint, operate)
-    if handle.namespace_selector and
-            not handle:namespace_selector(endpoint.metadata.namespace) then
-        return
+local function update_endpoint_slices_cache(handle, endpoint_key, slice, 
slice_name)
+    if not handle.endpoint_slices_cache[endpoint_key] then
+        handle.endpoint_slices_cache[endpoint_key] = {}
     end
+    local endpoint_slices = handle.endpoint_slices_cache[endpoint_key]
+    endpoint_slices[slice_name] = slice
+end
 
-    core.log.debug(core.json.delay_encode(endpoint))
-    core.table.clear(endpoint_buffer)
-
-    local endpointslices = endpoint.endpoints
-    if type(endpointslices) == "table" then
-        for _, endpointslice in ipairs(endpointslices) do
-            if endpointslice.addresses then
-                local addresses = endpointslice.addresses
-                for _, port in ipairs(endpoint.ports or {}) do
-                    local port_name
-                    if port.name then
-                        port_name = port.name
-                    elseif port.targetPort then
-                        port_name = tostring(port.targetPort)
-                    else
-                        port_name = tostring(port.port)
-                    end
-
-                    if endpointslice.conditions and 
endpointslice.conditions.ready then
-                        local nodes = endpoint_buffer[port_name]
-                        if nodes == nil then
-                            nodes = core.table.new(0, #endpointslices * 
#addresses)
-                            endpoint_buffer[port_name] = nodes
-                        end
-
-                        for _, address in ipairs(addresses) do
-                            core.table.insert(nodes, {
-                                host = address.ip,
-                                port = port.port,
-                                weight = handle.default_weight
-                            })
-                        end
-                    end
-                end
+local function get_endpoints_from_cache(handle, endpoint_key)
+    local endpoint_slices = handle.endpoint_slices_cache[endpoint_key] or {}
+    local endpoints = {}
+    for _, endpoint_slice in pairs(endpoint_slices) do
+        for port, targets in pairs(endpoint_slice) do
+            if not endpoints[port] then
+                endpoints[port] = core.table.new(0, #targets)
             end
+            core.table.insert_tail(endpoints[port], unpack(targets))
         end
     end
 
-    for _, ports in pairs(endpoint_buffer) do
-        for _, nodes in pairs(ports) do
-            core.table.sort(nodes, sort_nodes_cmp)
-        end
-    end
-    local endpoint_key = endpoint.metadata.namespace .. "/" .. 
endpoint.metadata.name
-    local endpoint_content = core.json.encode(endpoint_buffer, true)
-    local endpoint_version = ngx.crc32_long(endpoint_content)
+    return endpoints
+end
 
+local function update_endpoint_dict(handle, endpoints, endpoint_key)
+    local endpoint_content = core.json.encode(endpoints, true)
+    local endpoint_version = ngx.crc32_long(endpoint_content)
     local _, err
     _, err = handle.endpoint_dict:safe_set(endpoint_key .. "#version", 
endpoint_version)
     if err then
-        core.log.error("set endpoint version into discovery DICT failed, ", 
err)
-        return
+        return false, "set endpoint version into discovery DICT failed, " .. 
err
     end
     _, err = handle.endpoint_dict:safe_set(endpoint_key, endpoint_content)
     if err then
-        core.log.error("set endpoint into discovery DICT failed, ", err)
         handle.endpoint_dict:delete(endpoint_key .. "#version")
+        return false, "set endpoint into discovery DICT failed, " .. err
+    end
+
+    return true
+end
+
+local function validate_endpoint_slice(endpoint_slice)
+    if not endpoint_slice.metadata then
+        return false, "endpoint_slice has no metadata, endpointSlice: "
+                .. core.json.encode(endpoint_slice)
+    end
+    if not endpoint_slice.metadata.name then
+        return false, "endpoint_slice has no metadata.name, endpointSlice: "
+                .. core.json.encode(endpoint_slice)
+    end
+    if not endpoint_slice.metadata.namespace then
+        return false, "endpoint_slice has no metadata.namespace, 
endpointSlice: "
+                .. core.json.encode(endpoint_slice)
+    end
+    if not endpoint_slice.metadata.labels
+            or not 
endpoint_slice.metadata.labels[kubernetes_service_name_label] then
+        return false, "endpoint_slice has no service-name, endpointSlice: "
+                .. core.json.encode(endpoint_slice)
+    end
+
+    return true
+end
+
+local function on_endpoint_slices_modified(handle, endpoint_slice, operate)
+    local ok, err = validate_endpoint_slice(endpoint_slice)
+    if not ok then
+        core.log.error("endpoint_slice validation fail: ", err)
+        return
+    end
+    if handle.namespace_selector and
+            not handle:namespace_selector(endpoint_slice.metadata.namespace) 
then
+        return
+    end
+
+    core.log.debug("get endpoint_slice: ", 
core.json.delay_encode(endpoint_slice))
+    --record nodes to every port in service
+    local port_to_nodes = {}
+
+    local slice_endpoints = endpoint_slice.endpoints
+    if not slice_endpoints or slice_endpoints == ngx.null then
+        slice_endpoints = {}
+    end
+
+    for _, endpoint in ipairs(slice_endpoints) do
+        if endpoint.addresses
+                and endpoint.conditions
+                and endpoint.conditions.ready then
+            local addresses = endpoint.addresses
+            for _, port in ipairs(endpoint_slice.ports or {}) do
+                local port_name
+                if port.name then
+                    port_name = port.name
+                elseif port.targetPort then
+                    port_name = tostring(port.targetPort)
+                else
+                    port_name = tostring(port.port)
+                end
+
+                local nodes = port_to_nodes[port_name]
+                if nodes == nil then
+                    nodes = core.table.new(0, #slice_endpoints * #addresses)
+                    port_to_nodes[port_name] = nodes
+                end
+
+                for _, ip in ipairs(addresses) do
+                    core.table.insert(nodes, {
+                        host = ip,
+                        port = port.port,
+                        weight = handle.default_weight
+                    })
+                end
+            end
+        end
+    end
+
+    local endpoint_key = endpoint_slice.metadata.namespace
+            .. "/" .. 
endpoint_slice.metadata.labels[kubernetes_service_name_label]
+    update_endpoint_slices_cache(handle, endpoint_key, port_to_nodes, 
endpoint_slice.metadata.name)
+
+    local cached_endpoints = get_endpoints_from_cache(handle, endpoint_key)
+    for _, nodes in pairs(cached_endpoints) do
+        core.table.sort(nodes, sort_nodes_cmp)
+    end
+
+    local ok, err = update_endpoint_dict(handle, cached_endpoints, 
endpoint_key)
+    if not ok then
+        core.log.error("failed to update endpoint dict for endpoint: ", 
endpoint_key,
+                ", err: ", err)
         return
     end
     if operate == "list" then
@@ -122,6 +187,36 @@ local function on_endpoint_slices_modified(handle, endpoint, operate)
     end
 end
 
+local function on_endpoint_slices_deleted(handle, endpoint_slice)
+    local ok, err = validate_endpoint_slice(endpoint_slice)
+    if not ok then
+        core.log.error("endpoint_slice validation fail: ", err)
+        return
+    end
+
+    if handle.namespace_selector and
+            not handle:namespace_selector(endpoint_slice.metadata.namespace) 
then
+        return
+    end
+
+    core.log.debug("delete endpoint_slice: ", 
core.json.delay_encode(endpoint_slice))
+
+    local endpoint_key = endpoint_slice.metadata.namespace
+            .. "/" .. 
endpoint_slice.metadata.labels[kubernetes_service_name_label]
+    update_endpoint_slices_cache(handle, endpoint_key, nil, 
endpoint_slice.metadata.name)
+
+    local cached_endpoints = get_endpoints_from_cache(handle, endpoint_key)
+    for _, nodes in pairs(cached_endpoints) do
+        core.table.sort(nodes, sort_nodes_cmp)
+    end
+
+    ok, err = update_endpoint_dict(handle, cached_endpoints, endpoint_key)
+    if not ok then
+        core.log.error("failed to update endpoint dict for endpoint: ", 
endpoint_key,
+                ", err: ", err)
+    end
+end
+
 local function on_endpoint_modified(handle, endpoint, operate)
     if handle.namespace_selector and
             not handle:namespace_selector(endpoint.metadata.namespace) then
@@ -162,26 +257,16 @@ local function on_endpoint_modified(handle, endpoint, operate)
         end
     end
 
-    for _, ports in pairs(endpoint_buffer) do
-        for _, nodes in pairs(ports) do
-            core.table.sort(nodes, sort_nodes_cmp)
-        end
+
+    for _, nodes in pairs(endpoint_buffer) do
+        core.table.sort(nodes, sort_nodes_cmp)
     end
 
     local endpoint_key = endpoint.metadata.namespace .. "/" .. 
endpoint.metadata.name
-    local endpoint_content = core.json.encode(endpoint_buffer, true)
-    local endpoint_version = ngx.crc32_long(endpoint_content)
-
-    local _, err
-    _, err = handle.endpoint_dict:safe_set(endpoint_key .. "#version", 
endpoint_version)
-    if err then
-        core.log.error("set endpoint version into discovery DICT failed, ", 
err)
-        return
-    end
-    _, err = handle.endpoint_dict:safe_set(endpoint_key, endpoint_content)
-    if err then
-        core.log.error("set endpoint into discovery DICT failed, ", err)
-        handle.endpoint_dict:delete(endpoint_key .. "#version")
+    local ok, err = update_endpoint_dict(handle, endpoint_buffer, endpoint_key)
+    if not ok then
+        core.log.error("failed to update endpoint dict for endpoint: ", 
endpoint_key,
+                ", err: ", err)
         return
     end
     if operate == "list" then
@@ -207,6 +292,9 @@ end
 local function pre_list(handle)
     handle.current_keys_hash = {}
     handle.existing_keys = handle.endpoint_dict:get_keys(0)
+    if handle.endpoint_slices_cache then
+        handle.endpoint_slices_cache = {}
+    end
 end
 
 
@@ -499,11 +587,14 @@ local function single_mode_init(conf)
     if conf.watch_endpoint_slices then
         endpoints_informer.on_added = on_endpoint_slices_modified
         endpoints_informer.on_modified = on_endpoint_slices_modified
+        endpoints_informer.on_deleted = on_endpoint_slices_deleted
+        endpoints_informer.endpoint_slices_cache = {}
     else
         endpoints_informer.on_added = on_endpoint_modified
         endpoints_informer.on_modified = on_endpoint_modified
+        endpoints_informer.on_deleted = on_endpoint_deleted
     end
-    endpoints_informer.on_deleted = on_endpoint_deleted
+
     endpoints_informer.pre_list = pre_list
     endpoints_informer.post_list = post_list
 
@@ -605,11 +696,14 @@ local function multiple_mode_init(confs)
         if conf.watch_endpoint_slices then
             endpoints_informer.on_added = on_endpoint_slices_modified
             endpoints_informer.on_modified = on_endpoint_slices_modified
+            endpoints_informer.on_deleted = on_endpoint_slices_deleted
+            endpoints_informer.endpoint_slices_cache = {}
         else
             endpoints_informer.on_added = on_endpoint_modified
             endpoints_informer.on_modified = on_endpoint_modified
+            endpoints_informer.on_deleted = on_endpoint_deleted
         end
-        endpoints_informer.on_deleted = on_endpoint_deleted
+
         endpoints_informer.pre_list = pre_list
         endpoints_informer.post_list = post_list
 
diff --git a/t/kubernetes/configs/endpointslices.yaml b/t/kubernetes/configs/endpointslices.yaml
index d22851233..6b13198f6 100644
--- a/t/kubernetes/configs/endpointslices.yaml
+++ b/t/kubernetes/configs/endpointslices.yaml
@@ -24,8 +24,10 @@ metadata:
 kind: EndpointSlice
 apiVersion: discovery.k8s.io/v1
 metadata:
-  name: epslice
+  name: service-a-epslice1
   namespace: ns-a
+  labels:
+    "kubernetes.io/service-name": service-a
 addressType: IPv4
 endpoints: [ ]
 ---
@@ -39,8 +41,10 @@ metadata:
 kind: EndpointSlice
 apiVersion: discovery.k8s.io/v1
 metadata:
-  name: epslice
+  name: service-a-epslice1
   namespace: ns-b
+  labels:
+    "kubernetes.io/service-name": service-a
 addressType: IPv4
 endpoints: [ ]
 ---
@@ -54,8 +58,10 @@ metadata:
 kind: EndpointSlice
 apiVersion: discovery.k8s.io/v1
 metadata:
-  name: epslice
+  name: service-a-epslice1
   namespace: ns-c
+  labels:
+    "kubernetes.io/service-name": service-a
 addressType: IPv4
 endpoints: [ ]
 ---
diff --git a/t/kubernetes/discovery/kubernetes3.t b/t/kubernetes/discovery/kubernetes3.t
index 99b3dff37..47b902e18 100644
--- a/t/kubernetes/discovery/kubernetes3.t
+++ b/t/kubernetes/discovery/kubernetes3.t
@@ -279,7 +279,12 @@ POST /operators
     {
         "op": "replace_endpointslices",
         "namespace": "ns-a",
-        "name": "epslice",
+        "name": "service-a-epslice1",
+        "metadata": {
+            "labels": {
+                "kubernetes.io/service-name": "service-a"
+            }
+        },
         "endpoints": [
             {
                 "addresses": [
@@ -320,7 +325,12 @@ POST /operators
     {
         "op": "replace_endpointslices",
         "namespace": "ns-b",
-        "name": "epslice",
+        "name": "service-a-epslice1",
+        "metadata": {
+            "labels": {
+                "kubernetes.io/service-name": "service-a"
+            }
+        },
         "endpoints": [
             {
                 "addresses": [
@@ -361,7 +371,12 @@ POST /operators
     {
         "op": "replace_endpointslices",
         "namespace": "ns-c",
-        "name": "epslice",
+        "name": "service-a-epslice1",
+        "metadata": {
+            "labels": {
+                "kubernetes.io/service-name": "service-a"
+            }
+        },
         "endpoints": [
             {
                 "addresses": [
@@ -408,8 +423,8 @@ Content-type: application/json
 --- request
 GET /queries
 [
-  
"first/ns-a/epslice:p1","first/ns-a/epslice:p1","first/ns-b/epslice:p2","first/ns-b/epslice:p2","first/ns-c/epslice:p3","first/ns-c/epslice:p3",
-  
"second/ns-a/epslice:p1","second/ns-a/epslice:p1","second/ns-b/epslice:p2","second/ns-b/epslice:p2","second/ns-c/epslice:p3","second/ns-c/epslice:p3"
+  
"first/ns-a/service-a:p1","first/ns-a/service-a:p1","first/ns-b/service-a:p2","first/ns-b/service-a:p2","first/ns-c/service-a:p3","first/ns-c/service-a:p3",
+  
"second/ns-a/service-a:p1","second/ns-a/service-a:p1","second/ns-b/service-a:p2","second/ns-b/service-a:p2","second/ns-c/service-a:p3","second/ns-c/service-a:p3"
 ]
 --- more_headers
 Content-type: application/json
@@ -447,8 +462,8 @@ discovery:
 --- request
 GET /queries
 [
-  
"first/ns-a/epslice:p1","first/ns-a/epslice:p1","first/ns-b/epslice:p2","first/ns-b/epslice:p2","first/ns-c/epslice:p3","first/ns-c/epslice:p3",
-  
"second/ns-a/epslice:p1","second/ns-a/epslice:p1","second/ns-b/epslice:p2","second/ns-b/epslice:p2","second/ns-c/epslice:p3","second/ns-c/epslice:p3"
+  
"first/ns-a/service-a:p1","first/ns-a/service-a:p1","first/ns-b/service-a:p2","first/ns-b/service-a:p2","first/ns-c/service-a:p3","first/ns-c/service-a:p3",
+  
"second/ns-a/service-a:p1","second/ns-a/service-a:p1","second/ns-b/service-a:p2","second/ns-b/service-a:p2","second/ns-c/service-a:p3","second/ns-c/service-a:p3"
 ]
 --- more_headers
 Content-type: application/json
@@ -487,8 +502,8 @@ discovery:
 --- request
 GET /queries
 [
-  
"first/ns-a/epslice:p1","first/ns-a/epslice:p1","first/ns-b/epslice:p2","first/ns-b/epslice:p2","first/ns-c/epslice:p3","first/ns-c/epslice:p3",
-  
"second/ns-a/epslice:p1","second/ns-a/epslice:p1","second/ns-b/epslice:p2","second/ns-b/epslice:p2","second/ns-c/epslice:p3","second/ns-c/epslice:p3"
+  
"first/ns-a/service-a:p1","first/ns-a/service-a:p1","first/ns-b/service-a:p2","first/ns-b/service-a:p2","first/ns-c/service-a:p3","first/ns-c/service-a:p3",
+  
"second/ns-a/service-a:p1","second/ns-a/service-a:p1","second/ns-b/service-a:p2","second/ns-b/service-a:p2","second/ns-c/service-a:p3","second/ns-c/service-a:p3"
 ]
 --- more_headers
 Content-type: application/json
diff --git a/t/kubernetes/discovery/kubernetes4.t b/t/kubernetes/discovery/kubernetes4.t
new file mode 100644
index 000000000..3214523ac
--- /dev/null
+++ b/t/kubernetes/discovery/kubernetes4.t
@@ -0,0 +1,546 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BEGIN {
+    our $token_file = 
"/tmp/var/run/secrets/kubernetes.io/serviceaccount/token";
+    our $token_value = eval {`cat $token_file 2>/dev/null`};
+
+    our $yaml_config = <<_EOC_;
+apisix:
+  node_listen: 1984
+deployment:
+  role: data_plane
+  role_data_plane:
+    config_provider: yaml
+discovery:
+  kubernetes:
+    - id: first
+      service:
+        host: "127.0.0.1"
+        port: "6443"
+      client:
+        token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
+      watch_endpoint_slices: true
+    - id: second
+      service:
+        schema: "http"
+        host: "127.0.0.1"
+        port: "6445"
+      client:
+        token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
+      watch_endpoint_slices: true
+
+_EOC_
+
+    our $single_yaml_config = <<_EOC_;
+apisix:
+  node_listen: 1984
+deployment:
+  role: data_plane
+  role_data_plane:
+    config_provider: yaml
+discovery:
+  kubernetes:
+    service:
+      host: "127.0.0.1"
+      port: "6443"
+    client:
+      token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
+    watch_endpoint_slices: true
+_EOC_
+
+    our $create_ns_a_epslice2 = <<_EOC_;
+[
+    {
+        "op": "create_endpointslices",
+        "namespace": "ns-a",
+        "apiVersion": "discovery.k8s.io/v1",
+        "kind": "EndpointSlice",
+        "metadata": {
+            "name": "service-a-epslice2",
+            "labels": {
+                "kubernetes.io/service-name": "service-a"
+            }
+        },
+        "addressType": "IPv4",
+        "endpoints": [
+            {
+                "addresses": [
+                    "10.0.0.4"
+                ],
+                "conditions": {
+                    "ready": true,
+                    "serving": true,
+                    "terminating": false
+                },
+                "nodeName": "service-a-node4"
+            },
+            {
+                "addresses": [
+                    "10.0.0.5"
+                ],
+                "conditions": {
+                    "ready": true,
+                    "serving": true,
+                    "terminating": false
+                },
+                "nodeName": "service-a-node5"
+            }
+        ],
+        "ports": [
+            {
+                "name": "p1",
+                "port": 5001
+            }
+        ]
+    }
+]
+_EOC_
+
+    our $scale_in_ns_a_epslice1 = <<_EOC_;
+[
+    {
+        "op": "replace_endpointslices",
+        "name": "service-a-epslice1",
+        "namespace": "ns-a",
+        "apiVersion": "discovery.k8s.io/v1",
+        "kind": "EndpointSlice",
+        "metadata": {
+            "name": "service-a-epslice1",
+            "labels": {
+                "kubernetes.io/service-name": "service-a"
+            }
+        },
+        "addressType": "IPv4",
+        "endpoints": [
+            {
+                "addresses": [
+                    "10.0.0.1"
+                ],
+                "conditions": {
+                    "ready": true,
+                    "serving": true,
+                    "terminating": false
+                },
+                "nodeName": "service-a-node1"
+            },
+            {
+                "addresses": [
+                    "10.0.0.2"
+                ],
+                "conditions": {
+                    "ready": true,
+                    "serving": true,
+                    "terminating": false
+                },
+                "nodeName": "service-a-node2"
+            }
+        ],
+        "ports": [
+            {
+                "name": "p1",
+                "port": 5001
+            }
+        ]
+    }
+]
+_EOC_
+
+    our $scale_up_ns_a_epslice1 = <<_EOC_;
+[
+    {
+        "op": "replace_endpointslices",
+        "name": "service-a-epslice1",
+        "namespace": "ns-a",
+        "apiVersion": "discovery.k8s.io/v1",
+        "kind": "EndpointSlice",
+        "metadata": {
+            "name": "service-a-epslice1",
+            "labels": {
+                "kubernetes.io/service-name": "service-a"
+            }
+        },
+        "addressType": "IPv4",
+        "endpoints": [
+            {
+                "addresses": [
+                    "10.0.0.1"
+                ],
+                "conditions": {
+                    "ready": true,
+                    "serving": true,
+                    "terminating": false
+                },
+                "nodeName": "service-a-node1"
+            },
+            {
+                "addresses": [
+                    "10.0.0.2"
+                ],
+                "conditions": {
+                    "ready": true,
+                    "serving": true,
+                    "terminating": false
+                },
+                "nodeName": "service-a-node2"
+            },
+            {
+                "addresses": [
+                    "10.0.0.3"
+                ],
+                "conditions": {
+                    "ready": true,
+                    "serving": true,
+                    "terminating": false
+                },
+                "nodeName": "service-a-node3"
+            }
+        ],
+        "ports": [
+            {
+                "name": "p1",
+                "port": 5001
+            }
+        ]
+    }
+]
+_EOC_
+
+}
+
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('warn');
+no_root_location();
+no_shuffle();
+workers(4);
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
+routes: []
+#END
+_EOC_
+
+    $block->set_value("apisix_yaml", $apisix_yaml);
+
+    my $main_config = $block->main_config // <<_EOC_;
+env KUBERNETES_SERVICE_HOST=127.0.0.1;
+env KUBERNETES_SERVICE_PORT=6443;
+env KUBERNETES_CLIENT_TOKEN=$::token_value;
+env KUBERNETES_CLIENT_TOKEN_FILE=$::token_file;
+_EOC_
+
+    $block->set_value("main_config", $main_config);
+
+    my $config = $block->config // <<_EOC_;
+        location /queries {
+            content_by_lua_block {
+              local core = require("apisix.core")
+              local d = require("apisix.discovery.kubernetes")
+
+              ngx.sleep(1)
+
+              ngx.req.read_body()
+              local request_body = ngx.req.get_body_data()
+              local queries = core.json.decode(request_body)
+              local response_body = "{"
+              for _,query in ipairs(queries) do
+                local nodes = d.nodes(query)
+                if nodes==nil or #nodes==0 then
+                    response_body=response_body.." "..0
+                else
+                    response_body=response_body.." "..#nodes
+                end
+              end
+              ngx.say(response_body.." }")
+            }
+        }
+
+        location /operators {
+            content_by_lua_block {
+                local http = require("resty.http")
+                local core = require("apisix.core")
+                local ipairs = ipairs
+
+                ngx.req.read_body()
+                local request_body = ngx.req.get_body_data()
+                local operators = core.json.decode(request_body)
+
+                core.log.info("get body ", request_body)
+                core.log.info("get operators ", #operators)
+                for _, op in ipairs(operators) do
+                    local method, path, body
+                    local headers = {
+                        ["Host"] = "127.0.0.1:6445"
+                    }
+
+                    if op.op == "replace_endpointslices" then
+                        method = "PATCH"
+                        path = "/apis/discovery.k8s.io/v1/namespaces/" .. 
op.namespace .. "/endpointslices/" .. op.name
+                        if #op.endpoints == 0 then
+                            body = 
'[{"path":"/endpoints","op":"replace","value":[]}]'
+                        else
+                            local t = { { op = "replace", path = "/endpoints", 
value = op.endpoints }, { op = "replace", path = "/ports", value = op.ports } }
+                            body = core.json.encode(t, true)
+                        end
+                        headers["Content-Type"] = "application/json-patch+json"
+
+                    elseif op.op == "create_endpointslices" then
+                        method = "POST"
+                        path = "/apis/discovery.k8s.io/v1/namespaces/" .. 
op.namespace .. "/endpointslices"
+                        op.op = nil
+                        op.namespace = nil
+                        body = core.json.encode(op, true)
+
+                    elseif op.op == "delete_endpointslices" then
+                        method = "DELETE"
+                        path = "/apis/discovery.k8s.io/v1/namespaces/" .. 
op.namespace .. "/endpointslices/" .. op.name
+                    end
+
+
+                    local httpc = http.new()
+                    core.log.info("begin to connect ", "127.0.0.1:6445")
+                    local ok, message = httpc:connect({
+                        scheme = "http",
+                        host = "127.0.0.1",
+                        port = 6445,
+                    })
+                    if not ok then
+                        core.log.error("connect 127.0.0.1:6445 failed, message 
: ", message)
+                        ngx.say("FAILED")
+                    end
+                    local res, err = httpc:request({
+                        method = method,
+                        path = path,
+                        headers = headers,
+                        body = body,
+                    })
+                    if err ~= nil then
+                        core.log.err("operator k8s cluster error: ", err)
+                        return 500
+                    end
+
+                    ngx.sleep(1)
+
+                    if res.status ~= 200 and res.status ~= 201 and res.status 
~= 409 then
+                        return res.status
+                    end
+                end
+                ngx.say("DONE")
+            }
+        }
+
+_EOC_
+
+    $block->set_value("config", $config);
+
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: endpointSlice1 update
+--- yaml_config eval: $::yaml_config
+--- request eval
+[
+"POST /operators
+$::scale_up_ns_a_epslice1",
+]
+--- more_headers
+Content-type: application/json
+--- response_body
+DONE
+
+
+
+=== TEST 2: test multi-k8s watching endpointSlices
+--- yaml_config eval: $::yaml_config
+--- request
+GET /queries
+[
+  "first/ns-a/service-a:p1"
+]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+qr{ 3 }
+
+
+
+=== TEST 3: test single-k8s watching endpointSlices
+--- yaml_config eval: $::single_yaml_config
+--- request
+GET /queries
+[
+  "ns-a/service-a:p1"
+]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+qr{ 3 }
+
+
+
+=== TEST 4: endpointSlice2 create and delete for multi-k8s mode
+--- yaml_config eval: $::yaml_config
+--- request eval
+[
+
+"POST /operators
+$::create_ns_a_epslice2",
+
+"GET /queries
+[
+  \"first/ns-a/service-a:p1\"
+]",
+
+"POST /operators
+[
+    {
+        \"op\": \"delete_endpointslices\",
+        \"namespace\": \"ns-a\",
+        \"name\": \"service-a-epslice2\"
+    }
+]",
+
+"GET /queries
+[
+  \"first/ns-a/service-a:p1\"
+]",
+
+]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+[
+    "DONE\n",
+    "{ 5 }\n",
+    "DONE\n",
+    "{ 3 }\n",
+]
+
+
+
+=== TEST 5: endpointSlice2 create and delete for single-k8s mode
+--- yaml_config eval: $::single_yaml_config
+--- request eval
+[
+
+"POST /operators
+$::create_ns_a_epslice2",
+
+"GET /queries
+[
+  \"ns-a/service-a:p1\"
+]",
+
+"POST /operators
+[
+    {
+        \"op\": \"delete_endpointslices\",
+        \"namespace\": \"ns-a\",
+        \"name\": \"service-a-epslice2\"
+    }
+]",
+
+"GET /queries
+[
+  \"ns-a/service-a:p1\"
+]",
+
+]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+[
+    "DONE\n",
+    "{ 5 }\n",
+    "DONE\n",
+    "{ 3 }\n",
+]
+
+
+
+=== TEST 6: endpointSlice scale for multi-k8s mode
+--- yaml_config eval: $::yaml_config
+--- request eval
+[
+
+"POST /operators
+$::scale_in_ns_a_epslice1",
+
+"GET /queries
+[
+  \"first/ns-a/service-a:p1\"
+]",
+
+"POST /operators
+$::scale_up_ns_a_epslice1",
+
+"GET /queries
+[
+  \"first/ns-a/service-a:p1\"
+]",
+
+]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+[
+    "DONE\n",
+    "{ 2 }\n",
+    "DONE\n",
+    "{ 3 }\n",
+]
+
+
+
+=== TEST 7: endpointSlice scale for single-k8s mode
+--- yaml_config eval: $::single_yaml_config
+--- request eval
+[
+
+"POST /operators
+$::scale_in_ns_a_epslice1",
+
+"GET /queries
+[
+  \"ns-a/service-a:p1\"
+]",
+
+"POST /operators
+$::scale_up_ns_a_epslice1",
+
+"GET /queries
+[
+  \"ns-a/service-a:p1\"
+]",
+
+]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+[
+    "DONE\n",
+    "{ 2 }\n",
+    "DONE\n",
+    "{ 3 }\n",
+]

Reply via email to