spacewander commented on code in PR #6995:
URL: https://github.com/apache/apisix/pull/6995#discussion_r867671259
##########
.github/workflows/build.yml:
##########
@@ -30,7 +30,7 @@ jobs:
- linux_openresty
- linux_openresty_1_17
test_dir:
- - t/plugin
+ - t/plugin t/pubsub
- t/admin t/cli t/config-center-yaml t/control t/core t/debug
t/discovery t/error_page t/misc
- t/node t/router t/script t/stream-node t/utils t/wasm
t/xds-library t/xrpc
Review Comment:
We should add t/pubsub to this group instead, as t/plugin is already time-consuming.
##########
Makefile:
##########
@@ -263,6 +263,7 @@ install: runtime
# Lua directories listed in alphabetical order
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix
$(ENV_INSTALL) apisix/*.lua $(ENV_INST_LUADIR)/apisix/
+ $(ENV_INSTALL) apisix/*.proto $(ENV_INST_LUADIR)/apisix/
Review Comment:
It would be better to create a separate directory to store the proto files, like:
https://github.com/Kong/kong/tree/master/kong/include
https://github.com/envoyproxy/envoy/tree/main/api/envoy
##########
apisix/schema_def.lua:
##########
@@ -406,8 +406,12 @@ local upstream_schema = {
properties = {
client_cert = certificate_scheme,
client_key = private_key_schema,
+ verify = {
+ type = "boolean",
+ description = "Turn on server certificate verification",
Review Comment:
Let's add a default value of `false`.
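A minimal sketch of how the property might look with the default added. The table is trimmed to just this field, and `tls_properties` is a hypothetical name used only for illustration:

```lua
-- Sketch only: `tls_properties` is a hypothetical stand-in for the
-- `properties` table in the diff above, trimmed to the new field.
local tls_properties = {
    verify = {
        type = "boolean",
        description = "Turn on server certificate verification",
        default = false,  -- the default value suggested in this comment
    },
}
```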
##########
apisix/schema_def.lua:
##########
@@ -406,8 +406,12 @@ local upstream_schema = {
properties = {
client_cert = certificate_scheme,
client_key = private_key_schema,
+ verify = {
+ type = "boolean",
+ description = "Turn on server certificate verification",
+ },
},
- required = {"client_cert", "client_key"},
+ required = {"client_cert", "client_key"}
Review Comment:
Now the cert/key is no longer required
##########
docs/en/latest/pubsub.md:
##########
@@ -0,0 +1,244 @@
+---
+title: Pub-Sub
+keywords:
+ - APISIX
+ - Pub-Sub
+description: This document contains information about the Apache APISIX
pub-sub framework.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## What is Pub-Sub
+
+Publish-subscribe is a messaging paradigm in which message producers do not
send messages directly to message consumers, but are relayed by a specific
broker that caches messages sent by producers and then actively pushes them to
subscribed consumers or pulls them by consumers. This pattern is often used in
system architectures for system decoupling or to handle high traffic scenarios.
+
+In Apache APISIX, the most common scenario is for handling north-south traffic
from the server to the client. If we can combine it with a publish-subscribe
scenario, we can achieve more powerful features, such as real-time
collaboration on online documents, online games, etc.
+
+## Architecture
+
+
+
+Currently, Apache APISIX supports WebSocket communication with the client,
which can be any application that supports WebSocket, with a custom Protocol
Buffer as the application layer communication protocol, see the [protocol
definition](../../../apisix/pubsub.proto).
Review Comment:
```suggestion
Currently, Apache APISIX supports WebSocket communication with the client,
which can be any application that supports WebSocket, with Protocol Buffer as
the serialization mechanism, see the [protocol
definition](../../../apisix/pubsub.proto).
```
##########
t/node/upstream-kafka.t:
##########
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+ $block->set_value("no_error_log", "[error]");
+ }
+
+ if (!defined $block->request) {
+ $block->set_value("request", "GET /t");
+ }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: success
+--- config
Review Comment:
We should put it in `t/admin/upstream`
##########
apisix/pubsub.proto:
##########
@@ -0,0 +1,71 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+syntax = "proto3";
+
+option go_package = "pubsub";
+
+// request
+message CmdKafkaListOffset {
+ string topic = 1;
+ int32 partition = 2;
+ int64 timestamp = 3;
+}
+
+message CmdKafkaFetch {
+ string topic = 1;
+ int32 partition = 2;
+ int64 offset = 3;
+}
+
+message PubSubReq {
+ int64 sequence = 1;
Review Comment:
Let's document the usage of the `sequence` field, and why the req starts from
31, in a comment.
##########
apisix/plugins/kafka-proxy.lua:
##########
@@ -0,0 +1,78 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+
+
+local schema = {
+ type = "object",
+ properties = {
+ enable_tls = {
Review Comment:
We don't need to configure TLS here. It's defined in the upstream.
##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,131 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local core = require("apisix.core")
+local ws_server = require("resty.websocket.server")
+local protoc = require("protoc")
+local pb = require("pb")
+local setmetatable = setmetatable
+local pcall = pcall
+local pairs = pairs
+
+protoc.reload()
+pb.option("int64_as_string")
+local pubsub_protoc = protoc.new()
+
+-- This module is used to handle ws server command
+-- processing in pub-sub scenarios.
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+
+function _M.new()
+ -- compile the protobuf file on initial load module
+ -- ensure that each worker is loaded once
+ if not pubsub_protoc.loaded["pubsub.proto"] then
+ pubsub_protoc:addpath("apisix")
+ local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc,
"pubsub.proto")
+ if not ok then
+ pubsub_protoc:reset()
+ return nil, "failed to load pubsub protocol: "..err
+ end
+ end
+
+ local ws, err = ws_server:new()
+ if not ws then
+ return nil, err
+ end
+
+ local obj = setmetatable({
+ ws_server = ws,
+ cmd_handler = {},
+ }, mt)
+
+ return obj
+end
+
+
+-- add command callback function
+-- handler is function(params)
+-- return value is resp, err
+function _M.on(self, command, handler)
+ self.cmd_handler[command] = handler
+end
+
+
+-- enter the message receiving loop and wait for client data
+function _M.wait(self)
+ local ws = self.ws_server
+ while true do
+ -- read raw data frames from websocket connection
+ local raw_data, raw_type, err = ws:recv_frame()
+ if err then
+ ws:send_close()
+ return "websocket server: "..err
Review Comment:
Better not to use `return` inside a `while` loop. Use `break` and handle the
err outside the loop.
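A rough sketch of the shape this could take. `fake_ws` is a hypothetical stub standing in for the websocket server object so the loop can run outside OpenResty; only the `break` plus single exit point is the point here, not the stub itself:

```lua
-- Sketch only: `fake_ws` is a hypothetical stub, not the real
-- resty.websocket.server object.
local fake_ws = {
    frames = { { "ping", "binary" }, { "pong", "binary" } },
    closed = false,
}

function fake_ws:recv_frame()
    local frame = table.remove(self.frames, 1)
    if not frame then
        return nil, nil, "connection closed"
    end
    return frame[1], frame[2], nil
end

function fake_ws:send_close()
    self.closed = true
    return true
end

local function wait(ws)
    local fatal_err
    while true do
        local raw_data, raw_type, err = ws:recv_frame()
        if err then
            -- remember the error and leave the loop instead of returning
            fatal_err = "websocket server: " .. err
            break
        end
        -- command dispatch for raw_data / raw_type would go here
    end

    -- single exit point: clean up and report the error outside the loop
    ws:send_close()
    return fatal_err
end

local err = wait(fake_ws)
print(err)
```

With one exit point, any cleanup added later (closing the connection, releasing resources) only has to be written once.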
##########
apisix/init.lua:
##########
@@ -318,6 +323,111 @@ local function common_phase(phase_name)
end
+ffi.cdef[[
+ int64_t atoll(const char *num);
+]]
+local function kafka_access_phase(api_ctx)
Review Comment:
Please move this big function to apisix/pubsub/kafka.lua.
init.lua is big enough already.
##########
apisix/plugins/kafka-proxy.lua:
##########
@@ -0,0 +1,78 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+
+
+local schema = {
+ type = "object",
+ properties = {
+ enable_tls = {
+ type = "boolean",
+ default = false,
+ },
+ ssl_verify = {
+ type = "boolean",
+ default = true,
+ },
+ enable_sasl = {
Review Comment:
Better to use a conf like:
```
sasl = {
    username = ,
    password = ,
}
```
so we can check them directly in the schema.
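As an illustration only, a nested schema along these lines might look like the following. The field types and the `required` list are assumptions, not taken from the PR:

```lua
-- Sketch only: field types and the `required` list are assumptions,
-- not taken from the PR.
local schema = {
    type = "object",
    properties = {
        sasl = {
            type = "object",
            properties = {
                username = { type = "string" },
                password = { type = "string" },
            },
            -- both credentials must be present whenever `sasl` is set
            required = { "username", "password" },
        },
    },
}
```

Under these assumptions, the presence of the `sasl` object itself would signal that SASL is enabled, and an incomplete credential pair would be rejected at schema validation time.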
##########
docs/en/latest/config.json:
##########
@@ -212,6 +213,14 @@
"discovery/kubernetes"
]
},
+ {
+ "type": "category",
+ "label": "Pub-Sub",
Review Comment:
Better to always use the same format: "PubSub"
##########
docs/en/latest/plugins/kafka-proxy.md:
##########
@@ -0,0 +1,83 @@
+---
+title: kafka-proxy
+keywords:
+ - APISIX
+ - Plugin
+ - Kafka
+ - consumer
Review Comment:
Why is the `consumer` keyword used here?
##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,131 @@
+--
Review Comment:
Please add LDoc comments, like the other files under core/
##########
t/node/upstream-kafka.t:
##########
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+ $block->set_value("no_error_log", "[error]");
+ }
+
+ if (!defined $block->request) {
+ $block->set_value("request", "GET /t");
+ }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: success
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin")
+ local code, body = t.test("/apisix/admin/upstreams/kafka",
ngx.HTTP_PUT, [[{
+ "nodes": {
+ "127.0.0.1:9092": 1
+ },
+ "type": "none",
+ "scheme": "kafka"
+ }]])
+
+ ngx.say(code..body)
Review Comment:
Better to use the style below:
https://github.com/apache/apisix/blob/1b5c1900da517bb7d34d4b221aace7242345e78a/t/admin/upstream4.t#L93-95
##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,131 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local core = require("apisix.core")
+local ws_server = require("resty.websocket.server")
+local protoc = require("protoc")
+local pb = require("pb")
+local setmetatable = setmetatable
+local pcall = pcall
+local pairs = pairs
+
+protoc.reload()
+pb.option("int64_as_string")
Review Comment:
Will this call pollute the global option of pb?