zouchengzhuo opened a new issue, #9214:
URL: https://github.com/apache/apisix/issues/9214

   ### Current Behavior
   
   <img width="887" alt="image" 
src="https://user-images.githubusercontent.com/6971870/229087450-57369d63-d8d7-408b-9e56-edf3a9cb71cf.png";>
   
   客户端建立连接时,读取连接的 tcp 对象,同时创建3个子协程,连接3个后端
   
   <img width="846" alt="image" 
src="https://user-images.githubusercontent.com/6971870/229087918-998d9470-8516-4322-a753-8959cd7bba4e.png";>
   
   与后端的连接建立后,尝试在 while(true) 中  receive 数据。
   
   通过日志观察,有一定概率出现很多地址相同的 tcp 对象,无论是 ngx.req.socket 拿到的,还是 ngx.tcp.socket 
拿到的,都有可能出现重复。  
   
   <img width="1275" alt="image" 
src="https://user-images.githubusercontent.com/6971870/229088641-831832ba-4e88-4305-a042-3c8cb1e595bd.png";>
   
   当出现这种情况时,在插件中 while(true) 调用 receive 从 ngx.req.socket 
中读取数据时,会一直阻塞,既不能成功读取数据,也不会超时,即使读缓冲区中已经有了很多数据堆积。  
   
   <img width="758" alt="image" 
src="https://user-images.githubusercontent.com/6971870/229089016-65c616d0-0f09-4495-b18d-6fad0b5bfcac.png";>
   
   
   
   
   
   ### Expected Behavior
   
   ngx.req.socket 不再阻塞,需要能成功读取到数据,或者超时。
   
   ### Error Logs
   
   _No response_
   
   ### Steps to Reproduce
   
   通过一个流式插件可以复现,插件代码:
   ```lua
   local plugin_name = "czzou-tcp-stream"
   local ngx_log = ngx.log
   local ngx_DEBUG = ngx.DEBUG
   local ngx_ERROR = ngx.ERR
   
   local schema = {
       type = "object"
   }
   
   local _M = {
       version = 0.1,
       priority = 1005,
       name = plugin_name,
       schema = schema,
   }
   
   function _M.check_schema(schema_type, schema)
       -- perform validation
       return true
   end
   
   function _M.upstream_handler(remote_port, server_ip, server_port)
       local upstream_sock = ngx.socket.tcp()
       local ok, err = upstream_sock:connect(server_ip,server_port)
       if not ok then
           ngx_log(ngx_ERROR, remote_port .. string.format(" failed to receive 
data from client: %s", err))
           return
       else
           ngx_log(ngx_ERROR, string.format("connect upstream success ip %s 
port %d ", server_ip, server_port))
       end
       ngx_log(ngx_ERROR, remote_port ..  string.format(" start 
upstream_handler: %s ", tostring(upstream_sock)))
       while true do
           -- 每次尝试读取 3k~10k 数据,然后发给 socket
           upstream_sock:settimeout(30000)
           local package_len = 3000 + math.floor(7000 * math.random())
           -- local package_len = 1024
           ngx_log(ngx_ERROR, remote_port ..  " start to receive data from 
upstream \n")
           local data, err, partial = upstream_sock:receive(package_len)
           if err then
               ngx_log(ngx_ERROR, remote_port .. string.format(" failed to 
receive data from upstream: %s", err))
               if string.find(err, "timeout") then
                   goto continue
               end
               return
           end
           ngx_log(ngx_ERROR, remote_port ..  " received " .. #data .. " bytes 
of data from upstream \n")
           :: continue ::
       end
   end
   
   function _M.preread(conf, ctx)
   
       local socket = ngx.req.socket(true)
       local upstream_co_list = {}
       local upstream_sock_list = {}
       local upstream_num = 3
       local remote_port = ctx.var["remote_port"]
       ngx_log(ngx_ERROR, remote_port ..  string.format(" start 
upstream_handler (main): %s ", tostring(socket)))
       -- 连接 upstream_num 个后端,有数据来了随机发给一个后端,然后将后端的数据会写给 socket
       local server_ip = "127.0.0.1"
       local server_port = 10000
       for i = 1, upstream_num do
           local co = ngx.thread.spawn(_M.upstream_handler, remote_port, 
server_ip, server_port)
           -- 本地服务监听 10000~1000n
           server_port = server_port + 1
           upstream_co_list[i] = co
           :: continue ::
       end
       
   
       while true do
           socket:settimeout(30000)
           local package_len = 3000 + math.floor(7000 * math.random())
           -- local package_len = 1024
           ngx_log(ngx_ERROR, remote_port ..  string.format(" try to receive %d 
bytes from client", package_len))
           local data, err, partial = socket:receive(package_len)
           if err then
               ngx_log(ngx_ERROR, remote_port .. string.format(" failed to 
receive data from client: %s", err))
               if string.find(err, "timeout") then
                   goto continue
               end
               -- 关闭后端协程并退出
               for i= 1, upstream_num do
                   ngx.thread.kill(upstream_co_list[i])
               end
               -- 如果是 close 返回1,否则返回 503
               if err == "closed" then
                   return 1
               else
                   return 503
               end
           end
           ngx_log(ngx_ERROR, remote_port .. " received " .. #data .. " bytes 
of data from client\n")
           :: continue ::
       end
   
       socket:close()
   end
   
   return _M
   ```
   
   ### Environment
   
   - APISIX version (run `apisix version`): 3.2.0
   - Operating system (run `uname -a`): centos7
   - OpenResty / Nginx version (run `openresty -V` or `nginx -V`): 
openresty/1.21.4.1
   - etcd version, if relevant (run `curl 
http://127.0.0.1:9090/v1/server_info`): 3.4.0
   - APISIX Dashboard version, if relevant:
   - Plugin runner version, for issues related to plugin runners:
   - LuaRocks version, for installation issues (run `luarocks --version`):
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to