[GitHub] [apisix] spacewander commented on a change in pull request #6599: feat: add tars discovery

2022-03-20 Thread GitBox


spacewander commented on a change in pull request #6599:
URL: https://github.com/apache/apisix/pull/6599#discussion_r830609058



##
File path: conf/config-default.yaml
##
@@ -266,6 +266,7 @@ nginx_config: # config for render the 
template to generate n
   introspection: 10m
   access-tokens: 1m
   ext-plugin: 1m
+  tars: 1m

Review comment:
   According to the example, we need to close if we don't keepalive?
   https://github.com/openresty/lua-resty-mysql#synopsis
   ```
-- put it into the connection pool of size 100,
   -- with 10 seconds max idle timeout
   local ok, err = db:set_keepalive(1, 100)
   if not ok then
   ngx.say("failed to set keepalive: ", err)
   return
   end
   
   -- or just close the connection right away:
   -- local ok, err = db:close()
   -- if not ok then
   -- ngx.say("failed to close: ", err)
   -- return
   -- end
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@apisix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [apisix] spacewander commented on a change in pull request #6599: feat: add tars discovery

2022-03-20 Thread GitBox


spacewander commented on a change in pull request #6599:
URL: https://github.com/apache/apisix/pull/6599#discussion_r830607932



##
File path: apisix/discovery/tars/init.lua
##
@@ -0,0 +1,340 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ngx = ngx
+local format = string.format
+local ipairs = ipairs
+local error = error
+local tonumber = tonumber
+local local_conf = require("apisix.core.config_local").local_conf()
+local core = require("apisix.core")
+local mysql = require("resty.mysql")
+local process = require("ngx.process")
+
+local endpoint_dict = ngx.shared.tars
+if not endpoint_dict then
+error("failed to get nginx shared dict: tars, please check your APISIX 
version")
+end
+
+local full_query_sql = [[ select servant, group_concat(endpoint order by 
endpoint) as endpoints
+from t_server_conf left join t_adapter_conf tac using (application, 
server_name, node_name)
+where setting_state = 'active' and present_state = 'active'
+group by servant ]]
+
+local incremental_query_sql = [[
+select servant, (setting_state = 'active' and present_state = 'active') 
activated,
+group_concat(endpoint order by endpoint) endpoints
+from t_server_conf left join t_adapter_conf tac using (application, 
server_name, node_name)
+where (application, server_name) in
+(
+select application, server_name from t_server_conf
+where registry_timestamp > now() - interval %d second
+union
+select application, server_name from t_adapter_conf
+where registry_timestamp > now() - interval %d second
+)
+group by servant, activated order by activated desc ]]
+
+local _M = {
+version = 0.1,
+}
+
+local default_weight
+
+local last_fetch_full_time = 0
+local last_db_error
+
+local endpoint_lrucache = core.lrucache.new({
+ttl = 300,
+count = 1024
+})
+
+local activated_buffer = core.table.new(10, 0)
+local nodes_buffer = core.table.new(0, 5)
+
+
+--[[
+endpoints format as follows:
+  tcp -h 172.16.1.1 -p 11 -t 6000 -e 0,tcp -e 0 -p 12 -h 172.16.1.1,tcp -p 13 
-h 172.16.1.1
+we extract host and port value via endpoints_pattern
+--]]
+local endpoints_pattern = core.table.concat(
+{ 
[[tcp(\s*-[te]\s*(\S+)){0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+))]],
+  [[{0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+)){0,2}\s*(,|$)]] }
+)
+
+
+local function update_endpoint(servant, nodes)
+local endpoint_content = core.json.encode(nodes, true)
+local endpoint_version = ngx.crc32_long(endpoint_content)
+core.log.debug("set servant ", servant, endpoint_content)
+local _, err
+_, err = endpoint_dict:safe_set(servant .. "#version", endpoint_version)
+if err then
+core.log.error("set endpoint version into nginx shared dict failed, ", 
err)
+return
+end
+_, err = endpoint_dict:safe_set(servant, endpoint_content)
+if err then
+core.log.error("set endpoint into nginx shared dict failed, ", err)
+endpoint_dict:delete(servant .. "#version")
+end
+end
+
+
+local function delete_endpoint(servant)
+core.log.info("delete servant ", servant)
+endpoint_dict:delete(servant .. "#version")
+endpoint_dict:delete(servant)
+end
+
+
+local function add_endpoint_to_lrucache(servant)
+local endpoint_content, err = endpoint_dict:get_stale(servant)
+if not endpoint_content then
+core.log.error("get empty endpoint content, servant: ", servant, ", 
err: ", err)
+return nil
+end
+
+local endpoint, err = core.json.decode(endpoint_content)
+if not endpoint then
+core.log.error("decode json failed, content: ", endpoint_content, ", 
err: ", err)
+return nil
+end
+
+return endpoint
+end
+
+
+local function get_endpoint(servant)
+local endpoint_version, err = endpoint_dict:get_stale(servant .. 
"#version")
+if not endpoint_version  then
+if err then
+core.log.error("get empty endpoint version, servant: ", servant, 
", err: ", err)
+end
+return nil
+end
+return endpoint_lrucache(servant, endpoint_version, 
add_endpoint_to_lrucache, servant)
+end
+
+
+local function extract_endpoint(query_result)
+for _, p in ipairs(query_result) do
+repeat
+local servant = 

[GitHub] [apisix] spacewander commented on a change in pull request #6599: feat: add tars discovery

2022-03-20 Thread GitBox


spacewander commented on a change in pull request #6599:
URL: https://github.com/apache/apisix/pull/6599#discussion_r830606894



##
File path: apisix/discovery/tars/init.lua
##
@@ -0,0 +1,283 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ngx = ngx
+local format = string.format
+local ipairs = ipairs
+local error = error
+local tonumber = tonumber
+local local_conf = require("apisix.core.config_local").local_conf()
+local core = require("apisix.core")
+local mysql = require("resty.mysql")
+local process = require("ngx.process")
+
+local full_query_sql = [[ select servant, group_concat(endpoint order by 
endpoint) as endpoints
+from t_server_conf left join t_adapter_conf tac using (application, 
server_name, node_name)
+where setting_state = 'active' and present_state = 'active'
+group by servant ]]
+
+local incremental_query_sql = [[
+select servant, (setting_state = 'active' and present_state = 'active') 
activated,
+group_concat(endpoint order by endpoint) endpoints
+from t_server_conf left join t_adapter_conf tac using (application, 
server_name, node_name)
+where (application, server_name) in
+(
+select application, server_name from t_server_conf
+where registry_timestamp > now() - interval %d second
+union
+select application, server_name from t_adapter_conf
+where registry_timestamp > now() - interval %d second
+)
+group by servant, activated order by activated desc ]]
+
+local _M = {
+version = 0.1,
+}
+
+local endpoint_dict
+local default_weight
+
+local last_fetch_full_time = 0
+local last_fetch_error
+
+local endpoint_lrucache = core.lrucache.new({
+ttl = 300,
+count = 1024
+})
+
+local activated_buffer = core.table.new(10, 0)
+local nodes_buffer = core.table.new(0, 5)
+
+local endpoints_pattern = core.table.concat(
+{ 
[[tcp(\s*-[te]\s*(\S+)){0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+))]],
+  [[{0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+)){0,2}\s*(,|$)]] }
+)
+
+
+local function update_endpoint(servant, nodes)
+local endpoint_content = core.json.encode(nodes, true)
+local endpoint_version = ngx.crc32_long(endpoint_content)
+core.log.debug("set servant ", servant, endpoint_content)
+local _, err
+_, err = endpoint_dict:safe_set(servant .. "#version", endpoint_version)
+if err then
+core.log.error("set endpoint version into discovery DICT failed, ", 
err)
+return
+end
+_, err = endpoint_dict:safe_set(servant, endpoint_content)
+if err then
+core.log.error("set endpoint into discovery DICT failed, ", err)
+endpoint_dict:delete(servant .. "#version")
+end
+end
+
+
+local function delete_endpoint(servant)
+core.log.info("delete servant ", servant)
+endpoint_dict:delete(servant .. "#version")
+endpoint_dict:delete(servant)
+end
+
+
+local function create_endpoint_lrucache(servant)
+local endpoint_content = endpoint_dict:get_stale(servant)
+if not endpoint_content then
+core.log.error("get empty endpoint content from discovery DICT, 
servant: ", servant)
+return nil
+end
+
+local endpoint = core.json.decode(endpoint_content)
+if not endpoint then
+core.log.error("decode endpoint content failed, content: ", 
endpoint_content)
+return nil
+end
+
+return endpoint
+end
+
+
+local function get_endpoint(servant)
+local endpoint_version = endpoint_dict:get_stale(servant .. "#version")
+if not endpoint_version then
+return nil
+end
+
+return endpoint_lrucache(servant, endpoint_version, 
create_endpoint_lrucache, servant)
+end
+
+
+local function extract_endpoint(query_result)
+for _, p in ipairs(query_result) do
+repeat
+local servant = p.servant
+
+if servant == ngx.null then
+break
+end
+
+if p.activated == 1 then
+activated_buffer[servant] = ngx.null
+elseif p.activated == 0 then
+if activated_buffer[servant] == nil then
+delete_endpoint(servant)
+end
+break
+end
+
+core.table.clear(nodes_buffer)
+local iterator = 

[GitHub] [apisix] spacewander commented on a change in pull request #6599: feat: add tars discovery

2022-03-18 Thread GitBox


spacewander commented on a change in pull request #6599:
URL: https://github.com/apache/apisix/pull/6599#discussion_r829653704



##
File path: apisix/discovery/tars/init.lua
##
@@ -0,0 +1,332 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ngx = ngx
+local format = string.format
+local ipairs = ipairs
+local error = error
+local tonumber = tonumber
+local local_conf = require("apisix.core.config_local").local_conf()
+local core = require("apisix.core")
+local mysql = require("resty.mysql")
+local process = require("ngx.process")
+
+local full_query_sql = [[ select servant, group_concat(endpoint order by 
endpoint) as endpoints
+from t_server_conf left join t_adapter_conf tac using (application, 
server_name, node_name)
+where setting_state = 'active' and present_state = 'active'
+group by servant ]]
+
+local incremental_query_sql = [[
+select servant, (setting_state = 'active' and present_state = 'active') 
activated,
+group_concat(endpoint order by endpoint) endpoints
+from t_server_conf left join t_adapter_conf tac using (application, 
server_name, node_name)
+where (application, server_name) in
+(
+select application, server_name from t_server_conf
+where registry_timestamp > now() - interval %d second
+union
+select application, server_name from t_adapter_conf
+where registry_timestamp > now() - interval %d second
+)
+group by servant, activated order by activated desc ]]
+
+local _M = {
+version = 0.1,
+}
+
+local endpoint_dict
+local default_weight
+
+local last_fetch_full_time = 0
+local last_db_error
+
+local endpoint_lrucache = core.lrucache.new({
+ttl = 300,
+count = 1024
+})
+
+local activated_buffer = core.table.new(10, 0)
+local nodes_buffer = core.table.new(0, 5)
+
+
+--[[
+endpoints format as follows:
+  tcp -h 172.16.1.1 -p 11 -t 6000 -e 0,tcp -e 0 -p 12 -h 172.16.1.1,tcp -p 13 
-h 172.16.1.1
+we need extract host and port value via regex
+--]]
+local endpoints_pattern = core.table.concat(
+{ 
[[tcp(\s*-[te]\s*(\S+)){0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+))]],
+  [[{0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+)){0,2}\s*(,|$)]] }
+)
+
+
+local function update_endpoint(servant, nodes)
+local endpoint_content = core.json.encode(nodes, true)
+local endpoint_version = ngx.crc32_long(endpoint_content)
+core.log.debug("set servant ", servant, endpoint_content)
+local _, err
+_, err = endpoint_dict:safe_set(servant .. "#version", endpoint_version)
+if err then
+core.log.error("set endpoint version into nginx shared dict failed, ", 
err)
+return
+end
+_, err = endpoint_dict:safe_set(servant, endpoint_content)
+if err then
+core.log.error("set endpoint into nginx shared dict failed, ", err)
+endpoint_dict:delete(servant .. "#version")
+end
+end
+
+
+local function delete_endpoint(servant)
+core.log.info("delete servant ", servant)
+endpoint_dict:delete(servant .. "#version")
+endpoint_dict:delete(servant)
+end
+
+
+local function add_endpoint_to_lrucache(servant)
+local endpoint_content, err = endpoint_dict:get_stale(servant)
+if not endpoint_content then
+core.log.error("get empty endpoint content, servant: ", servant, ", 
err: ", err)
+return nil
+end
+
+local endpoint, err = core.json.decode(endpoint_content)
+if not endpoint then
+core.log.error("decode json failed, content: ", endpoint_content, ", 
err: ", err)
+return nil
+end
+
+return endpoint
+end
+
+
+local function get_endpoint(servant)
+local endpoint_version, err = endpoint_dict:get_stale(servant .. 
"#version")

Review comment:
   Thanks for your explanation. Let's enrich the comment in the code.

##
File path: t/tars/discovery/tars.t
##
@@ -0,0 +1,381 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the 

[GitHub] [apisix] spacewander commented on a change in pull request #6599: feat: add tars discovery

2022-03-17 Thread GitBox


spacewander commented on a change in pull request #6599:
URL: https://github.com/apache/apisix/pull/6599#discussion_r829654527



##
File path: t/tars/discovery/tars.t
##
@@ -0,0 +1,381 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('warn');
+no_root_location();
+no_shuffle();
+workers(4);
+
+add_block_preprocessor(sub {
+my ($block) = @_;
+
+my $yaml_config = <<_EOC_;
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  tars:
+db_conf:
+  host: 127.0.0.1
+  port: 3306
+  database: db_tars
+  user: root
+  password: tars2022
+full_fetch_interval: 90

Review comment:
   What about rewriting schema in the test?
   Like 
https://github.com/apache/apisix/blob/9d450d7fe3169a77727df28696d083809b93977a/t/plugin/skywalking.t#L34-L35




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@apisix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [apisix] spacewander commented on a change in pull request #6599: feat: add tars discovery

2022-03-17 Thread GitBox


spacewander commented on a change in pull request #6599:
URL: https://github.com/apache/apisix/pull/6599#discussion_r829653704



##
File path: apisix/discovery/tars/init.lua
##
@@ -0,0 +1,332 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ngx = ngx
+local format = string.format
+local ipairs = ipairs
+local error = error
+local tonumber = tonumber
+local local_conf = require("apisix.core.config_local").local_conf()
+local core = require("apisix.core")
+local mysql = require("resty.mysql")
+local process = require("ngx.process")
+
+local full_query_sql = [[ select servant, group_concat(endpoint order by 
endpoint) as endpoints
+from t_server_conf left join t_adapter_conf tac using (application, 
server_name, node_name)
+where setting_state = 'active' and present_state = 'active'
+group by servant ]]
+
+local incremental_query_sql = [[
+select servant, (setting_state = 'active' and present_state = 'active') 
activated,
+group_concat(endpoint order by endpoint) endpoints
+from t_server_conf left join t_adapter_conf tac using (application, 
server_name, node_name)
+where (application, server_name) in
+(
+select application, server_name from t_server_conf
+where registry_timestamp > now() - interval %d second
+union
+select application, server_name from t_adapter_conf
+where registry_timestamp > now() - interval %d second
+)
+group by servant, activated order by activated desc ]]
+
+local _M = {
+version = 0.1,
+}
+
+local endpoint_dict
+local default_weight
+
+local last_fetch_full_time = 0
+local last_db_error
+
+local endpoint_lrucache = core.lrucache.new({
+ttl = 300,
+count = 1024
+})
+
+local activated_buffer = core.table.new(10, 0)
+local nodes_buffer = core.table.new(0, 5)
+
+
+--[[
+endpoints format as follows:
+  tcp -h 172.16.1.1 -p 11 -t 6000 -e 0,tcp -e 0 -p 12 -h 172.16.1.1,tcp -p 13 
-h 172.16.1.1
+we need extract host and port value via regex
+--]]
+local endpoints_pattern = core.table.concat(
+{ 
[[tcp(\s*-[te]\s*(\S+)){0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+))]],
+  [[{0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+)){0,2}\s*(,|$)]] }
+)
+
+
+local function update_endpoint(servant, nodes)
+local endpoint_content = core.json.encode(nodes, true)
+local endpoint_version = ngx.crc32_long(endpoint_content)
+core.log.debug("set servant ", servant, endpoint_content)
+local _, err
+_, err = endpoint_dict:safe_set(servant .. "#version", endpoint_version)
+if err then
+core.log.error("set endpoint version into nginx shared dict failed, ", 
err)
+return
+end
+_, err = endpoint_dict:safe_set(servant, endpoint_content)
+if err then
+core.log.error("set endpoint into nginx shared dict failed, ", err)
+endpoint_dict:delete(servant .. "#version")
+end
+end
+
+
+local function delete_endpoint(servant)
+core.log.info("delete servant ", servant)
+endpoint_dict:delete(servant .. "#version")
+endpoint_dict:delete(servant)
+end
+
+
+local function add_endpoint_to_lrucache(servant)
+local endpoint_content, err = endpoint_dict:get_stale(servant)
+if not endpoint_content then
+core.log.error("get empty endpoint content, servant: ", servant, ", 
err: ", err)
+return nil
+end
+
+local endpoint, err = core.json.decode(endpoint_content)
+if not endpoint then
+core.log.error("decode json failed, content: ", endpoint_content, ", 
err: ", err)
+return nil
+end
+
+return endpoint
+end
+
+
+local function get_endpoint(servant)
+local endpoint_version, err = endpoint_dict:get_stale(servant .. 
"#version")

Review comment:
   Thanks for your explanation. Let's enrich the comment in the code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@apisix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [apisix] spacewander commented on a change in pull request #6599: feat: add tars discovery

2022-03-16 Thread GitBox


spacewander commented on a change in pull request #6599:
URL: https://github.com/apache/apisix/pull/6599#discussion_r828678657



##
File path: t/tars/discovery/tars.t
##
@@ -0,0 +1,381 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('warn');
+no_root_location();
+no_shuffle();
+workers(4);
+
+add_block_preprocessor(sub {
+my ($block) = @_;
+
+my $yaml_config = <<_EOC_;
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  tars:
+db_conf:
+  host: 127.0.0.1
+  port: 3306
+  database: db_tars
+  user: root
+  password: tars2022
+full_fetch_interval: 90

Review comment:
   Maybe we can reduce the min in the schema? A sane user may not use a low 
value.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@apisix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [apisix] spacewander commented on a change in pull request #6599: feat: add tars discovery

2022-03-16 Thread GitBox


spacewander commented on a change in pull request #6599:
URL: https://github.com/apache/apisix/pull/6599#discussion_r828675182



##
File path: apisix/discovery/tars/init.lua
##
@@ -0,0 +1,332 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ngx = ngx
+local format = string.format
+local ipairs = ipairs
+local error = error
+local tonumber = tonumber
+local local_conf = require("apisix.core.config_local").local_conf()
+local core = require("apisix.core")
+local mysql = require("resty.mysql")
+local process = require("ngx.process")
+
+local full_query_sql = [[ select servant, group_concat(endpoint order by 
endpoint) as endpoints
+from t_server_conf left join t_adapter_conf tac using (application, 
server_name, node_name)
+where setting_state = 'active' and present_state = 'active'
+group by servant ]]
+
+local incremental_query_sql = [[
+select servant, (setting_state = 'active' and present_state = 'active') 
activated,
+group_concat(endpoint order by endpoint) endpoints
+from t_server_conf left join t_adapter_conf tac using (application, 
server_name, node_name)
+where (application, server_name) in
+(
+select application, server_name from t_server_conf
+where registry_timestamp > now() - interval %d second
+union
+select application, server_name from t_adapter_conf
+where registry_timestamp > now() - interval %d second
+)
+group by servant, activated order by activated desc ]]
+
+local _M = {
+version = 0.1,
+}
+
+local endpoint_dict
+local default_weight
+
+local last_fetch_full_time = 0
+local last_db_error
+
+local endpoint_lrucache = core.lrucache.new({
+ttl = 300,
+count = 1024
+})
+
+local activated_buffer = core.table.new(10, 0)
+local nodes_buffer = core.table.new(0, 5)
+
+
+--[[
+endpoints format as follows:
+  tcp -h 172.16.1.1 -p 11 -t 6000 -e 0,tcp -e 0 -p 12 -h 172.16.1.1,tcp -p 13 
-h 172.16.1.1
+we need extract host and port value via regex
+--]]
+local endpoints_pattern = core.table.concat(
+{ 
[[tcp(\s*-[te]\s*(\S+)){0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+))]],
+  [[{0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+)){0,2}\s*(,|$)]] }
+)
+
+
+local function update_endpoint(servant, nodes)
+local endpoint_content = core.json.encode(nodes, true)
+local endpoint_version = ngx.crc32_long(endpoint_content)
+core.log.debug("set servant ", servant, endpoint_content)
+local _, err
+_, err = endpoint_dict:safe_set(servant .. "#version", endpoint_version)
+if err then
+core.log.error("set endpoint version into nginx shared dict failed, ", 
err)
+return
+end
+_, err = endpoint_dict:safe_set(servant, endpoint_content)
+if err then
+core.log.error("set endpoint into nginx shared dict failed, ", err)
+endpoint_dict:delete(servant .. "#version")
+end
+end
+
+
+local function delete_endpoint(servant)
+core.log.info("delete servant ", servant)
+endpoint_dict:delete(servant .. "#version")
+endpoint_dict:delete(servant)
+end
+
+
+local function add_endpoint_to_lrucache(servant)
+local endpoint_content, err = endpoint_dict:get_stale(servant)
+if not endpoint_content then
+core.log.error("get empty endpoint content, servant: ", servant, ", 
err: ", err)
+return nil
+end
+
+local endpoint, err = core.json.decode(endpoint_content)
+if not endpoint then
+core.log.error("decode json failed, content: ", endpoint_content, ", 
err: ", err)
+return nil
+end
+
+return endpoint
+end
+
+
+local function get_endpoint(servant)
+local endpoint_version, err = endpoint_dict:get_stale(servant .. 
"#version")

Review comment:
   We don't set any expiration for the shdict's data, right? So why do we 
need to use get_stale & flush_expired?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@apisix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [apisix] spacewander commented on a change in pull request #6599: feat: add tars discovery

2022-03-16 Thread GitBox


spacewander commented on a change in pull request #6599:
URL: https://github.com/apache/apisix/pull/6599#discussion_r828672726



##
File path: conf/config-default.yaml
##
@@ -266,6 +266,7 @@ nginx_config: # config for render the 
template to generate n
   introspection: 10m
   access-tokens: 1m
   ext-plugin: 1m
+  tars: 1m

Review comment:
   We need to close the handler when getting last_db_error?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@apisix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [apisix] spacewander commented on a change in pull request #6599: feat: add tars discovery

2022-03-15 Thread GitBox


spacewander commented on a change in pull request #6599:
URL: https://github.com/apache/apisix/pull/6599#discussion_r827586821



##
File path: conf/config-default.yaml
##
@@ -266,6 +266,7 @@ nginx_config: # config for render the 
template to generate n
   introspection: 10m
   access-tokens: 1m
   ext-plugin: 1m
+  tars: 1m

Review comment:
   Let's make it configurable in the next PR.

##
File path: t/tars/discovery/tars.t
##
@@ -0,0 +1,381 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('warn');
+no_root_location();
+no_shuffle();
+workers(4);
+
+add_block_preprocessor(sub {
+my ($block) = @_;
+
+my $yaml_config = <<_EOC_;
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  tars:
+db_conf:
+  host: 127.0.0.1
+  port: 3306
+  database: db_tars
+  user: root
+  password: tars2022
+full_fetch_interval: 90

Review comment:
   Could we use a shorter interval instead of waiting > 90 in the test?

##
File path: apisix/discovery/tars/init.lua
##
@@ -0,0 +1,332 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ngx = ngx
+local format = string.format
+local ipairs = ipairs
+local error = error
+local tonumber = tonumber
+local local_conf = require("apisix.core.config_local").local_conf()
+local core = require("apisix.core")
+local mysql = require("resty.mysql")
+local process = require("ngx.process")
+
+local full_query_sql = [[ select servant, group_concat(endpoint order by 
endpoint) as endpoints
+from t_server_conf left join t_adapter_conf tac using (application, 
server_name, node_name)
+where setting_state = 'active' and present_state = 'active'
+group by servant ]]
+
+local incremental_query_sql = [[
+select servant, (setting_state = 'active' and present_state = 'active') 
activated,
+group_concat(endpoint order by endpoint) endpoints
+from t_server_conf left join t_adapter_conf tac using (application, 
server_name, node_name)
+where (application, server_name) in
+(
+select application, server_name from t_server_conf
+where registry_timestamp > now() - interval %d second
+union
+select application, server_name from t_adapter_conf
+where registry_timestamp > now() - interval %d second
+)
+group by servant, activated order by activated desc ]]
+
+local _M = {
+version = 0.1,
+}
+
+local endpoint_dict
+local default_weight
+
+local last_fetch_full_time = 0
+local last_db_error
+
+local endpoint_lrucache = core.lrucache.new({
+ttl = 300,
+count = 1024
+})
+
+local activated_buffer = core.table.new(10, 0)
+local nodes_buffer = core.table.new(0, 5)
+
+
+--[[
+endpoints format as follows:
+  tcp -h 172.16.1.1 -p 11 -t 6000 -e 0,tcp -e 0 -p 12 -h 172.16.1.1,tcp -p 13 
-h 172.16.1.1
+we need extract host and port value via regex
+--]]
+local endpoints_pattern = core.table.concat(
+{ 
[[tcp(\s*-[te]\s*(\S+)){0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+))]],
+  [[{0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+)){0,2}\s*(,|$)]] }
+)
+
+
+local function update_endpoint(servant, nodes)
+local endpoint_content = core.json.encode(nodes, true)
+local endpoint_version = ngx.crc32_long(endpoint_content)
+core.log.debug("set servant ", servant, endpoint_content)
+local _, err
+_, err = endpoint_dict:safe_set(servant .. "#version", endpoint_version)
+if err 

[GitHub] [apisix] spacewander commented on a change in pull request #6599: feat: add tars discovery

2022-03-13 Thread GitBox


spacewander commented on a change in pull request #6599:
URL: https://github.com/apache/apisix/pull/6599#discussion_r825449599



##
File path: apisix/discovery/tars/init.lua
##
@@ -0,0 +1,283 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ngx = ngx
+local format = string.format
+local ipairs = ipairs
+local error = error
+local tonumber = tonumber
+local local_conf = require("apisix.core.config_local").local_conf()
+local core = require("apisix.core")
+local mysql = require("resty.mysql")
+local process = require("ngx.process")
+
+local full_query_sql = [[ select servant, group_concat(endpoint order by 
endpoint) as endpoints
+from t_server_conf left join t_adapter_conf tac using (application, 
server_name, node_name)
+where setting_state = 'active' and present_state = 'active'
+group by servant ]]
+
+local incremental_query_sql = [[
+select servant, (setting_state = 'active' and present_state = 'active') 
activated,
+group_concat(endpoint order by endpoint) endpoints
+from t_server_conf left join t_adapter_conf tac using (application, 
server_name, node_name)
+where (application, server_name) in
+(
+select application, server_name from t_server_conf
+where registry_timestamp > now() - interval %d second
+union
+select application, server_name from t_adapter_conf
+where registry_timestamp > now() - interval %d second
+)
+group by servant, activated order by activated desc ]]
+
+local _M = {
+version = 0.1,
+}
+
+local endpoint_dict
+local default_weight
+
+local last_fetch_full_time = 0
+local last_fetch_error
+
+local endpoint_lrucache = core.lrucache.new({
+ttl = 300,
+count = 1024
+})
+
+local activated_buffer = core.table.new(10, 0)
+local nodes_buffer = core.table.new(0, 5)
+
+local endpoints_pattern = core.table.concat(
+{ 
[[tcp(\s*-[te]\s*(\S+)){0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+))]],
+  [[{0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+)){0,2}\s*(,|$)]] }
+)
+
+
+local function update_endpoint(servant, nodes)
+local endpoint_content = core.json.encode(nodes, true)
+local endpoint_version = ngx.crc32_long(endpoint_content)
+core.log.debug("set servant ", servant, endpoint_content)
+local _, err
+_, err = endpoint_dict:safe_set(servant .. "#version", endpoint_version)
+if err then
+core.log.error("set endpoint version into discovery DICT failed, ", 
err)
+return
+end
+_, err = endpoint_dict:safe_set(servant, endpoint_content)
+if err then
+core.log.error("set endpoint into discovery DICT failed, ", err)
+endpoint_dict:delete(servant .. "#version")
+end
+end
+
+
+local function delete_endpoint(servant)
+core.log.info("delete servant ", servant)
+endpoint_dict:delete(servant .. "#version")
+endpoint_dict:delete(servant)
+end
+
+
+local function create_endpoint_lrucache(servant)
+local endpoint_content = endpoint_dict:get_stale(servant)
+if not endpoint_content then
+core.log.error("get empty endpoint content from discovery DICT, 
servant: ", servant)
+return nil
+end
+
+local endpoint = core.json.decode(endpoint_content)
+if not endpoint then
+core.log.error("decode endpoint content failed, content: ", 
endpoint_content)
+return nil
+end
+
+return endpoint
+end
+
+
+local function get_endpoint(servant)
+local endpoint_version = endpoint_dict:get_stale(servant .. "#version")
+if not endpoint_version then
+return nil
+end
+
+return endpoint_lrucache(servant, endpoint_version, 
create_endpoint_lrucache, servant)
+end
+
+
+local function extract_endpoint(query_result)
+for _, p in ipairs(query_result) do
+repeat
+local servant = p.servant
+
+if servant == ngx.null then
+break
+end
+
+if p.activated == 1 then
+activated_buffer[servant] = ngx.null
+elseif p.activated == 0 then
+if activated_buffer[servant] == nil then
+delete_endpoint(servant)
+end
+break
+end
+
+core.table.clear(nodes_buffer)
+local iterator =