tokers commented on a change in pull request #2935:
URL: https://github.com/apache/apisix/pull/2935#discussion_r547914974



##########
File path: apisix/plugins/traffic-split.lua
##########
@@ -0,0 +1,314 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core       = require("apisix.core")
+local upstream   = require("apisix.upstream")
+local schema_def = require("apisix.schema_def")
+local init       = require("apisix.init")
+local roundrobin = require("resty.roundrobin")
+local ipmatcher  = require("resty.ipmatcher")
+local expr       = require("resty.expr.v1")
+local pairs      = pairs
+local ipairs     = ipairs
+local table_insert = table.insert
+
+local lrucache = core.lrucache.new({
+    ttl = 0, count = 512
+})
+
+
+local vars_schema = {
+    type = "array",
+    items = {
+        type = "array",
+        items = {
+            {
+                type = "string",
+                minLength = 1,
+                maxLength = 100
+            },
+            {
+                type = "string",
+                minLength = 1,
+                maxLength = 2
+            }
+        },
+        additionalItems = {
+            anyOf = {
+                {type = "string"},
+                {type = "number"},
+                {type = "boolean"},
+                {
+                    type = "array",
+                    items = {
+                        anyOf = {
+                            {
+                            type = "string",
+                            minLength = 1, maxLength = 100
+                            },
+                            {
+                                type = "number"
+                            },
+                            {
+                                type = "boolean"
+                            }
+                        }
+                    },
+                    uniqueItems = true
+                }
+            }
+        },
+        minItems = 0,
+        maxItems = 10
+    }
+}
+
+
+local match_schema = {
+    type = "array",
+    items = {
+        type = "object",
+        properties = {
+            vars = vars_schema
+        }
+    },
+    -- When there is no `match` rule, the default rule passes.
+    -- Perform upstream logic of plugin configuration.
+    default = {{ vars = {{"server_port", ">", 0}}}}
+}
+
+
+local upstreams_schema = {
+    type = "array",
+    items = {
+        type = "object",
+        properties = {
+            upstream_id = schema_def.id_schema,    -- todo: support 
upstream_id method
+            upstream = schema_def.upstream,
+            weighted_upstream = {
+                description = "used to split traffic between different" ..
+                              "upstreams for plugin configuration",
+                type = "integer",
+                default = 1,
+                minimum = 0
+            }
+        }
+    },
+    -- When the upstream configuration of the plugin is missing,
+    -- the upstream of `route` is used by default.
+    default = {
+        {
+            weighted_upstream = 1
+        }
+    },
+    minItems = 1,
+    maxItems = 20
+}
+
+
+local schema = {
+    type = "object",
+    properties = {
+        rules = {
+            type = "array",
+            items = {
+                type = "object",
+                properties = {
+                    match = match_schema,
+                    upstreams = upstreams_schema
+                }
+            }
+        }
+    }
+}
+
+local plugin_name = "traffic-split"
+
+local _M = {
+    version = 0.1,
+    priority = 966,
+    name = plugin_name,
+    schema = schema
+}
+
+function _M.check_schema(conf)
+    local ok, err = core.schema.check(schema, conf)
+
+    if not ok then
+        return false, err
+    end
+
+    return true
+end
+
+
+local function parse_domain_for_node(node)
+    if not ipmatcher.parse_ipv4(node)
+       and not ipmatcher.parse_ipv6(node)
+    then
+        local ip, err = init.parse_domain(node)
+        if ip then
+            return ip
+        end
+
+        if err then
+            return nil, err
+        end
+    end
+
+    return node
+end
+
+
+local function set_pass_host(ctx, upstream_info, host)
+    -- Currently only supports a single upstream of the domain name.
+    -- When the upstream is `IP`, do not do any `pass_host` operation.
+    if not core.utils.parse_ipv4(host)
+       and not core.utils.parse_ipv6(host)
+    then
+        local pass_host = upstream_info.pass_host or "pass"
+        if pass_host == "pass" then
+            ctx.var.upstream_host = ctx.var.host
+            return
+        end
+
+        if pass_host == "rewrite" then
+            ctx.var.upstream_host = upstream_info.upstream_host
+            return
+        end
+
+        ctx.var.upstream_host = host
+        return
+    end
+
+    return
+end
+
+
+local function set_upstream(upstream_info, ctx)
+    local nodes = upstream_info.nodes
+    local new_nodes = {}
+    if core.table.isarray(nodes) then
+        for _, node in ipairs(nodes) do
+            set_pass_host(ctx, upstream_info, node.host)
+            node.host = parse_domain_for_node(node.host)
+            node.port = node.port
+            node.weight = node.weight
+            table_insert(new_nodes, node)
+        end
+    else
+        for addr, weight in pairs(nodes) do
+            local node = {}
+            local ip, port, host
+            host, port = core.utils.parse_addr(addr)
+            set_pass_host(ctx, upstream_info, host)
+            ip = parse_domain_for_node(host)
+            node.host = ip
+            node.port = port
+            node.weight = weight
+            table_insert(new_nodes, node)
+        end
+    end
+    core.log.info("upstream_host: ", ctx.var.upstream_host)
+
+    local up_conf = {
+        name = upstream_info.name,
+        type = upstream_info.type,
+        nodes = new_nodes,
+        timeout = {
+            send = upstream_info.timeout and upstream_info.timeout.send or 15,
+            read = upstream_info.timeout and upstream_info.timeout.read or 15,
+            connect = upstream_info.timeout and upstream_info.timeout.connect 
or 15
+        }
+    }
+
+    local ok, err = upstream.check_schema(up_conf)
+    if not ok then
+        return 500, err
+    end
+
+    local matched_route = ctx.matched_route
+    upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
+                ctx.conf_version, up_conf, matched_route)
+    return
+end
+
+
+local function new_rr_obj(upstreams)
+    local server_list = {}
+    for _, upstream_obj in ipairs(upstreams) do
+        if not upstream_obj.upstream then
+            -- If the `upstream` object has only the `weighted_upstream` 
value, it means
+            -- that the `upstream` weight value on the default `route` has 
been reached.
+            -- Need to set an identifier to mark the empty upstream.
+            upstream_obj.upstream = "empty_upstream"
+        end
+        server_list[upstream_obj.upstream] = upstream_obj.weighted_upstream
+    end
+
+    return roundrobin:new(server_list)
+end
+
+
+function _M.access(conf, ctx)
+    if not conf or not conf.rules then
+        return
+    end
+
+    local upstreams, match_flag
+    for _, rule in pairs(conf.rules) do

Review comment:
       For an array-like table, use `ipairs`.

##########
File path: README.md
##########
@@ -99,6 +99,7 @@ A/B testing, canary release, blue-green deployment, limit 
rate, defense against
   - [Health Checks](doc/health-check.md): Enable health check on the upstream 
node, and will automatically filter unhealthy nodes during load balancing to 
ensure system stability.
   - Circuit-Breaker: Intelligent tracking of unhealthy upstream services.
   - [Proxy Mirror](doc/plugins/proxy-mirror.md): Provides the ability to 
mirror client requests.
+  - [Traffic Split](doc/plugins/traffic-split.md): Support the functions of 
gray release, blue-green release and custom release.

Review comment:
       Not so intuitive.
   
   Use: allows users to incrementally direct percentages of traffic between 
various upstreams.

##########
File path: README_CN.md
##########
@@ -97,6 +97,7 @@ A/B 测试、金丝雀发布(灰度发布)、蓝绿部署、限流限速、抵
   - [健康检查](doc/zh-cn/health-check.md):启用上游节点的健康检查,将在负载均衡期间自动过滤不健康的节点,以确保系统稳定性。
   - 熔断器: 智能跟踪不健康上游服务。
   - [代理镜像](doc/zh-cn/plugins/proxy-mirror.md): 提供镜像客户端请求的能力。
+  - [流量拆分](doc/zh-cn/plugins/traffic-split.md): 支持灰度发布、蓝绿发布和自定义发布的功能。

Review comment:
       Ditto 

##########
File path: apisix/plugins/traffic-split.lua
##########
@@ -0,0 +1,314 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core       = require("apisix.core")
+local upstream   = require("apisix.upstream")
+local schema_def = require("apisix.schema_def")
+local init       = require("apisix.init")
+local roundrobin = require("resty.roundrobin")
+local ipmatcher  = require("resty.ipmatcher")
+local expr       = require("resty.expr.v1")
+local pairs      = pairs
+local ipairs     = ipairs
+local table_insert = table.insert
+
+local lrucache = core.lrucache.new({
+    ttl = 0, count = 512
+})
+
+
+local vars_schema = {
+    type = "array",
+    items = {
+        type = "array",
+        items = {
+            {
+                type = "string",
+                minLength = 1,
+                maxLength = 100
+            },
+            {
+                type = "string",
+                minLength = 1,
+                maxLength = 2
+            }
+        },
+        additionalItems = {
+            anyOf = {
+                {type = "string"},
+                {type = "number"},
+                {type = "boolean"},
+                {
+                    type = "array",
+                    items = {
+                        anyOf = {
+                            {
+                            type = "string",
+                            minLength = 1, maxLength = 100
+                            },
+                            {
+                                type = "number"
+                            },
+                            {
+                                type = "boolean"
+                            }
+                        }
+                    },
+                    uniqueItems = true
+                }
+            }
+        },
+        minItems = 0,
+        maxItems = 10
+    }
+}
+
+
+local match_schema = {
+    type = "array",
+    items = {
+        type = "object",
+        properties = {
+            vars = vars_schema
+        }
+    },
+    -- When there is no `match` rule, the default rule passes.
+    -- Perform upstream logic of plugin configuration.
+    default = {{ vars = {{"server_port", ">", 0}}}}
+}
+
+
+local upstreams_schema = {
+    type = "array",
+    items = {
+        type = "object",
+        properties = {
+            upstream_id = schema_def.id_schema,    -- todo: support 
upstream_id method
+            upstream = schema_def.upstream,
+            weighted_upstream = {
+                description = "used to split traffic between different" ..
+                              "upstreams for plugin configuration",
+                type = "integer",
+                default = 1,
+                minimum = 0
+            }
+        }
+    },
+    -- When the upstream configuration of the plugin is missing,
+    -- the upstream of `route` is used by default.
+    default = {
+        {
+            weighted_upstream = 1
+        }
+    },
+    minItems = 1,
+    maxItems = 20
+}
+
+
+local schema = {
+    type = "object",
+    properties = {
+        rules = {
+            type = "array",
+            items = {
+                type = "object",
+                properties = {
+                    match = match_schema,
+                    upstreams = upstreams_schema
+                }
+            }
+        }
+    }
+}
+
+local plugin_name = "traffic-split"
+
+local _M = {
+    version = 0.1,
+    priority = 966,
+    name = plugin_name,
+    schema = schema
+}
+
+function _M.check_schema(conf)
+    local ok, err = core.schema.check(schema, conf)
+
+    if not ok then
+        return false, err
+    end
+
+    return true
+end
+
+
+local function parse_domain_for_node(node)
+    if not ipmatcher.parse_ipv4(node)
+       and not ipmatcher.parse_ipv6(node)
+    then
+        local ip, err = init.parse_domain(node)
+        if ip then
+            return ip
+        end
+
+        if err then
+            return nil, err
+        end
+    end
+
+    return node
+end
+
+
+local function set_pass_host(ctx, upstream_info, host)
+    -- Currently only supports a single upstream of the domain name.
+    -- When the upstream is `IP`, do not do any `pass_host` operation.
+    if not core.utils.parse_ipv4(host)
+       and not core.utils.parse_ipv6(host)
+    then
+        local pass_host = upstream_info.pass_host or "pass"
+        if pass_host == "pass" then
+            ctx.var.upstream_host = ctx.var.host
+            return
+        end
+
+        if pass_host == "rewrite" then
+            ctx.var.upstream_host = upstream_info.upstream_host
+            return
+        end
+
+        ctx.var.upstream_host = host
+        return
+    end
+
+    return
+end
+
+
+local function set_upstream(upstream_info, ctx)
+    local nodes = upstream_info.nodes
+    local new_nodes = {}
+    if core.table.isarray(nodes) then
+        for _, node in ipairs(nodes) do
+            set_pass_host(ctx, upstream_info, node.host)
+            node.host = parse_domain_for_node(node.host)
+            node.port = node.port
+            node.weight = node.weight
+            table_insert(new_nodes, node)
+        end
+    else
+        for addr, weight in pairs(nodes) do
+            local node = {}
+            local ip, port, host
+            host, port = core.utils.parse_addr(addr)
+            set_pass_host(ctx, upstream_info, host)
+            ip = parse_domain_for_node(host)
+            node.host = ip
+            node.port = port
+            node.weight = weight
+            table_insert(new_nodes, node)
+        end
+    end
+    core.log.info("upstream_host: ", ctx.var.upstream_host)
+
+    local up_conf = {
+        name = upstream_info.name,
+        type = upstream_info.type,
+        nodes = new_nodes,
+        timeout = {
+            send = upstream_info.timeout and upstream_info.timeout.send or 15,
+            read = upstream_info.timeout and upstream_info.timeout.read or 15,
+            connect = upstream_info.timeout and upstream_info.timeout.connect 
or 15
+        }
+    }
+
+    local ok, err = upstream.check_schema(up_conf)
+    if not ok then
+        return 500, err
+    end
+
+    local matched_route = ctx.matched_route
+    upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
+                ctx.conf_version, up_conf, matched_route)
+    return
+end
+
+
+local function new_rr_obj(upstreams)
+    local server_list = {}
+    for _, upstream_obj in ipairs(upstreams) do
+        if not upstream_obj.upstream then
+            -- If the `upstream` object has only the `weighted_upstream` 
value, it means
+            -- that the `upstream` weight value on the default `route` has 
been reached.
+            -- Need to set an identifier to mark the empty upstream.
+            upstream_obj.upstream = "empty_upstream"
+        end
+        server_list[upstream_obj.upstream] = upstream_obj.weighted_upstream
+    end
+
+    return roundrobin:new(server_list)
+end
+
+
+function _M.access(conf, ctx)
+    if not conf or not conf.rules then
+        return
+    end
+
+    local upstreams, match_flag
+    for _, rule in pairs(conf.rules) do
+        match_flag = true
+        for _, single_match in ipairs(rule.match) do
+            local expr, err = expr.new(single_match.vars)
+            if err then
+                return 500, err
+            end
+
+            match_flag = expr:eval()
+            if match_flag then
+                break
+            end
+        end
+
+        if match_flag then
+            upstreams = rule.upstreams
+            break
+        end
+    end
+
+    core.log.info("match_flag: ", match_flag)
+
+    if not match_flag then
+        return
+    end
+
+    local rr_up, err = lrucache(upstreams, nil, new_rr_obj, upstreams)
+    if not rr_up then
+        core.log.error("lrucache roundrobin failed: ", err)
+        return 500
+    end
+
+    local upstream = rr_up:find()

Review comment:
       Do not use wrr here; please use a weighted random algorithm instead.
   
   Say we have two upstreams with the same weight. When the rr state has 
just been reset, the first selected upstream is fixed, depending on their order. 
This is an inherent drawback of wrr: it might overwhelm the first selected 
upstream after the wrr state is reset.
   
   Also, the wrr state will be reset whenever the config is changed, so the 
percentage curve is not so smooth.

##########
File path: doc/README.md
##########
@@ -81,6 +81,7 @@
 * [request-validation](plugins/request-validation.md): Validates requests 
before forwarding to upstream.
 * [proxy-mirror](plugins/proxy-mirror.md): Provides the ability to mirror 
client requests.
 * [api-breaker](plugins/api-breaker.md): Circuit Breaker for API that stops 
requests forwarding to upstream in case of unhealthy state.
+* [traffic-split](plugins/traffic-split.md): The traffic division plug-in 
divides the request traffic according to the specified ratio and diverts it to 
the corresponding upstream; through this plug-in, gray-scale publishing, 
blue-green publishing and custom publishing functions can be realized.

Review comment:
       Why traffic division?

##########
File path: apisix/plugins/traffic-split.lua
##########
@@ -0,0 +1,314 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core       = require("apisix.core")
+local upstream   = require("apisix.upstream")
+local schema_def = require("apisix.schema_def")
+local init       = require("apisix.init")
+local roundrobin = require("resty.roundrobin")
+local ipmatcher  = require("resty.ipmatcher")
+local expr       = require("resty.expr.v1")
+local pairs      = pairs
+local ipairs     = ipairs
+local table_insert = table.insert
+
+local lrucache = core.lrucache.new({
+    ttl = 0, count = 512
+})
+
+
+local vars_schema = {
+    type = "array",
+    items = {
+        type = "array",
+        items = {
+            {
+                type = "string",
+                minLength = 1,
+                maxLength = 100
+            },
+            {
+                type = "string",
+                minLength = 1,
+                maxLength = 2
+            }
+        },
+        additionalItems = {
+            anyOf = {
+                {type = "string"},
+                {type = "number"},
+                {type = "boolean"},
+                {
+                    type = "array",
+                    items = {
+                        anyOf = {
+                            {
+                            type = "string",
+                            minLength = 1, maxLength = 100
+                            },
+                            {
+                                type = "number"
+                            },
+                            {
+                                type = "boolean"
+                            }
+                        }
+                    },
+                    uniqueItems = true
+                }
+            }
+        },
+        minItems = 0,
+        maxItems = 10
+    }
+}
+
+
+local match_schema = {
+    type = "array",
+    items = {
+        type = "object",
+        properties = {
+            vars = vars_schema
+        }
+    },
+    -- When there is no `match` rule, the default rule passes.
+    -- Perform upstream logic of plugin configuration.
+    default = {{ vars = {{"server_port", ">", 0}}}}
+}
+
+
+local upstreams_schema = {
+    type = "array",
+    items = {
+        type = "object",
+        properties = {
+            upstream_id = schema_def.id_schema,    -- todo: support 
upstream_id method
+            upstream = schema_def.upstream,
+            weighted_upstream = {
+                description = "used to split traffic between different" ..
+                              "upstreams for plugin configuration",
+                type = "integer",
+                default = 1,
+                minimum = 0
+            }
+        }
+    },
+    -- When the upstream configuration of the plugin is missing,
+    -- the upstream of `route` is used by default.
+    default = {
+        {
+            weighted_upstream = 1
+        }
+    },
+    minItems = 1,
+    maxItems = 20
+}
+
+
+local schema = {
+    type = "object",
+    properties = {
+        rules = {
+            type = "array",
+            items = {
+                type = "object",
+                properties = {
+                    match = match_schema,
+                    upstreams = upstreams_schema
+                }
+            }
+        }
+    }
+}
+
+local plugin_name = "traffic-split"
+
+local _M = {
+    version = 0.1,
+    priority = 966,
+    name = plugin_name,
+    schema = schema
+}
+
+function _M.check_schema(conf)
+    local ok, err = core.schema.check(schema, conf)
+
+    if not ok then
+        return false, err
+    end
+
+    return true
+end
+
+
+local function parse_domain_for_node(node)
+    if not ipmatcher.parse_ipv4(node)
+       and not ipmatcher.parse_ipv6(node)
+    then
+        local ip, err = init.parse_domain(node)
+        if ip then
+            return ip
+        end
+
+        if err then
+            return nil, err
+        end
+    end
+
+    return node
+end
+
+
+local function set_pass_host(ctx, upstream_info, host)
+    -- Currently only supports a single upstream of the domain name.
+    -- When the upstream is `IP`, do not do any `pass_host` operation.
+    if not core.utils.parse_ipv4(host)
+       and not core.utils.parse_ipv6(host)
+    then
+        local pass_host = upstream_info.pass_host or "pass"
+        if pass_host == "pass" then
+            ctx.var.upstream_host = ctx.var.host
+            return
+        end
+
+        if pass_host == "rewrite" then
+            ctx.var.upstream_host = upstream_info.upstream_host
+            return
+        end
+
+        ctx.var.upstream_host = host
+        return
+    end
+
+    return
+end
+
+
+local function set_upstream(upstream_info, ctx)
+    local nodes = upstream_info.nodes
+    local new_nodes = {}
+    if core.table.isarray(nodes) then
+        for _, node in ipairs(nodes) do
+            set_pass_host(ctx, upstream_info, node.host)
+            node.host = parse_domain_for_node(node.host)
+            node.port = node.port
+            node.weight = node.weight
+            table_insert(new_nodes, node)
+        end
+    else
+        for addr, weight in pairs(nodes) do
+            local node = {}
+            local ip, port, host
+            host, port = core.utils.parse_addr(addr)
+            set_pass_host(ctx, upstream_info, host)
+            ip = parse_domain_for_node(host)
+            node.host = ip
+            node.port = port
+            node.weight = weight
+            table_insert(new_nodes, node)
+        end
+    end
+    core.log.info("upstream_host: ", ctx.var.upstream_host)
+
+    local up_conf = {
+        name = upstream_info.name,
+        type = upstream_info.type,
+        nodes = new_nodes,
+        timeout = {
+            send = upstream_info.timeout and upstream_info.timeout.send or 15,
+            read = upstream_info.timeout and upstream_info.timeout.read or 15,
+            connect = upstream_info.timeout and upstream_info.timeout.connect 
or 15
+        }
+    }
+
+    local ok, err = upstream.check_schema(up_conf)
+    if not ok then
+        return 500, err
+    end
+
+    local matched_route = ctx.matched_route
+    upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
+                ctx.conf_version, up_conf, matched_route)
+    return
+end
+
+
+local function new_rr_obj(upstreams)
+    local server_list = {}
+    for _, upstream_obj in ipairs(upstreams) do
+        if not upstream_obj.upstream then
+            -- If the `upstream` object has only the `weighted_upstream` 
value, it means
+            -- that the `upstream` weight value on the default `route` has 
been reached.
+            -- Need to set an identifier to mark the empty upstream.
+            upstream_obj.upstream = "empty_upstream"
+        end
+        server_list[upstream_obj.upstream] = upstream_obj.weighted_upstream
+    end
+
+    return roundrobin:new(server_list)
+end
+
+
+function _M.access(conf, ctx)
+    if not conf or not conf.rules then
+        return
+    end
+
+    local upstreams, match_flag
+    for _, rule in pairs(conf.rules) do
+        match_flag = true
+        for _, single_match in ipairs(rule.match) do
+            local expr, err = expr.new(single_match.vars)
+            if err then
+                return 500, err
+            end
+
+            match_flag = expr:eval()
+            if match_flag then

Review comment:
       Why can we break as soon as the current match is true? We should check 
all matches.

##########
File path: apisix/plugins/traffic-split.lua
##########
@@ -0,0 +1,314 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core       = require("apisix.core")
+local upstream   = require("apisix.upstream")
+local schema_def = require("apisix.schema_def")
+local init       = require("apisix.init")
+local roundrobin = require("resty.roundrobin")
+local ipmatcher  = require("resty.ipmatcher")
+local expr       = require("resty.expr.v1")
+local pairs      = pairs
+local ipairs     = ipairs
+local table_insert = table.insert
+
+local lrucache = core.lrucache.new({
+    ttl = 0, count = 512
+})
+
+
+local vars_schema = {
+    type = "array",
+    items = {
+        type = "array",
+        items = {
+            {
+                type = "string",
+                minLength = 1,
+                maxLength = 100
+            },
+            {
+                type = "string",
+                minLength = 1,
+                maxLength = 2
+            }
+        },
+        additionalItems = {
+            anyOf = {
+                {type = "string"},
+                {type = "number"},
+                {type = "boolean"},
+                {
+                    type = "array",
+                    items = {
+                        anyOf = {
+                            {
+                            type = "string",
+                            minLength = 1, maxLength = 100
+                            },
+                            {
+                                type = "number"
+                            },
+                            {
+                                type = "boolean"
+                            }
+                        }
+                    },
+                    uniqueItems = true
+                }
+            }
+        },
+        minItems = 0,
+        maxItems = 10
+    }
+}
+
+
+local match_schema = {
+    type = "array",
+    items = {
+        type = "object",
+        properties = {
+            vars = vars_schema
+        }
+    },
+    -- When there is no `match` rule, the default rule passes.
+    -- Perform upstream logic of plugin configuration.
+    default = {{ vars = {{"server_port", ">", 0}}}}
+}
+
+
+local upstreams_schema = {
+    type = "array",
+    items = {
+        type = "object",
+        properties = {
+            upstream_id = schema_def.id_schema,    -- todo: support 
upstream_id method
+            upstream = schema_def.upstream,
+            weighted_upstream = {
+                description = "used to split traffic between different" ..
+                              "upstreams for plugin configuration",
+                type = "integer",
+                default = 1,
+                minimum = 0
+            }
+        }
+    },
+    -- When the upstream configuration of the plugin is missing,
+    -- the upstream of `route` is used by default.
+    default = {
+        {
+            weighted_upstream = 1
+        }
+    },
+    minItems = 1,
+    maxItems = 20
+}
+
+
+local schema = {
+    type = "object",
+    properties = {
+        rules = {
+            type = "array",
+            items = {
+                type = "object",
+                properties = {
+                    match = match_schema,
+                    upstreams = upstreams_schema
+                }
+            }
+        }
+    }
+}
+
+local plugin_name = "traffic-split"
+
+local _M = {
+    version = 0.1,
+    priority = 966,
+    name = plugin_name,
+    schema = schema
+}
+
+function _M.check_schema(conf)
+    local ok, err = core.schema.check(schema, conf)
+
+    if not ok then
+        return false, err
+    end
+
+    return true
+end
+
+
+local function parse_domain_for_node(node)
+    if not ipmatcher.parse_ipv4(node)
+       and not ipmatcher.parse_ipv6(node)
+    then
+        local ip, err = init.parse_domain(node)
+        if ip then
+            return ip
+        end
+
+        if err then
+            return nil, err
+        end
+    end
+
+    return node
+end
+
+
+local function set_pass_host(ctx, upstream_info, host)
+    -- Currently only supports a single upstream of the domain name.
+    -- When the upstream is `IP`, do not do any `pass_host` operation.
+    if not core.utils.parse_ipv4(host)
+       and not core.utils.parse_ipv6(host)
+    then
+        local pass_host = upstream_info.pass_host or "pass"
+        if pass_host == "pass" then
+            ctx.var.upstream_host = ctx.var.host
+            return
+        end
+
+        if pass_host == "rewrite" then
+            ctx.var.upstream_host = upstream_info.upstream_host
+            return
+        end
+
+        ctx.var.upstream_host = host
+        return
+    end
+
+    return
+end
+
+
+local function set_upstream(upstream_info, ctx)
+    local nodes = upstream_info.nodes
+    local new_nodes = {}
+    if core.table.isarray(nodes) then
+        for _, node in ipairs(nodes) do
+            set_pass_host(ctx, upstream_info, node.host)
+            node.host = parse_domain_for_node(node.host)
+            node.port = node.port
+            node.weight = node.weight
+            table_insert(new_nodes, node)
+        end
+    else
+        for addr, weight in pairs(nodes) do
+            local node = {}
+            local ip, port, host
+            host, port = core.utils.parse_addr(addr)
+            set_pass_host(ctx, upstream_info, host)
+            ip = parse_domain_for_node(host)
+            node.host = ip
+            node.port = port
+            node.weight = weight
+            table_insert(new_nodes, node)
+        end
+    end
+    core.log.info("upstream_host: ", ctx.var.upstream_host)
+
+    local up_conf = {
+        name = upstream_info.name,
+        type = upstream_info.type,
+        nodes = new_nodes,
+        timeout = {
+            send = upstream_info.timeout and upstream_info.timeout.send or 15,
+            read = upstream_info.timeout and upstream_info.timeout.read or 15,
+            connect = upstream_info.timeout and upstream_info.timeout.connect 
or 15
+        }
+    }
+
+    local ok, err = upstream.check_schema(up_conf)
+    if not ok then
+        return 500, err
+    end
+
+    local matched_route = ctx.matched_route
+    upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
+                ctx.conf_version, up_conf, matched_route)
+    return
+end
+
+
+local function new_rr_obj(upstreams)
+    local server_list = {}
+    for _, upstream_obj in ipairs(upstreams) do
+        if not upstream_obj.upstream then
+            -- If the `upstream` object has only the `weighted_upstream` 
value, it means
+            -- that the `upstream` weight value on the default `route` has 
been reached.
+            -- Need to set an identifier to mark the empty upstream.
+            upstream_obj.upstream = "empty_upstream"
+        end
+        server_list[upstream_obj.upstream] = upstream_obj.weighted_upstream
+    end
+
+    return roundrobin:new(server_list)
+end
+
+
+function _M.access(conf, ctx)
+    if not conf or not conf.rules then
+        return
+    end
+
+    local upstreams, match_flag
+    for _, rule in pairs(conf.rules) do
+        match_flag = true
+        for _, single_match in ipairs(rule.match) do
+            local expr, err = expr.new(single_match.vars)
+            if err then

Review comment:
       We should log this error.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to