membphis commented on code in PR #12364:
URL: https://github.com/apache/apisix/pull/12364#discussion_r2167915543
##########
apisix/plugins/elasticsearch-logger.lua:
##########
@@ -195,6 +264,11 @@ function _M.body_filter(conf, ctx)
log_util.collect_body(conf, ctx)
end
+function _M.access(conf)
+ -- set_version will call ES server only the first time
+ -- so this should not amount to considerable overhead
+ set_version(conf)
Review Comment:
how about `fetch_and_update_es_version`?
##########
apisix/plugins/elasticsearch-logger.lua:
##########
@@ -159,9 +221,16 @@ local function send_to_elasticsearch(conf, entries)
local uri = selected_endpoint_addr .. "/_bulk"
local body = core.table.concat(entries, "")
local headers = {
- ["Content-Type"] = "application/x-ndjson;compatible-with=7",
- ["Accept"] = "application/vnd.elasticsearch+json;compatible-with=7"
+ ["Content-Type"] = "application/x-ndjson",
+ ["Accept"] = "application/vnd.elasticsearch+json"
}
+ if conf._version == "8" then
+ headers["Content-Type"] = headers["Content-Type"] .. compat_header_7
Review Comment:
can we remove `compat_header_7` and `compat_header_8`?
If yes, then we can move `if conf._version == "8" then` and `elseif
conf._version == "9" then`.
just use the default header, pls confirm if it is ok:
```
local headers = {
["Content-Type"] = "application/x-ndjson",
["Accept"] = "application/vnd.elasticsearch+json"
}
```
##########
ci/pod/docker-compose.plugin.yml:
##########
@@ -225,6 +225,66 @@ services:
http.port: 9201
xpack.security.enabled: 'true'
+ elasticsearch-noauth-2:
+ image: docker.elastic.co/elasticsearch/elasticsearch:9.0.2
+ restart: unless-stopped
+ ports:
+ - "9400:9200"
+ - "9500:9300"
+ environment:
+ ES_JAVA_OPTS: -Xms512m -Xmx512m
+ discovery.type: single-node
+ xpack.security.enabled: 'false'
+
+ elasticsearch-auth-2:
+ image: docker.elastic.co/elasticsearch/elasticsearch:9.0.2
+ restart: unless-stopped
+ ports:
+ - "9301:9201"
+ environment:
+ ES_JAVA_OPTS: -Xms512m -Xmx512m
+ discovery.type: single-node
+ ELASTIC_USERNAME: elastic
+ ELASTIC_PASSWORD: 123456
+ http.port: 9201
+ xpack.security.enabled: 'true'
+
+ elasticsearch-noauth-3:
+ image: docker.elastic.co/elasticsearch/elasticsearch:7.0.0
+ restart: unless-stopped
+ ports:
+ - "9600:9200"
+ - "9700:9300"
+ environment:
+ ES_JAVA_OPTS: -Xms512m -Xmx512m
+ discovery.type: single-node
+ xpack.security.enabled: 'false'
+
+ elasticsearch-auth-3:
+ image: docker.elastic.co/elasticsearch/elasticsearch:7.0.0
+ restart: unless-stopped
+ ports:
+ - "9401:9201"
+ environment:
+ ES_JAVA_OPTS: -Xms512m -Xmx512m
+ discovery.type: single-node
+ ELASTIC_USERNAME: elastic
+ ELASTIC_PASSWORD: 123456
+ http.port: 9201
+ xpack.security.enabled: 'true'
+
+ elasticsearch-auth-4:
+ image: docker.elastic.co/elasticsearch/elasticsearch:6.7.0
Review Comment:
we started more instances of es
the github action, is it power enough to support this? I am little worried
##########
apisix/plugins/elasticsearch-logger.lua:
##########
@@ -124,32 +124,94 @@ function _M.check_schema(conf, schema_type)
if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
end
+
local check = {"endpoint_addrs"}
core.utils.check_https(check, conf, plugin_name)
core.utils.check_tls_bool({"ssl_verify"}, conf, plugin_name)
-
return core.schema.check(schema, conf)
end
+local function get_es_major_version(uri, conf)
+ local httpc = http.new()
+ if not httpc then
+ return nil, "failed to create http client"
+ end
+ local headers = {}
+ if conf.auth then
+ local authorization = "Basic " .. ngx.encode_base64(
+ conf.auth.username .. ":" .. conf.auth.password
+ )
+ headers["Authorization"] = authorization
+ end
+ httpc:set_timeout(conf.timeout * 1000)
+ local res, err = httpc:request_uri(uri, {
+ ssl_verify = conf.ssl_verify,
+ method = "GET",
+ headers = headers,
+ })
+ if not res then
+ return false, err
+ end
+ if res.status ~= 200 then
+ return nil, str_format("server returned status: %d, body: %s",
+ res.status, res.body or "")
+ end
+ local json_body, err = core.json.decode(res.body)
+ if not json_body then
+ return nil, "failed to decode response body: " .. err
+ end
+ if not json_body.version or not json_body.version.number then
+ return nil, "failed to get version from response body"
+ end
+
+ local major_version = json_body.version.number:match("^(%d+)%.")
+ if not major_version then
+ return nil, "invalid version format: " .. json_body.version.number
+ end
+
+ return major_version
+end
+
+
local function get_logger_entry(conf, ctx)
local entry = log_util.get_log_entry(plugin_name, conf, ctx)
- return core.json.encode({
- create = {
- _index = conf.field.index,
- _type = conf.field.type
- }
- }) .. "\n" ..
+ local body = {
+ index = {
+ _index = conf.field.index
+ }
+ }
+ -- for older version type is required
+ if conf._version == "6" or conf._version == "5" then
+ body.index._type = "_doc"
+ end
+ return core.json.encode(body) .. "\n" ..
core.json.encode(entry) .. "\n"
end
+local function set_version(conf)
+ if not conf._version then
+ local selected_endpoint_addr
+ if conf.endpoint_addr then
+ selected_endpoint_addr = conf.endpoint_addr
+ else
+ selected_endpoint_addr =
conf.endpoint_addrs[math_random(#conf.endpoint_addrs)]
+ end
+ local major_version, err =
get_es_major_version(selected_endpoint_addr, conf)
+ if err then
+ return false, str_format("failed to get Elasticsearch version:
%s", err)
Review Comment:
we should write error log here
we do not capture the error msg in `access` phase
##########
apisix/plugins/elasticsearch-logger.lua:
##########
@@ -124,32 +124,94 @@ function _M.check_schema(conf, schema_type)
if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
end
+
local check = {"endpoint_addrs"}
core.utils.check_https(check, conf, plugin_name)
core.utils.check_tls_bool({"ssl_verify"}, conf, plugin_name)
-
return core.schema.check(schema, conf)
end
+local function get_es_major_version(uri, conf)
+ local httpc = http.new()
+ if not httpc then
+ return nil, "failed to create http client"
+ end
+ local headers = {}
+ if conf.auth then
+ local authorization = "Basic " .. ngx.encode_base64(
+ conf.auth.username .. ":" .. conf.auth.password
+ )
+ headers["Authorization"] = authorization
+ end
+ httpc:set_timeout(conf.timeout * 1000)
+ local res, err = httpc:request_uri(uri, {
+ ssl_verify = conf.ssl_verify,
+ method = "GET",
+ headers = headers,
+ })
+ if not res then
+ return false, err
+ end
+ if res.status ~= 200 then
+ return nil, str_format("server returned status: %d, body: %s",
+ res.status, res.body or "")
+ end
+ local json_body, err = core.json.decode(res.body)
+ if not json_body then
+ return nil, "failed to decode response body: " .. err
+ end
+ if not json_body.version or not json_body.version.number then
+ return nil, "failed to get version from response body"
+ end
+
+ local major_version = json_body.version.number:match("^(%d+)%.")
+ if not major_version then
+ return nil, "invalid version format: " .. json_body.version.number
+ end
+
+ return major_version
+end
+
+
local function get_logger_entry(conf, ctx)
local entry = log_util.get_log_entry(plugin_name, conf, ctx)
- return core.json.encode({
- create = {
- _index = conf.field.index,
- _type = conf.field.type
- }
- }) .. "\n" ..
+ local body = {
+ index = {
+ _index = conf.field.index
+ }
+ }
+ -- for older version type is required
+ if conf._version == "6" or conf._version == "5" then
+ body.index._type = "_doc"
+ end
+ return core.json.encode(body) .. "\n" ..
core.json.encode(entry) .. "\n"
end
+local function set_version(conf)
Review Comment:
```lua
local function set_version(conf)
if conf._version then
return
end
local selected_endpoint_addr
if conf.endpoint_addr then
selected_endpoint_addr = conf.endpoint_addr
else
selected_endpoint_addr =
conf.endpoint_addrs[math_random(#conf.endpoint_addrs)]
end
local major_version, err =
get_es_major_version(selected_endpoint_addr, conf)
if err then
return false, str_format("failed to get Elasticsearch version:
%s", err)
end
conf._version = major_version
end
##########
apisix/plugins/elasticsearch-logger.lua:
##########
@@ -159,9 +221,16 @@ local function send_to_elasticsearch(conf, entries)
local uri = selected_endpoint_addr .. "/_bulk"
local body = core.table.concat(entries, "")
local headers = {
- ["Content-Type"] = "application/x-ndjson;compatible-with=7",
- ["Accept"] = "application/vnd.elasticsearch+json;compatible-with=7"
+ ["Content-Type"] = "application/x-ndjson",
+ ["Accept"] = "application/vnd.elasticsearch+json"
}
+ if conf._version == "8" then
+ headers["Content-Type"] = headers["Content-Type"] .. compat_header_7
Review Comment:
I do not like `if conf._version == "8" then ... elseif conf._version == "9"
then ... end` style
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]