zhixiongdu027 commented on a change in pull request #6599:
URL: https://github.com/apache/apisix/pull/6599#discussion_r830632628



##########
File path: apisix/discovery/tars/init.lua
##########
@@ -0,0 +1,340 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ngx = ngx
+local format = string.format
+local ipairs = ipairs
+local error = error
+local tonumber = tonumber
+local local_conf = require("apisix.core.config_local").local_conf()
+local core = require("apisix.core")
+local mysql = require("resty.mysql")
+local process = require("ngx.process")
+
+local endpoint_dict = ngx.shared.tars
+if not endpoint_dict then
+    error("failed to get nginx shared dict: tars, please check your APISIX 
version")
+end
+
+local full_query_sql = [[ select servant, group_concat(endpoint order by endpoint) as endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where setting_state = 'active' and present_state = 'active'
+group by servant ]]
+
+local incremental_query_sql = [[
+select servant, (setting_state = 'active' and present_state = 'active') activated,
+group_concat(endpoint order by endpoint) endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where (application, server_name) in
+(
+select application, server_name from t_server_conf
+where registry_timestamp > now() - interval %d second
+union
+select application, server_name from t_adapter_conf
+where registry_timestamp > now() - interval %d second
+)
+group by servant, activated order by activated desc ]]
+
+local _M = {
+    version = 0.1,
+}
+
+local default_weight
+
+local last_fetch_full_time = 0
+local last_db_error
+
+local endpoint_lrucache = core.lrucache.new({
+    ttl = 300,
+    count = 1024
+})
+
+local activated_buffer = core.table.new(10, 0)
+local nodes_buffer = core.table.new(0, 5)
+
+
+--[[
+endpoints format as follows:
+  tcp -h 172.16.1.1 -p 11 -t 6000 -e 0,tcp -e 0 -p 12 -h 172.16.1.1,tcp -p 13 -h 172.16.1.1
+we extract host and port value via endpoints_pattern
+--]]
+local endpoints_pattern = core.table.concat(
+        { [[tcp(\s*-[te]\s*(\S+)){0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+))]],
+          [[{0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+)){0,2}\s*(,|$)]] }
+)
+
+
+local function update_endpoint(servant, nodes)
+    local endpoint_content = core.json.encode(nodes, true)
+    local endpoint_version = ngx.crc32_long(endpoint_content)
+    core.log.debug("set servant ", servant, endpoint_content)
+    local _, err
+    _, err = endpoint_dict:safe_set(servant .. "#version", endpoint_version)
+    if err then
+        core.log.error("set endpoint version into nginx shared dict failed, ", 
err)
+        return
+    end
+    _, err = endpoint_dict:safe_set(servant, endpoint_content)
+    if err then
+        core.log.error("set endpoint into nginx shared dict failed, ", err)
+        endpoint_dict:delete(servant .. "#version")
+    end
+end
+
+
+local function delete_endpoint(servant)
+    core.log.info("delete servant ", servant)
+    endpoint_dict:delete(servant .. "#version")
+    endpoint_dict:delete(servant)
+end
+
+
+local function add_endpoint_to_lrucache(servant)
+    local endpoint_content, err = endpoint_dict:get_stale(servant)
+    if not endpoint_content then
+        core.log.error("get empty endpoint content, servant: ", servant, ", 
err: ", err)
+        return nil
+    end
+
+    local endpoint, err = core.json.decode(endpoint_content)
+    if not endpoint then
+        core.log.error("decode json failed, content: ", endpoint_content, ", 
err: ", err)
+        return nil
+    end
+
+    return endpoint
+end
+
+
+local function get_endpoint(servant)
+    local endpoint_version, err = endpoint_dict:get_stale(servant .. "#version")
+    if not endpoint_version  then
+        if err then
+            core.log.error("get empty endpoint version, servant: ", servant, 
", err: ", err)
+        end
+        return nil
+    end
+    return endpoint_lrucache(servant, endpoint_version, add_endpoint_to_lrucache, servant)
+end
+
+
+local function extract_endpoint(query_result)
+    for _, p in ipairs(query_result) do
+        repeat
+            local servant = p.servant
+
+            if servant == ngx.null then
+                break
+            end
+
+            if p.activated == 1 then
+                activated_buffer[servant] = ngx.null
+            elseif p.activated == 0 then
+                if activated_buffer[servant] == nil then
+                    delete_endpoint(servant)
+                end
+                break
+            end
+
+            core.table.clear(nodes_buffer)
+            local iterator = ngx.re.gmatch(p.endpoints, endpoints_pattern, "jao")
+            while true do
+                local captures, err = iterator()
+                if err then
+                    core.log.error("gmatch failed, error: ", err, " , 
endpoints: ", p.endpoints)
+                    break
+                end
+
+                if not captures then
+                    break
+                end
+
+                local host, port
+                if captures[3] == "h" or captures[3] == "H" then
+                    host = captures[4]
+                    port = tonumber(captures[8])
+                else
+                    host = captures[8]
+                    port = tonumber(captures[4])
+                end
+
+                core.table.insert(nodes_buffer, {
+                    host = host,
+                    port = port,
+                    weight = default_weight,
+                })
+            end
+            update_endpoint(servant, nodes_buffer)
+        until true
+    end
+end
+
+
+local function fetch_full(db_cli)
+    local res, err, errcode, sqlstate = db_cli:query(full_query_sql)
+    --[[
+    res format is as follows:
+    {
+        {
+            servant = "A.AServer.FirstObj",
+            endpoints = "tcp -h 172.16.1.1 -p 10001 -e 0 -t 3000,tcp -p 10002 
-h 172.16.1.2 -t 3000"
+        },
+        {
+            servant = "A.AServer.SecondObj",
+            endpoints = "tcp -t 3000 -p 10002 -h 172.16.1.2"
+        },
+    }
+
+    if current endpoint_dict is as follows:
+      key1:nodes1, key2:nodes2, key3:nodes3
+
+    then fetch_full get follow results:
+      key1:nodes1, key4:nodes4, key5:nodes5
+
+    at this time, we need
+      1: setup key4:nodes4, key5:nodes5
+      2: delete key2:nodes2, key3:nodes3
+
+    to achieve goals, we should:
+      1: before setup results, execute endpoint_dict:flush_all()
+      2:  after setup results, execute endpoint_dict:flush_expired()
+    --]]
+    if not res then
+        core.log.error("query failed, error: ", err, ", ", errcode, " ", 
sqlstate)
+        return err
+    end
+
+    endpoint_dict:flush_all()
+    extract_endpoint(res)
+
+    while err == "again" do
+        res, err, errcode, sqlstate = db_cli:read_result()
+        if not res then
+            if err then
+                core.log.error("read result failed, error: ", err, ", ", 
errcode, " ", sqlstate)
+            end
+            return err
+        end
+        extract_endpoint(res)
+    end
+    endpoint_dict:flush_expired()

Review comment:
       > Please add some comment about the expiration operation, thanks.
   
   Sorry, I added the comment in the function fetch_full() but forgot the function get_endpoint().
   
   I will update it.
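   
   For example, something like this (just a rough sketch, the exact wording may still change):
   
   ```lua
   local function get_endpoint(servant)
       -- NOTE on expiration: fetch_full() first marks every key in endpoint_dict
       -- as expired via flush_all(), then re-sets the servants that are still
       -- active, and only afterwards frees the leftovers with flush_expired().
       -- Inside that window the old values are expired but not yet freed, so we
       -- read with get_stale() instead of get() to avoid returning nil for a
       -- servant that is actually still valid.
       local endpoint_version, err = endpoint_dict:get_stale(servant .. "#version")
       if not endpoint_version then
           if err then
               core.log.error("get empty endpoint version, servant: ", servant, ", err: ", err)
           end
           return nil
       end
       return endpoint_lrucache(servant, endpoint_version, add_endpoint_to_lrucache, servant)
   end
   ```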



