tokers commented on a change in pull request #2935: URL: https://github.com/apache/apisix/pull/2935#discussion_r547914974
########## File path: apisix/plugins/traffic-split.lua ########## @@ -0,0 +1,314 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") +local upstream = require("apisix.upstream") +local schema_def = require("apisix.schema_def") +local init = require("apisix.init") +local roundrobin = require("resty.roundrobin") +local ipmatcher = require("resty.ipmatcher") +local expr = require("resty.expr.v1") +local pairs = pairs +local ipairs = ipairs +local table_insert = table.insert + +local lrucache = core.lrucache.new({ + ttl = 0, count = 512 +}) + + +local vars_schema = { + type = "array", + items = { + type = "array", + items = { + { + type = "string", + minLength = 1, + maxLength = 100 + }, + { + type = "string", + minLength = 1, + maxLength = 2 + } + }, + additionalItems = { + anyOf = { + {type = "string"}, + {type = "number"}, + {type = "boolean"}, + { + type = "array", + items = { + anyOf = { + { + type = "string", + minLength = 1, maxLength = 100 + }, + { + type = "number" + }, + { + type = "boolean" + } + } + }, + uniqueItems = true + } + } + }, + minItems = 0, + maxItems = 10 + } +} + + +local match_schema = { + type = "array", + items = { + type = "object", + properties = { + vars = vars_schema + } + }, + -- When there is no `match` rule, the default rule passes. + -- Perform upstream logic of plugin configuration. + default = {{ vars = {{"server_port", ">", 0}}}} +} + + +local upstreams_schema = { + type = "array", + items = { + type = "object", + properties = { + upstream_id = schema_def.id_schema, -- todo: support upstream_id method + upstream = schema_def.upstream, + weighted_upstream = { + description = "used to split traffic between different" .. + "upstreams for plugin configuration", + type = "integer", + default = 1, + minimum = 0 + } + } + }, + -- When the upstream configuration of the plugin is missing, + -- the upstream of `route` is used by default. + default = { + { + weighted_upstream = 1 + } + }, + minItems = 1, + maxItems = 20 +} + + +local schema = { + type = "object", + properties = { + rules = { + type = "array", + items = { + type = "object", + properties = { + match = match_schema, + upstreams = upstreams_schema + } + } + } + } +} + +local plugin_name = "traffic-split" + +local _M = { + version = 0.1, + priority = 966, + name = plugin_name, + schema = schema +} + +function _M.check_schema(conf) + local ok, err = core.schema.check(schema, conf) + + if not ok then + return false, err + end + + return true +end + + +local function parse_domain_for_node(node) + if not ipmatcher.parse_ipv4(node) + and not ipmatcher.parse_ipv6(node) + then + local ip, err = init.parse_domain(node) + if ip then + return ip + end + + if err then + return nil, err + end + end + + return node +end + + +local function set_pass_host(ctx, upstream_info, host) + -- Currently only supports a single upstream of the domain name. + -- When the upstream is `IP`, do not do any `pass_host` operation. + if not core.utils.parse_ipv4(host) + and not core.utils.parse_ipv6(host) + then + local pass_host = upstream_info.pass_host or "pass" + if pass_host == "pass" then + ctx.var.upstream_host = ctx.var.host + return + end + + if pass_host == "rewrite" then + ctx.var.upstream_host = upstream_info.upstream_host + return + end + + ctx.var.upstream_host = host + return + end + + return +end + + +local function set_upstream(upstream_info, ctx) + local nodes = upstream_info.nodes + local new_nodes = {} + if core.table.isarray(nodes) then + for _, node in ipairs(nodes) do + set_pass_host(ctx, upstream_info, node.host) + node.host = parse_domain_for_node(node.host) + node.port = node.port + node.weight = node.weight + table_insert(new_nodes, node) + end + else + for addr, weight in pairs(nodes) do + local node = {} + local ip, port, host + host, port = core.utils.parse_addr(addr) + set_pass_host(ctx, upstream_info, host) + ip = parse_domain_for_node(host) + node.host = ip + node.port = port + node.weight = weight + table_insert(new_nodes, node) + end + end + core.log.info("upstream_host: ", ctx.var.upstream_host) + + local up_conf = { + name = upstream_info.name, + type = upstream_info.type, + nodes = new_nodes, + timeout = { + send = upstream_info.timeout and upstream_info.timeout.send or 15, + read = upstream_info.timeout and upstream_info.timeout.read or 15, + connect = upstream_info.timeout and upstream_info.timeout.connect or 15 + } + } + + local ok, err = upstream.check_schema(up_conf) + if not ok then + return 500, err + end + + local matched_route = ctx.matched_route + upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id, + ctx.conf_version, up_conf, matched_route) + return +end + + +local function new_rr_obj(upstreams) + local server_list = {} + for _, upstream_obj in ipairs(upstreams) do + if not upstream_obj.upstream then + -- If the `upstream` object has only the `weighted_upstream` value, it means + -- that the `upstream` weight value on the default `route` has been reached. + -- Need to set an identifier to mark the empty upstream. + upstream_obj.upstream = "empty_upstream" + end + server_list[upstream_obj.upstream] = upstream_obj.weighted_upstream + end + + return roundrobin:new(server_list) +end + + +function _M.access(conf, ctx) + if not conf or not conf.rules then + return + end + + local upstreams, match_flag + for _, rule in pairs(conf.rules) do Review comment: For array-like table, use ipairs. ########## File path: README.md ########## @@ -99,6 +99,7 @@ A/B testing, canary release, blue-green deployment, limit rate, defense against - [Health Checks](doc/health-check.md): Enable health check on the upstream node, and will automatically filter unhealthy nodes during load balancing to ensure system stability. - Circuit-Breaker: Intelligent tracking of unhealthy upstream services. - [Proxy Mirror](doc/plugins/proxy-mirror.md): Provides the ability to mirror client requests. + - [Traffic Split](doc/plugins/traffic-split.md): Support the functions of gray release, blue-green release and custom release. Review comment: Not so intuitive. Use: allows users to incrementally direct percentages of traffic between various upstreams. ########## File path: README_CN.md ########## @@ -97,6 +97,7 @@ A/B 测试、金丝雀发布(灰度发布)、蓝绿部署、限流限速、抵 - [健康检查](doc/zh-cn/health-check.md):启用上游节点的健康检查,将在负载均衡期间自动过滤不健康的节点,以确保系统稳定性。 - 熔断器: 智能跟踪不健康上游服务。 - [代理镜像](doc/zh-cn/plugins/proxy-mirror.md): 提供镜像客户端请求的能力。 + - [流量拆分](doc/zh-cn/plugins/traffic-split.md): 支持灰度发布、蓝绿发布和自定义发布的功能。 Review comment: Ditto ########## File path: apisix/plugins/traffic-split.lua ########## @@ -0,0 +1,314 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") +local upstream = require("apisix.upstream") +local schema_def = require("apisix.schema_def") +local init = require("apisix.init") +local roundrobin = require("resty.roundrobin") +local ipmatcher = require("resty.ipmatcher") +local expr = require("resty.expr.v1") +local pairs = pairs +local ipairs = ipairs +local table_insert = table.insert + +local lrucache = core.lrucache.new({ + ttl = 0, count = 512 +}) + + +local vars_schema = { + type = "array", + items = { + type = "array", + items = { + { + type = "string", + minLength = 1, + maxLength = 100 + }, + { + type = "string", + minLength = 1, + maxLength = 2 + } + }, + additionalItems = { + anyOf = { + {type = "string"}, + {type = "number"}, + {type = "boolean"}, + { + type = "array", + items = { + anyOf = { + { + type = "string", + minLength = 1, maxLength = 100 + }, + { + type = "number" + }, + { + type = "boolean" + } + } + }, + uniqueItems = true + } + } + }, + minItems = 0, + maxItems = 10 + } +} + + +local match_schema = { + type = "array", + items = { + type = "object", + properties = { + vars = vars_schema + } + }, + -- When there is no `match` rule, the default rule passes. + -- Perform upstream logic of plugin configuration. + default = {{ vars = {{"server_port", ">", 0}}}} +} + + +local upstreams_schema = { + type = "array", + items = { + type = "object", + properties = { + upstream_id = schema_def.id_schema, -- todo: support upstream_id method + upstream = schema_def.upstream, + weighted_upstream = { + description = "used to split traffic between different" .. + "upstreams for plugin configuration", + type = "integer", + default = 1, + minimum = 0 + } + } + }, + -- When the upstream configuration of the plugin is missing, + -- the upstream of `route` is used by default. + default = { + { + weighted_upstream = 1 + } + }, + minItems = 1, + maxItems = 20 +} + + +local schema = { + type = "object", + properties = { + rules = { + type = "array", + items = { + type = "object", + properties = { + match = match_schema, + upstreams = upstreams_schema + } + } + } + } +} + +local plugin_name = "traffic-split" + +local _M = { + version = 0.1, + priority = 966, + name = plugin_name, + schema = schema +} + +function _M.check_schema(conf) + local ok, err = core.schema.check(schema, conf) + + if not ok then + return false, err + end + + return true +end + + +local function parse_domain_for_node(node) + if not ipmatcher.parse_ipv4(node) + and not ipmatcher.parse_ipv6(node) + then + local ip, err = init.parse_domain(node) + if ip then + return ip + end + + if err then + return nil, err + end + end + + return node +end + + +local function set_pass_host(ctx, upstream_info, host) + -- Currently only supports a single upstream of the domain name. + -- When the upstream is `IP`, do not do any `pass_host` operation. + if not core.utils.parse_ipv4(host) + and not core.utils.parse_ipv6(host) + then + local pass_host = upstream_info.pass_host or "pass" + if pass_host == "pass" then + ctx.var.upstream_host = ctx.var.host + return + end + + if pass_host == "rewrite" then + ctx.var.upstream_host = upstream_info.upstream_host + return + end + + ctx.var.upstream_host = host + return + end + + return +end + + +local function set_upstream(upstream_info, ctx) + local nodes = upstream_info.nodes + local new_nodes = {} + if core.table.isarray(nodes) then + for _, node in ipairs(nodes) do + set_pass_host(ctx, upstream_info, node.host) + node.host = parse_domain_for_node(node.host) + node.port = node.port + node.weight = node.weight + table_insert(new_nodes, node) + end + else + for addr, weight in pairs(nodes) do + local node = {} + local ip, port, host + host, port = core.utils.parse_addr(addr) + set_pass_host(ctx, upstream_info, host) + ip = parse_domain_for_node(host) + node.host = ip + node.port = port + node.weight = weight + table_insert(new_nodes, node) + end + end + core.log.info("upstream_host: ", ctx.var.upstream_host) + + local up_conf = { + name = upstream_info.name, + type = upstream_info.type, + nodes = new_nodes, + timeout = { + send = upstream_info.timeout and upstream_info.timeout.send or 15, + read = upstream_info.timeout and upstream_info.timeout.read or 15, + connect = upstream_info.timeout and upstream_info.timeout.connect or 15 + } + } + + local ok, err = upstream.check_schema(up_conf) + if not ok then + return 500, err + end + + local matched_route = ctx.matched_route + upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id, + ctx.conf_version, up_conf, matched_route) + return +end + + +local function new_rr_obj(upstreams) + local server_list = {} + for _, upstream_obj in ipairs(upstreams) do + if not upstream_obj.upstream then + -- If the `upstream` object has only the `weighted_upstream` value, it means + -- that the `upstream` weight value on the default `route` has been reached. + -- Need to set an identifier to mark the empty upstream. + upstream_obj.upstream = "empty_upstream" + end + server_list[upstream_obj.upstream] = upstream_obj.weighted_upstream + end + + return roundrobin:new(server_list) +end + + +function _M.access(conf, ctx) + if not conf or not conf.rules then + return + end + + local upstreams, match_flag + for _, rule in pairs(conf.rules) do + match_flag = true + for _, single_match in ipairs(rule.match) do + local expr, err = expr.new(single_match.vars) + if err then + return 500, err + end + + match_flag = expr:eval() + if match_flag then + break + end + end + + if match_flag then + upstreams = rule.upstreams + break + end + end + + core.log.info("match_flag: ", match_flag) + + if not match_flag then + return + end + + local rr_up, err = lrucache(upstreams, nil, new_rr_obj, upstreams) + if not rr_up then + core.log.error("lrucache roundrobin failed: ", err) + return 500 + end + + local upstream = rr_up:find() Review comment: Do not use wrr here, instead, please use weighted random algorithm. Say we have two upstreams and the weight of them are same, when rr state is just reset, the first selected upstream is fixed, depending on their order, this is the inherent drawback of wrr. it might overwhelm the first selected upstream after the wrr state is reset. Also, wrr state will be reset once config is changd, the percentage curve is not so smooth. ########## File path: doc/README.md ########## @@ -81,6 +81,7 @@ * [request-validation](plugins/request-validation.md): Validates requests before forwarding to upstream. * [proxy-mirror](plugins/proxy-mirror.md): Provides the ability to mirror client requests. * [api-breaker](plugins/api-breaker.md): Circuit Breaker for API that stops requests forwarding to upstream in case of unhealthy state. +* [traffic-split](plugins/traffic-split.md): The traffic division plug-in divides the request traffic according to the specified ratio and diverts it to the corresponding upstream; through this plug-in, gray-scale publishing, blue-green publishing and custom publishing functions can be realized. Review comment: Why traffic division? ########## File path: apisix/plugins/traffic-split.lua ########## @@ -0,0 +1,314 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") +local upstream = require("apisix.upstream") +local schema_def = require("apisix.schema_def") +local init = require("apisix.init") +local roundrobin = require("resty.roundrobin") +local ipmatcher = require("resty.ipmatcher") +local expr = require("resty.expr.v1") +local pairs = pairs +local ipairs = ipairs +local table_insert = table.insert + +local lrucache = core.lrucache.new({ + ttl = 0, count = 512 +}) + + +local vars_schema = { + type = "array", + items = { + type = "array", + items = { + { + type = "string", + minLength = 1, + maxLength = 100 + }, + { + type = "string", + minLength = 1, + maxLength = 2 + } + }, + additionalItems = { + anyOf = { + {type = "string"}, + {type = "number"}, + {type = "boolean"}, + { + type = "array", + items = { + anyOf = { + { + type = "string", + minLength = 1, maxLength = 100 + }, + { + type = "number" + }, + { + type = "boolean" + } + } + }, + uniqueItems = true + } + } + }, + minItems = 0, + maxItems = 10 + } +} + + +local match_schema = { + type = "array", + items = { + type = "object", + properties = { + vars = vars_schema + } + }, + -- When there is no `match` rule, the default rule passes. + -- Perform upstream logic of plugin configuration. + default = {{ vars = {{"server_port", ">", 0}}}} +} + + +local upstreams_schema = { + type = "array", + items = { + type = "object", + properties = { + upstream_id = schema_def.id_schema, -- todo: support upstream_id method + upstream = schema_def.upstream, + weighted_upstream = { + description = "used to split traffic between different" .. + "upstreams for plugin configuration", + type = "integer", + default = 1, + minimum = 0 + } + } + }, + -- When the upstream configuration of the plugin is missing, + -- the upstream of `route` is used by default. + default = { + { + weighted_upstream = 1 + } + }, + minItems = 1, + maxItems = 20 +} + + +local schema = { + type = "object", + properties = { + rules = { + type = "array", + items = { + type = "object", + properties = { + match = match_schema, + upstreams = upstreams_schema + } + } + } + } +} + +local plugin_name = "traffic-split" + +local _M = { + version = 0.1, + priority = 966, + name = plugin_name, + schema = schema +} + +function _M.check_schema(conf) + local ok, err = core.schema.check(schema, conf) + + if not ok then + return false, err + end + + return true +end + + +local function parse_domain_for_node(node) + if not ipmatcher.parse_ipv4(node) + and not ipmatcher.parse_ipv6(node) + then + local ip, err = init.parse_domain(node) + if ip then + return ip + end + + if err then + return nil, err + end + end + + return node +end + + +local function set_pass_host(ctx, upstream_info, host) + -- Currently only supports a single upstream of the domain name. + -- When the upstream is `IP`, do not do any `pass_host` operation. + if not core.utils.parse_ipv4(host) + and not core.utils.parse_ipv6(host) + then + local pass_host = upstream_info.pass_host or "pass" + if pass_host == "pass" then + ctx.var.upstream_host = ctx.var.host + return + end + + if pass_host == "rewrite" then + ctx.var.upstream_host = upstream_info.upstream_host + return + end + + ctx.var.upstream_host = host + return + end + + return +end + + +local function set_upstream(upstream_info, ctx) + local nodes = upstream_info.nodes + local new_nodes = {} + if core.table.isarray(nodes) then + for _, node in ipairs(nodes) do + set_pass_host(ctx, upstream_info, node.host) + node.host = parse_domain_for_node(node.host) + node.port = node.port + node.weight = node.weight + table_insert(new_nodes, node) + end + else + for addr, weight in pairs(nodes) do + local node = {} + local ip, port, host + host, port = core.utils.parse_addr(addr) + set_pass_host(ctx, upstream_info, host) + ip = parse_domain_for_node(host) + node.host = ip + node.port = port + node.weight = weight + table_insert(new_nodes, node) + end + end + core.log.info("upstream_host: ", ctx.var.upstream_host) + + local up_conf = { + name = upstream_info.name, + type = upstream_info.type, + nodes = new_nodes, + timeout = { + send = upstream_info.timeout and upstream_info.timeout.send or 15, + read = upstream_info.timeout and upstream_info.timeout.read or 15, + connect = upstream_info.timeout and upstream_info.timeout.connect or 15 + } + } + + local ok, err = upstream.check_schema(up_conf) + if not ok then + return 500, err + end + + local matched_route = ctx.matched_route + upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id, + ctx.conf_version, up_conf, matched_route) + return +end + + +local function new_rr_obj(upstreams) + local server_list = {} + for _, upstream_obj in ipairs(upstreams) do + if not upstream_obj.upstream then + -- If the `upstream` object has only the `weighted_upstream` value, it means + -- that the `upstream` weight value on the default `route` has been reached. + -- Need to set an identifier to mark the empty upstream. + upstream_obj.upstream = "empty_upstream" + end + server_list[upstream_obj.upstream] = upstream_obj.weighted_upstream + end + + return roundrobin:new(server_list) +end + + +function _M.access(conf, ctx) + if not conf or not conf.rules then + return + end + + local upstreams, match_flag + for _, rule in pairs(conf.rules) do + match_flag = true + for _, single_match in ipairs(rule.match) do + local expr, err = expr.new(single_match.vars) + if err then + return 500, err + end + + match_flag = expr:eval() + if match_flag then Review comment: Why we can break when just the current match is true, we should check all matchs. ########## File path: apisix/plugins/traffic-split.lua ########## @@ -0,0 +1,314 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") +local upstream = require("apisix.upstream") +local schema_def = require("apisix.schema_def") +local init = require("apisix.init") +local roundrobin = require("resty.roundrobin") +local ipmatcher = require("resty.ipmatcher") +local expr = require("resty.expr.v1") +local pairs = pairs +local ipairs = ipairs +local table_insert = table.insert + +local lrucache = core.lrucache.new({ + ttl = 0, count = 512 +}) + + +local vars_schema = { + type = "array", + items = { + type = "array", + items = { + { + type = "string", + minLength = 1, + maxLength = 100 + }, + { + type = "string", + minLength = 1, + maxLength = 2 + } + }, + additionalItems = { + anyOf = { + {type = "string"}, + {type = "number"}, + {type = "boolean"}, + { + type = "array", + items = { + anyOf = { + { + type = "string", + minLength = 1, maxLength = 100 + }, + { + type = "number" + }, + { + type = "boolean" + } + } + }, + uniqueItems = true + } + } + }, + minItems = 0, + maxItems = 10 + } +} + + +local match_schema = { + type = "array", + items = { + type = "object", + properties = { + vars = vars_schema + } + }, + -- When there is no `match` rule, the default rule passes. + -- Perform upstream logic of plugin configuration. + default = {{ vars = {{"server_port", ">", 0}}}} +} + + +local upstreams_schema = { + type = "array", + items = { + type = "object", + properties = { + upstream_id = schema_def.id_schema, -- todo: support upstream_id method + upstream = schema_def.upstream, + weighted_upstream = { + description = "used to split traffic between different" .. + "upstreams for plugin configuration", + type = "integer", + default = 1, + minimum = 0 + } + } + }, + -- When the upstream configuration of the plugin is missing, + -- the upstream of `route` is used by default. + default = { + { + weighted_upstream = 1 + } + }, + minItems = 1, + maxItems = 20 +} + + +local schema = { + type = "object", + properties = { + rules = { + type = "array", + items = { + type = "object", + properties = { + match = match_schema, + upstreams = upstreams_schema + } + } + } + } +} + +local plugin_name = "traffic-split" + +local _M = { + version = 0.1, + priority = 966, + name = plugin_name, + schema = schema +} + +function _M.check_schema(conf) + local ok, err = core.schema.check(schema, conf) + + if not ok then + return false, err + end + + return true +end + + +local function parse_domain_for_node(node) + if not ipmatcher.parse_ipv4(node) + and not ipmatcher.parse_ipv6(node) + then + local ip, err = init.parse_domain(node) + if ip then + return ip + end + + if err then + return nil, err + end + end + + return node +end + + +local function set_pass_host(ctx, upstream_info, host) + -- Currently only supports a single upstream of the domain name. + -- When the upstream is `IP`, do not do any `pass_host` operation. + if not core.utils.parse_ipv4(host) + and not core.utils.parse_ipv6(host) + then + local pass_host = upstream_info.pass_host or "pass" + if pass_host == "pass" then + ctx.var.upstream_host = ctx.var.host + return + end + + if pass_host == "rewrite" then + ctx.var.upstream_host = upstream_info.upstream_host + return + end + + ctx.var.upstream_host = host + return + end + + return +end + + +local function set_upstream(upstream_info, ctx) + local nodes = upstream_info.nodes + local new_nodes = {} + if core.table.isarray(nodes) then + for _, node in ipairs(nodes) do + set_pass_host(ctx, upstream_info, node.host) + node.host = parse_domain_for_node(node.host) + node.port = node.port + node.weight = node.weight + table_insert(new_nodes, node) + end + else + for addr, weight in pairs(nodes) do + local node = {} + local ip, port, host + host, port = core.utils.parse_addr(addr) + set_pass_host(ctx, upstream_info, host) + ip = parse_domain_for_node(host) + node.host = ip + node.port = port + node.weight = weight + table_insert(new_nodes, node) + end + end + core.log.info("upstream_host: ", ctx.var.upstream_host) + + local up_conf = { + name = upstream_info.name, + type = upstream_info.type, + nodes = new_nodes, + timeout = { + send = upstream_info.timeout and upstream_info.timeout.send or 15, + read = upstream_info.timeout and upstream_info.timeout.read or 15, + connect = upstream_info.timeout and upstream_info.timeout.connect or 15 + } + } + + local ok, err = upstream.check_schema(up_conf) + if not ok then + return 500, err + end + + local matched_route = ctx.matched_route + upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id, + ctx.conf_version, up_conf, matched_route) + return +end + + +local function new_rr_obj(upstreams) + local server_list = {} + for _, upstream_obj in ipairs(upstreams) do + if not upstream_obj.upstream then + -- If the `upstream` object has only the `weighted_upstream` value, it means + -- that the `upstream` weight value on the default `route` has been reached. + -- Need to set an identifier to mark the empty upstream. + upstream_obj.upstream = "empty_upstream" + end + server_list[upstream_obj.upstream] = upstream_obj.weighted_upstream + end + + return roundrobin:new(server_list) +end + + +function _M.access(conf, ctx) + if not conf or not conf.rules then + return + end + + local upstreams, match_flag + for _, rule in pairs(conf.rules) do + match_flag = true + for _, single_match in ipairs(rule.match) do + local expr, err = expr.new(single_match.vars) + if err then Review comment: We should log this error. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
