Copilot commented on code in PR #7607:
URL: https://github.com/apache/ignite-3/pull/7607#discussion_r2829772091
##########
modules/platforms/cpp/cmake/dependencies.cmake:
##########
@@ -43,6 +43,29 @@ function(fetch_dependency NAME URL MD5)
endif()
endfunction()
+function(add_asio_dependency)
+ message(STATUS "Download dependency: asio")
+# FetchContent_Declare(
+# asio
+# GIT_REPOSITORY https://github.com/chriskohlhoff/asio.git
+# GIT_TAG asio-1-36-0
+# )
Review Comment:
Commented-out code should be removed rather than left in the codebase. If
this alternative implementation using GIT_REPOSITORY is needed for reference,
consider documenting it in a comment or external documentation instead.
```suggestion
```
##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+ using network::detail::set_non_blocking_mode;
+
+ if (!set_non_blocking_mode(fd, true)) {
+ throw std::runtime_error("Error making socket non-blocking");
+ }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+ epoll_event events[MAX_EVENTS];
+
+ bool stopped = false;
+ while (!stopped) {
+ int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+ for (int i = 0; i < event_cnt; ++i) {
+ int fd = events[i].data.fd;
+
+ if (fd == stop_event_fd) {
+ uint64_t val;
+ read(stop_event_fd, &val, sizeof(val));
+ stopped = true;
+ } else if (fd == server_fd) {
+ process_incoming_connection();
+ } else {
+ process_socket_event(events[i]);
+ }
+ }
+ }
+}
+
+void kgb_proxy::add_event_fd() {
+ stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+ epoll_event ev{};
+ ev.events = EPOLLIN;
+ ev.data.fd = stop_event_fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+ uint64_t one = 1;
+ write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+ server_fd = socket(AF_INET, SOCK_STREAM, 0);
+ set_socket_non_blocking(server_fd);
+
+ int opt = 1;
+ setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+ sockaddr_in addr{};
+
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(in_port);
+ addr.sin_addr.s_addr = INADDR_ANY;
+
+ bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+ listen(server_fd, 16);
+
+ epoll_event ev{};
+
+ ev.events = EPOLLIN;
+ ev.data.fd = server_fd;
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);
+}
+
+void kgb_proxy::add_socket_to_epoll(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ev{};
+ ev.events = EPOLLIN | EPOLLET;
+ ev.data.fd = fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+void kgb_proxy::process_incoming_connection() {
+ while (true) {
+ int in_fd;
+ {
+ sockaddr_in addr{};
+ socklen_t len = sizeof(addr);
+
+ in_fd = accept(server_fd, (sockaddr*) &addr, &len);
+
+ if (in_fd < 0) {
+ if (errno != EAGAIN) {
+ std::error_code ec{errno, std::system_category()};
+ std::cerr << "Unexpected issue when accepting connection
err=" << ec.message() << "\n";
+ }
+ break;
+ }
+
+ set_socket_non_blocking(in_fd);
+
+ std::cout << "Client connected to proxy fd = " << in_fd <<
std::endl;
+ }
+
+ int out_fd;
+ {
+ out_fd = socket(AF_INET, SOCK_STREAM, 0);
+
+ set_socket_non_blocking(out_fd);
+
+ if (out_fd < 0) {
+ std::error_code ec{errno, std::system_category()};
+ throw std::runtime_error("Unable to create socket for outbound
proxy connection err = " + ec.message());
+ }
+
+ sockaddr_in addr{};
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(out_port);
+
+ inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
+
+ int res = connect(out_fd, (sockaddr*) &addr, sizeof(addr));
+
+ if (res < 0) {
+ if (errno != EINPROGRESS) {
+ std::error_code ec{errno, std::system_category()};
+ throw std::runtime_error("Unable to connection to server
err = " + ec.message());
+ }
+ }
+
+ std::cout << "Proxy connected to server fd = " << out_fd <<
std::endl;
+ }
+
+ std::shared_ptr<proxy_connection> conn =
std::make_shared<proxy_connection>(in_fd, out_fd);
+
+ connections[in_fd] = conn;
+ connections[out_fd] = conn;
+
+ add_socket_to_epoll(in_fd);
+ add_socket_to_epoll(out_fd);
+
+ std::cout << "Socket pair has been created in_fd = " << in_fd << "
out_fd = " << out_fd << std::endl;
+ }
+}
+
+void kgb_proxy::process_socket_event(const epoll_event& ep_ev) {
+ int fd = ep_ev.data.fd;
+
+ auto it = connections.find(fd);
+ if (it == connections.end()) {
+ throw std::runtime_error("Event for unknown socket occurred fd = " +
std::to_string(fd));
+ }
+
+ auto conn = it->second;
+
+ if (ep_ev.events & EPOLLIN) {
+
+ char buf[BUFF_SIZE];
+
+ int src = fd;
+ int dst = src == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+ while (true) {
+ ssize_t received = recv(src, buf, sizeof(buf), 0);
+
+ if (received < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ break;
+ }
+
+ perror("recv");
+ break;
+
+ }
+
+ if (received == 0) {
+ close(src);
+ close(dst);
+
+ connections.erase(src);
+ connections.erase(dst);
+ break;
+ }
+
+ auto& queue = src == conn->in_sock ? conn->in2out_queue :
conn->out2in_queue;
+ queue.emplace(buf, received);
+
+ enable_writable_notification(dst);
+ }
+ }
+
+ if (ep_ev.events & EPOLLOUT) {
+ int dst = fd;
+ int src = dst == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+ auto& queue = src == conn->in_sock ? conn->in2out_queue :
conn->out2in_queue;
+
+ while (!queue.empty()) {
+
+ const message_chunk& chunk = queue.front();
+ ssize_t sent = send(dst, chunk.m_msg, chunk.m_size, 0);
+
+ if (sent <= 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ break;
+ }
+
+ perror("send");
+ break;
+ }
+
+ queue.pop();
Review Comment:
Potential partial send issue: send() may send fewer bytes than requested.
The code assumes all bytes are sent in one call (line 249), but send() can
return a value less than chunk.m_size. This could lead to incomplete message
transmission. The message_chunk should track how many bytes have been sent and
retry sending the remaining bytes.
##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+ using network::detail::set_non_blocking_mode;
+
+ if (!set_non_blocking_mode(fd, true)) {
+ throw std::runtime_error("Error making socket non-blocking");
+ }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+ epoll_event events[MAX_EVENTS];
+
+ bool stopped = false;
+ while (!stopped) {
+ int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+ for (int i = 0; i < event_cnt; ++i) {
+ int fd = events[i].data.fd;
+
+ if (fd == stop_event_fd) {
+ uint64_t val;
+ read(stop_event_fd, &val, sizeof(val));
+ stopped = true;
+ } else if (fd == server_fd) {
+ process_incoming_connection();
+ } else {
+ process_socket_event(events[i]);
+ }
+ }
+ }
+}
+
+void kgb_proxy::add_event_fd() {
+ stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+ epoll_event ev{};
+ ev.events = EPOLLIN;
+ ev.data.fd = stop_event_fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+ uint64_t one = 1;
+ write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+ server_fd = socket(AF_INET, SOCK_STREAM, 0);
+ set_socket_non_blocking(server_fd);
+
+ int opt = 1;
+ setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+ sockaddr_in addr{};
+
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(in_port);
+ addr.sin_addr.s_addr = INADDR_ANY;
+
+ bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+ listen(server_fd, 16);
+
+ epoll_event ev{};
+
+ ev.events = EPOLLIN;
+ ev.data.fd = server_fd;
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);
+}
+
+void kgb_proxy::add_socket_to_epoll(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ev{};
+ ev.events = EPOLLIN | EPOLLET;
+ ev.data.fd = fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+void kgb_proxy::process_incoming_connection() {
+ while (true) {
+ int in_fd;
+ {
+ sockaddr_in addr{};
+ socklen_t len = sizeof(addr);
+
+ in_fd = accept(server_fd, (sockaddr*) &addr, &len);
+
+ if (in_fd < 0) {
+ if (errno != EAGAIN) {
+ std::error_code ec{errno, std::system_category()};
+ std::cerr << "Unexpected issue when accepting connection
err=" << ec.message() << "\n";
+ }
+ break;
+ }
+
+ set_socket_non_blocking(in_fd);
+
+ std::cout << "Client connected to proxy fd = " << in_fd <<
std::endl;
+ }
+
+ int out_fd;
+ {
+ out_fd = socket(AF_INET, SOCK_STREAM, 0);
+
+ set_socket_non_blocking(out_fd);
+
+ if (out_fd < 0) {
+ std::error_code ec{errno, std::system_category()};
+ throw std::runtime_error("Unable to create socket for outbound
proxy connection err = " + ec.message());
+ }
+
+ sockaddr_in addr{};
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(out_port);
+
+ inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
+
+ int res = connect(out_fd, (sockaddr*) &addr, sizeof(addr));
+
+ if (res < 0) {
+ if (errno != EINPROGRESS) {
+ std::error_code ec{errno, std::system_category()};
+ throw std::runtime_error("Unable to connection to server
err = " + ec.message());
+ }
+ }
+
+ std::cout << "Proxy connected to server fd = " << out_fd <<
std::endl;
+ }
+
+ std::shared_ptr<proxy_connection> conn =
std::make_shared<proxy_connection>(in_fd, out_fd);
+
+ connections[in_fd] = conn;
+ connections[out_fd] = conn;
+
+ add_socket_to_epoll(in_fd);
+ add_socket_to_epoll(out_fd);
+
+ std::cout << "Socket pair has been created in_fd = " << in_fd << "
out_fd = " << out_fd << std::endl;
+ }
+}
+
+void kgb_proxy::process_socket_event(const epoll_event& ep_ev) {
+ int fd = ep_ev.data.fd;
+
+ auto it = connections.find(fd);
+ if (it == connections.end()) {
+ throw std::runtime_error("Event for unknown socket occurred fd = " +
std::to_string(fd));
+ }
+
+ auto conn = it->second;
+
+ if (ep_ev.events & EPOLLIN) {
+
+ char buf[BUFF_SIZE];
+
+ int src = fd;
+ int dst = src == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+ while (true) {
+ ssize_t received = recv(src, buf, sizeof(buf), 0);
+
+ if (received < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ break;
+ }
+
+ perror("recv");
+ break;
+
+ }
+
+ if (received == 0) {
+ close(src);
+ close(dst);
+
+ connections.erase(src);
+ connections.erase(dst);
+ break;
Review Comment:
After closing sockets and erasing connections, the code breaks out of the
while loop. However, if there are multiple connections in the connections map
with the same src socket, this could lead to issues. Additionally, after
erasing connections and closing sockets, the loop should return or continue
carefully to avoid using invalidated references. The current implementation may
access invalid memory after the erase operations.
```suggestion
return;
```
##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+ using network::detail::set_non_blocking_mode;
+
+ if (!set_non_blocking_mode(fd, true)) {
+ throw std::runtime_error("Error making socket non-blocking");
+ }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+ epoll_event events[MAX_EVENTS];
+
+ bool stopped = false;
+ while (!stopped) {
+ int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+ for (int i = 0; i < event_cnt; ++i) {
+ int fd = events[i].data.fd;
+
+ if (fd == stop_event_fd) {
+ uint64_t val;
+ read(stop_event_fd, &val, sizeof(val));
+ stopped = true;
+ } else if (fd == server_fd) {
+ process_incoming_connection();
+ } else {
+ process_socket_event(events[i]);
+ }
+ }
+ }
+}
+
+void kgb_proxy::add_event_fd() {
+ stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+ epoll_event ev{};
+ ev.events = EPOLLIN;
+ ev.data.fd = stop_event_fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+ uint64_t one = 1;
+ write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+ server_fd = socket(AF_INET, SOCK_STREAM, 0);
+ set_socket_non_blocking(server_fd);
+
+ int opt = 1;
+ setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+ sockaddr_in addr{};
+
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(in_port);
+ addr.sin_addr.s_addr = INADDR_ANY;
+
+ bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+ listen(server_fd, 16);
+
+ epoll_event ev{};
+
+ ev.events = EPOLLIN;
+ ev.data.fd = server_fd;
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);
+}
+
+void kgb_proxy::add_socket_to_epoll(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ev{};
+ ev.events = EPOLLIN | EPOLLET;
+ ev.data.fd = fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+void kgb_proxy::process_incoming_connection() {
+ while (true) {
+ int in_fd;
+ {
+ sockaddr_in addr{};
+ socklen_t len = sizeof(addr);
+
+ in_fd = accept(server_fd, (sockaddr*) &addr, &len);
+
+ if (in_fd < 0) {
+ if (errno != EAGAIN) {
+ std::error_code ec{errno, std::system_category()};
+ std::cerr << "Unexpected issue when accepting connection
err=" << ec.message() << "\n";
+ }
+ break;
+ }
+
+ set_socket_non_blocking(in_fd);
+
+ std::cout << "Client connected to proxy fd = " << in_fd <<
std::endl;
+ }
+
+ int out_fd;
+ {
+ out_fd = socket(AF_INET, SOCK_STREAM, 0);
+
+ set_socket_non_blocking(out_fd);
+
+ if (out_fd < 0) {
+ std::error_code ec{errno, std::system_category()};
+ throw std::runtime_error("Unable to create socket for outbound
proxy connection err = " + ec.message());
+ }
+
+ sockaddr_in addr{};
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(out_port);
+
+ inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
+
+ int res = connect(out_fd, (sockaddr*) &addr, sizeof(addr));
+
+ if (res < 0) {
+ if (errno != EINPROGRESS) {
+ std::error_code ec{errno, std::system_category()};
+ throw std::runtime_error("Unable to connection to server
err = " + ec.message());
+ }
+ }
+
+ std::cout << "Proxy connected to server fd = " << out_fd <<
std::endl;
+ }
+
+ std::shared_ptr<proxy_connection> conn =
std::make_shared<proxy_connection>(in_fd, out_fd);
+
+ connections[in_fd] = conn;
+ connections[out_fd] = conn;
+
+ add_socket_to_epoll(in_fd);
+ add_socket_to_epoll(out_fd);
+
+ std::cout << "Socket pair has been created in_fd = " << in_fd << "
out_fd = " << out_fd << std::endl;
+ }
+}
+
+void kgb_proxy::process_socket_event(const epoll_event& ep_ev) {
+ int fd = ep_ev.data.fd;
+
+ auto it = connections.find(fd);
+ if (it == connections.end()) {
+ throw std::runtime_error("Event for unknown socket occurred fd = " +
std::to_string(fd));
+ }
+
+ auto conn = it->second;
+
+ if (ep_ev.events & EPOLLIN) {
+
+ char buf[BUFF_SIZE];
+
+ int src = fd;
+ int dst = src == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+ while (true) {
+ ssize_t received = recv(src, buf, sizeof(buf), 0);
+
+ if (received < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ break;
+ }
+
+ perror("recv");
+ break;
+
+ }
+
+ if (received == 0) {
+ close(src);
+ close(dst);
+
+ connections.erase(src);
+ connections.erase(dst);
+ break;
+ }
+
+ auto& queue = src == conn->in_sock ? conn->in2out_queue :
conn->out2in_queue;
+ queue.emplace(buf, received);
+
+ enable_writable_notification(dst);
+ }
+ }
+
+ if (ep_ev.events & EPOLLOUT) {
+ int dst = fd;
+ int src = dst == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+ auto& queue = src == conn->in_sock ? conn->in2out_queue :
conn->out2in_queue;
+
+ while (!queue.empty()) {
+
+ const message_chunk& chunk = queue.front();
+ ssize_t sent = send(dst, chunk.m_msg, chunk.m_size, 0);
+
+ if (sent <= 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ break;
+ }
+
+ perror("send");
+ break;
+ }
+
+ queue.pop();
+ }
+
+ disable_writable_notification(dst);
+ }
+}
+
+kgb_proxy::~kgb_proxy() {
+ fire_stop_event();
+
+ m_polling_thread->join();
+
+ close(stop_event_fd);
+ close(server_fd);
+ close(epoll_fd);
Review Comment:
File descriptors should only be closed if they are valid (>= 0). If start()
was never called, these file descriptors will have their initial values of -1,
and closing them will result in errors. Add checks to ensure file descriptors
are valid before closing them.
```suggestion
if (stop_event_fd >= 0) {
close(stop_event_fd);
}
if (server_fd >= 0) {
close(server_fd);
}
if (epoll_fd >= 0) {
close(epoll_fd);
}
```
##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+ using network::detail::set_non_blocking_mode;
+
+ if (!set_non_blocking_mode(fd, true)) {
+ throw std::runtime_error("Error making socket non-blocking");
+ }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+ epoll_event events[MAX_EVENTS];
+
+ bool stopped = false;
+ while (!stopped) {
+ int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
Review Comment:
The return value of epoll_wait should be checked for errors. If epoll_wait
returns -1, errno should be checked and an error should be handled
appropriately. Currently, a negative return value would cause the loop to
iterate with event_cnt < 0, which is incorrect behavior.
##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+ using network::detail::set_non_blocking_mode;
+
+ if (!set_non_blocking_mode(fd, true)) {
+ throw std::runtime_error("Error making socket non-blocking");
+ }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+ epoll_event events[MAX_EVENTS];
+
+ bool stopped = false;
+ while (!stopped) {
+ int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+ for (int i = 0; i < event_cnt; ++i) {
+ int fd = events[i].data.fd;
+
+ if (fd == stop_event_fd) {
+ uint64_t val;
+ read(stop_event_fd, &val, sizeof(val));
+ stopped = true;
+ } else if (fd == server_fd) {
+ process_incoming_connection();
+ } else {
+ process_socket_event(events[i]);
+ }
+ }
+ }
+}
+
+void kgb_proxy::add_event_fd() {
+ stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+ epoll_event ev{};
+ ev.events = EPOLLIN;
+ ev.data.fd = stop_event_fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+ uint64_t one = 1;
+ write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+ server_fd = socket(AF_INET, SOCK_STREAM, 0);
+ set_socket_non_blocking(server_fd);
+
+ int opt = 1;
+ setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+ sockaddr_in addr{};
+
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(in_port);
+ addr.sin_addr.s_addr = INADDR_ANY;
+
+ bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+ listen(server_fd, 16);
+
+ epoll_event ev{};
+
+ ev.events = EPOLLIN;
+ ev.data.fd = server_fd;
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);
Review Comment:
The return values of socket, setsockopt, bind, listen, and epoll_ctl should
be checked for errors. These system calls can fail and should have proper error
handling. Without checks, failures will go undetected and could cause undefined
behavior.
##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.h:
##########
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#pragma once
+#include <cassert>
+#include <cstring>
+#include <memory>
+#include <queue>
+#include <sys/epoll.h>
+#include <thread>
+#include <unordered_map>
+
+namespace ignite::proxy {
+
+struct proxy_connection;
+
+class kgb_proxy {
+public:
+ static constexpr int MAX_EVENTS = 64;
+ static constexpr int BUFF_SIZE = 4096;
+
+ kgb_proxy(int in_port, int out_port)
+ : in_port(in_port)
+ , out_port(out_port) { }
+
+ ~kgb_proxy();
+ void start();
+
+private:
+ void enable_writable_notification(int fd);
+ void disable_writable_notification(int fd);
+ void do_serve();
+ void add_event_fd();
+ void fire_stop_event();
+ void start_server_socket();
+ void add_socket_to_epoll(int fd);
+ void process_incoming_connection();
+ void process_socket_event(const epoll_event &ep_ev);
+ const int in_port;
+ const int out_port;
+ int server_fd{-1};
+ int epoll_fd{-1};
+ int stop_event_fd{-1};
+ std::unique_ptr<std::thread> m_polling_thread{};
+ bool connected{false};
Review Comment:
The member variable 'connected' is declared but never used anywhere in the
class. Consider removing it to reduce clutter, or implement the intended
functionality if it was meant to track connection state.
```suggestion
```
##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <iostream>
+#include <queue>
+#include <tuple>
+
+#include <asio.hpp>
+#include <asio/ts/buffer.hpp>
+#include <asio/ts/internet.hpp>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct message {
+ char *m_arr;
+ size_t m_size;
+
+ message(char *arr, size_t size)
+ : m_arr(nullptr)
+ , m_size(size) {
+ m_arr = new char[m_size];
+ std::memcpy(m_arr, arr, size);
+ }
+
+ ~message() { delete[] m_arr; }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+ session(tcp::socket in_sock, tcp::socket out_sock)
+ : m_in_sock(std::move(in_sock))
+ , m_out_sock(std::move(out_sock)) { }
+
+ void start() { do_serve(); }
+
+ tcp::socket &get_out_sock() { return m_out_sock; }
+
+ void set_writable(bool writable) {
+ m_in_to_out_writable = writable;
+ m_out_to_in_writable = writable;
+ }
+
+ enum direction { forward, reverse };
+
+private:
+ void do_serve() {
+ do_read(forward);
+ do_read(reverse);
+ }
+
+ void do_read(direction direction) {
+ auto tup = get_sockets_and_queue(direction);
+ tcp::socket &src = std::get<0>(tup);
+ std::queue<message> &queue = std::get<2>(tup);
+ bool &writable = std::get<3>(tup);
+
+ src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+ [this, &queue, direction, &writable](asio::error_code ec, size_t len) {
+ if (ec) {
+ throw std::runtime_error("Error while reading from socket " +
ec.message());
+ }
+
+ // we have one-threaded executor no synchronization is needed
+ queue.emplace(buf, len);
+
+ if (writable) { // there are pending write operation on this socket
+ do_write(direction);
+ }
+
+ do_read(direction);
+ });
+ }
+
+ void do_write(direction direction) {
+ auto tup = get_sockets_and_queue(direction);
+ tcp::socket &dst = std::get<1>(tup);
+ std::queue<message> &queue = std::get<2>(tup);
+ bool &writable = std::get<3>(tup);
+
+ writable = false; // protects from writing same buffer twice (from
head of queue).
+
+ if (!queue.empty()) {
+ message &msg = queue.front();
+
+ asio::async_write(
+ dst, asio::buffer(msg.m_arr, msg.m_size), [this, &queue,
direction, &writable](asio::error_code ec, size_t) {
+ if (ec) {
+ throw std::runtime_error("Error while writing to
socket " + ec.message());
+ }
+
+ queue.pop();
+
+ if (!queue.empty()) {
+ // makes writes on the same socket strictly ordered
+ do_write(direction);
+ } else {
+ writable = true; // now read operation can initiate
writes
+ }
+ });
+ }
+ }
+
+ std::tuple<tcp::socket &, tcp::socket &, std::queue<message> &, bool&>
get_sockets_and_queue(direction direction) {
+ switch (direction) {
+ case forward:
+ return {m_in_sock, m_out_sock, m_in_to_out,
m_in_to_out_writable};
+ case reverse:
+ return {m_out_sock, m_in_sock, m_out_to_in,
m_out_to_in_writable};
+ }
+
+ throw std::runtime_error("Should be unreachable");
+ }
+
+ tcp::socket m_in_sock;
+ tcp::socket m_out_sock;
+
+ bool m_in_to_out_writable{false};
+ bool m_out_to_in_writable{false};
+
+ std::queue<message> m_in_to_out;
+ std::queue<message> m_out_to_in;
+
+ static constexpr size_t BUFF_SIZE = 4096;
+
+ char buf[BUFF_SIZE];
+};
+
+class asio_proxy {
+public:
+ asio_proxy(asio::io_context &io_context, short port)
+ : m_io_context(io_context)
+ , m_acceptor(m_io_context, tcp::endpoint(tcp::v4(), port))
+ , m_resolver(m_io_context)
+ , m_in_sock(m_io_context) {
+ do_accept();
+ }
+
+private:
+ void do_accept() {
+ m_acceptor.async_accept(m_in_sock, [this](asio::error_code ec) {
+ if (!ec) {
+ auto ses = m_sessons.emplace_back(
+ std::make_shared<session>(std::move(m_in_sock),
tcp::socket{m_io_context}));
+
+ m_resolver.async_resolve(
+ "127.0.0.1", "50900", [ses](asio::error_code ec,
tcp::resolver::results_type endpoints) {
+ if (!ec) {
+ asio::async_connect(ses->get_out_sock(), endpoints,
+ [&ses](const asio::error_code &ec, const
tcp::endpoint & e) {
+ if (!ec) {
+ ses->set_writable(true);
+ ses->start();
+ } else {
+ std::cout << e.port();
+ throw std::runtime_error("Error
connecting to server " + ec.message());
+ }
+ });
+ } else {
+ throw std::runtime_error("Error resolving server's
address " + ec.message());
+ }
+ });
+
+ do_accept();
+
+ } else {
+ throw std::runtime_error("Error accepting incoming connection
" + ec.message());
+ }
+ });
+ }
+
+ asio::io_context &m_io_context;
+ tcp::acceptor m_acceptor;
+ tcp::resolver m_resolver;
+ tcp::socket m_in_sock;
+
+ std::vector<std::shared_ptr<session>> m_sessons;
Review Comment:
Typo in variable name: m_sessons should be m_sessions (missing 'i').
##########
modules/platforms/cpp/tests/fake_server/connection_test.cpp:
##########
@@ -76,3 +79,42 @@ TEST_F(connection_test, request_timeout) {
EXPECT_EQ(error::code::OPERATION_TIMEOUT, err.get_status_code());
}
}
+
+// TEST_F(connection_test, using_proxy) {
+// fake_server fs{50900, get_logger()};
+// proxy::kgb_proxy proxy{50800, 50900};
+//
+// fs.start();
+// proxy.start();
+//
+// ignite_client_configuration cfg;
+// cfg.set_logger(get_logger());
+// cfg.set_endpoints(get_endpoints());
+//
+// auto cl = ignite_client::start(cfg, 5s);
+//
+// auto cluster_nodes = cl.get_cluster_nodes();
+//
+// ASSERT_EQ(1, cluster_nodes.size());
+// }
+
+TEST_F(connection_test, using_asio) {
+ fake_server fs{50900, get_logger()};
+ fs.start();
+ asio::io_context io_context;
+ proxy::asio_proxy proxy{io_context, static_cast<short>(50800)};
+
+ std::thread t{[&io_context]() {
+ io_context.run();
+ }};
+
+ ignite_client_configuration cfg;
+ cfg.set_logger(get_logger());
+ cfg.set_endpoints(get_endpoints());
+
+ auto cl = ignite_client::start(cfg, 500s);
Review Comment:
The timeout value of 500 seconds seems excessively long for a unit test.
This is inconsistent with other tests in the same file that use 5s timeout
(line 50, 94). Consider using a more reasonable timeout value consistent with
other tests.
```suggestion
auto cl = ignite_client::start(cfg, 5s);
```
##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <iostream>
+#include <queue>
+#include <tuple>
+
+#include <asio.hpp>
+#include <asio/ts/buffer.hpp>
+#include <asio/ts/internet.hpp>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct message {
+ char *m_arr;
+ size_t m_size;
+
+ message(char *arr, size_t size)
+ : m_arr(nullptr)
+ , m_size(size) {
+ m_arr = new char[m_size];
+ std::memcpy(m_arr, arr, size);
+ }
+
+ ~message() { delete[] m_arr; }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+ session(tcp::socket in_sock, tcp::socket out_sock)
+ : m_in_sock(std::move(in_sock))
+ , m_out_sock(std::move(out_sock)) { }
+
+ void start() { do_serve(); }
+
+ tcp::socket &get_out_sock() { return m_out_sock; }
+
+ void set_writable(bool writable) {
+ m_in_to_out_writable = writable;
+ m_out_to_in_writable = writable;
+ }
+
+ enum direction { forward, reverse };
+
+private:
+ void do_serve() {
+ do_read(forward);
+ do_read(reverse);
+ }
+
+ void do_read(direction direction) {
+ auto tup = get_sockets_and_queue(direction);
+ tcp::socket &src = std::get<0>(tup);
+ std::queue<message> &queue = std::get<2>(tup);
+ bool &writable = std::get<3>(tup);
+
+ src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+ [this, &queue, direction, &writable](asio::error_code ec, size_t len) {
+ if (ec) {
+ throw std::runtime_error("Error while reading from socket " +
ec.message());
+ }
+
+ // we have one-threaded executor no synchronization is needed
+ queue.emplace(buf, len);
+
+ if (writable) { // there are pending write operation on this socket
+ do_write(direction);
+ }
+
+ do_read(direction);
+ });
+ }
+
+ void do_write(direction direction) {
+ auto tup = get_sockets_and_queue(direction);
+ tcp::socket &dst = std::get<1>(tup);
+ std::queue<message> &queue = std::get<2>(tup);
+ bool &writable = std::get<3>(tup);
+
+ writable = false; // protects from writing same buffer twice (from
head of queue).
+
+ if (!queue.empty()) {
+ message &msg = queue.front();
+
+ asio::async_write(
+ dst, asio::buffer(msg.m_arr, msg.m_size), [this, &queue,
direction, &writable](asio::error_code ec, size_t) {
+ if (ec) {
+ throw std::runtime_error("Error while writing to
socket " + ec.message());
+ }
+
+ queue.pop();
+
+ if (!queue.empty()) {
+ // makes writes on the same socket strictly ordered
+ do_write(direction);
+ } else {
+ writable = true; // now read operation can initiate
writes
+ }
+ });
Review Comment:
Similar to the issue in do_read, the lambda captures references (&queue,
&writable) which point to members of 'this'. If the session is destroyed before
the async operation completes, these references will be dangling. The session
should be kept alive by capturing shared_from_this() in the lambda.
##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+ using network::detail::set_non_blocking_mode;
+
+ if (!set_non_blocking_mode(fd, true)) {
+ throw std::runtime_error("Error making socket non-blocking");
+ }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+ epoll_event events[MAX_EVENTS];
+
+ bool stopped = false;
+ while (!stopped) {
+ int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+ for (int i = 0; i < event_cnt; ++i) {
+ int fd = events[i].data.fd;
+
+ if (fd == stop_event_fd) {
+ uint64_t val;
+ read(stop_event_fd, &val, sizeof(val));
+ stopped = true;
+ } else if (fd == server_fd) {
+ process_incoming_connection();
+ } else {
+ process_socket_event(events[i]);
+ }
+ }
+ }
+}
+
+void kgb_proxy::add_event_fd() {
+ stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+ epoll_event ev{};
+ ev.events = EPOLLIN;
+ ev.data.fd = stop_event_fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+ uint64_t one = 1;
+ write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+ server_fd = socket(AF_INET, SOCK_STREAM, 0);
+ set_socket_non_blocking(server_fd);
+
+ int opt = 1;
+ setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+ sockaddr_in addr{};
+
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(in_port);
+ addr.sin_addr.s_addr = INADDR_ANY;
+
+ bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+ listen(server_fd, 16);
+
+ epoll_event ev{};
+
+ ev.events = EPOLLIN;
+ ev.data.fd = server_fd;
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);
+}
+
+void kgb_proxy::add_socket_to_epoll(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ev{};
+ ev.events = EPOLLIN | EPOLLET;
+ ev.data.fd = fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+void kgb_proxy::process_incoming_connection() {
+ while (true) {
+ int in_fd;
+ {
+ sockaddr_in addr{};
+ socklen_t len = sizeof(addr);
+
+ in_fd = accept(server_fd, (sockaddr*) &addr, &len);
+
+ if (in_fd < 0) {
+ if (errno != EAGAIN) {
+ std::error_code ec{errno, std::system_category()};
+ std::cerr << "Unexpected issue when accepting connection
err=" << ec.message() << "\n";
+ }
+ break;
+ }
+
+ set_socket_non_blocking(in_fd);
+
+ std::cout << "Client connected to proxy fd = " << in_fd <<
std::endl;
+ }
+
+ int out_fd;
+ {
+ out_fd = socket(AF_INET, SOCK_STREAM, 0);
+
+ set_socket_non_blocking(out_fd);
+
+ if (out_fd < 0) {
+ std::error_code ec{errno, std::system_category()};
+ throw std::runtime_error("Unable to create socket for outbound
proxy connection err = " + ec.message());
+ }
+
+ sockaddr_in addr{};
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(out_port);
+
+ inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
+
+ int res = connect(out_fd, (sockaddr*) &addr, sizeof(addr));
+
+ if (res < 0) {
+ if (errno != EINPROGRESS) {
+ std::error_code ec{errno, std::system_category()};
+ throw std::runtime_error("Unable to connection to server
err = " + ec.message());
+ }
+ }
+
+ std::cout << "Proxy connected to server fd = " << out_fd <<
std::endl;
+ }
+
+ std::shared_ptr<proxy_connection> conn =
std::make_shared<proxy_connection>(in_fd, out_fd);
+
+ connections[in_fd] = conn;
+ connections[out_fd] = conn;
+
+ add_socket_to_epoll(in_fd);
+ add_socket_to_epoll(out_fd);
+
+ std::cout << "Socket pair has been created in_fd = " << in_fd << "
out_fd = " << out_fd << std::endl;
+ }
+}
+
+void kgb_proxy::process_socket_event(const epoll_event& ep_ev) {
+ int fd = ep_ev.data.fd;
+
+ auto it = connections.find(fd);
+ if (it == connections.end()) {
+ throw std::runtime_error("Event for unknown socket occurred fd = " +
std::to_string(fd));
+ }
+
+ auto conn = it->second;
+
+ if (ep_ev.events & EPOLLIN) {
+
+ char buf[BUFF_SIZE];
+
+ int src = fd;
+ int dst = src == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+ while (true) {
+ ssize_t received = recv(src, buf, sizeof(buf), 0);
+
+ if (received < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ break;
+ }
+
+ perror("recv");
+ break;
+
+ }
+
+ if (received == 0) {
+ close(src);
+ close(dst);
+
+ connections.erase(src);
+ connections.erase(dst);
+ break;
+ }
+
+ auto& queue = src == conn->in_sock ? conn->in2out_queue :
conn->out2in_queue;
+ queue.emplace(buf, received);
+
+ enable_writable_notification(dst);
+ }
+ }
+
+ if (ep_ev.events & EPOLLOUT) {
+ int dst = fd;
+ int src = dst == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+ auto& queue = src == conn->in_sock ? conn->in2out_queue :
conn->out2in_queue;
+
+ while (!queue.empty()) {
+
+ const message_chunk& chunk = queue.front();
+ ssize_t sent = send(dst, chunk.m_msg, chunk.m_size, 0);
+
+ if (sent <= 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ break;
+ }
+
+ perror("send");
+ break;
+ }
+
+ queue.pop();
+ }
+
+ disable_writable_notification(dst);
+ }
+}
+
+kgb_proxy::~kgb_proxy() {
+ fire_stop_event();
+
+ m_polling_thread->join();
Review Comment:
The destructor assumes m_polling_thread is not null and attempts to join it
without checking. If start() was never called, m_polling_thread will be nullptr
and dereferencing it will cause a crash. Add a null check before calling join().
```suggestion
if (m_polling_thread && m_polling_thread->joinable()) {
m_polling_thread->join();
}
```
##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <iostream>
+#include <queue>
+#include <tuple>
+
+#include <asio.hpp>
+#include <asio/ts/buffer.hpp>
+#include <asio/ts/internet.hpp>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct message {
+ char *m_arr;
+ size_t m_size;
+
+ message(char *arr, size_t size)
+ : m_arr(nullptr)
+ , m_size(size) {
+ m_arr = new char[m_size];
+ std::memcpy(m_arr, arr, size);
+ }
+
+ ~message() { delete[] m_arr; }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+ session(tcp::socket in_sock, tcp::socket out_sock)
+ : m_in_sock(std::move(in_sock))
+ , m_out_sock(std::move(out_sock)) { }
+
+ void start() { do_serve(); }
+
+ tcp::socket &get_out_sock() { return m_out_sock; }
+
+ void set_writable(bool writable) {
+ m_in_to_out_writable = writable;
+ m_out_to_in_writable = writable;
+ }
+
+ enum direction { forward, reverse };
+
+private:
+ void do_serve() {
+ do_read(forward);
+ do_read(reverse);
+ }
+
+ void do_read(direction direction) {
+ auto tup = get_sockets_and_queue(direction);
+ tcp::socket &src = std::get<0>(tup);
+ std::queue<message> &queue = std::get<2>(tup);
+ bool &writable = std::get<3>(tup);
+
+ src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+ [this, &queue, direction, &writable](asio::error_code ec, size_t len) {
+ if (ec) {
+ throw std::runtime_error("Error while reading from socket " +
ec.message());
+ }
+
+ // we have one-threaded executor no synchronization is needed
+ queue.emplace(buf, len);
+
+ if (writable) { // there are pending write operation on this socket
+ do_write(direction);
+ }
+
+ do_read(direction);
+ });
+ }
+
+ void do_write(direction direction) {
+ auto tup = get_sockets_and_queue(direction);
+ tcp::socket &dst = std::get<1>(tup);
+ std::queue<message> &queue = std::get<2>(tup);
+ bool &writable = std::get<3>(tup);
+
+ writable = false; // protects from writing same buffer twice (from
head of queue).
+
+ if (!queue.empty()) {
+ message &msg = queue.front();
+
+ asio::async_write(
+ dst, asio::buffer(msg.m_arr, msg.m_size), [this, &queue,
direction, &writable](asio::error_code ec, size_t) {
+ if (ec) {
+ throw std::runtime_error("Error while writing to
socket " + ec.message());
+ }
+
+ queue.pop();
+
+ if (!queue.empty()) {
+ // makes writes on the same socket strictly ordered
+ do_write(direction);
+ } else {
+ writable = true; // now read operation can initiate
writes
+ }
+ });
+ }
+ }
+
+ std::tuple<tcp::socket &, tcp::socket &, std::queue<message> &, bool&>
get_sockets_and_queue(direction direction) {
+ switch (direction) {
+ case forward:
+ return {m_in_sock, m_out_sock, m_in_to_out,
m_in_to_out_writable};
+ case reverse:
+ return {m_out_sock, m_in_sock, m_out_to_in,
m_out_to_in_writable};
+ }
+
+ throw std::runtime_error("Should be unreachable");
+ }
+
+ tcp::socket m_in_sock;
+ tcp::socket m_out_sock;
+
+ bool m_in_to_out_writable{false};
+ bool m_out_to_in_writable{false};
+
+ std::queue<message> m_in_to_out;
+ std::queue<message> m_out_to_in;
+
+ static constexpr size_t BUFF_SIZE = 4096;
+
+ char buf[BUFF_SIZE];
+};
+
+class asio_proxy {
+public:
+ asio_proxy(asio::io_context &io_context, short port)
+ : m_io_context(io_context)
+ , m_acceptor(m_io_context, tcp::endpoint(tcp::v4(), port))
+ , m_resolver(m_io_context)
+ , m_in_sock(m_io_context) {
+ do_accept();
+ }
+
+private:
+ void do_accept() {
+ m_acceptor.async_accept(m_in_sock, [this](asio::error_code ec) {
+ if (!ec) {
+ auto ses = m_sessons.emplace_back(
Review Comment:
Typo in variable name: m_sessons should be m_sessions (missing 'i'). This is
the same variable referenced in line 193.
##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+ using network::detail::set_non_blocking_mode;
+
+ if (!set_non_blocking_mode(fd, true)) {
+ throw std::runtime_error("Error making socket non-blocking");
+ }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+ epoll_event events[MAX_EVENTS];
+
+ bool stopped = false;
+ while (!stopped) {
+ int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+ for (int i = 0; i < event_cnt; ++i) {
+ int fd = events[i].data.fd;
+
+ if (fd == stop_event_fd) {
+ uint64_t val;
+ read(stop_event_fd, &val, sizeof(val));
+ stopped = true;
+ } else if (fd == server_fd) {
+ process_incoming_connection();
+ } else {
+ process_socket_event(events[i]);
+ }
+ }
+ }
+}
+
+void kgb_proxy::add_event_fd() {
+ stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+ epoll_event ev{};
+ ev.events = EPOLLIN;
+ ev.data.fd = stop_event_fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+ uint64_t one = 1;
+ write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+ server_fd = socket(AF_INET, SOCK_STREAM, 0);
+ set_socket_non_blocking(server_fd);
+
+ int opt = 1;
+ setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+ sockaddr_in addr{};
+
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(in_port);
+ addr.sin_addr.s_addr = INADDR_ANY;
+
+ bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+ listen(server_fd, 16);
+
+ epoll_event ev{};
+
+ ev.events = EPOLLIN;
+ ev.data.fd = server_fd;
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);
+}
+
+void kgb_proxy::add_socket_to_epoll(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ev{};
+ ev.events = EPOLLIN | EPOLLET;
+ ev.data.fd = fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+void kgb_proxy::process_incoming_connection() {
+ while (true) {
+ int in_fd;
+ {
+ sockaddr_in addr{};
+ socklen_t len = sizeof(addr);
+
+ in_fd = accept(server_fd, (sockaddr*) &addr, &len);
+
+ if (in_fd < 0) {
+ if (errno != EAGAIN) {
+ std::error_code ec{errno, std::system_category()};
+ std::cerr << "Unexpected issue when accepting connection
err=" << ec.message() << "\n";
+ }
+ break;
+ }
+
+ set_socket_non_blocking(in_fd);
+
+ std::cout << "Client connected to proxy fd = " << in_fd <<
std::endl;
+ }
+
+ int out_fd;
+ {
+ out_fd = socket(AF_INET, SOCK_STREAM, 0);
+
+ set_socket_non_blocking(out_fd);
+
+ if (out_fd < 0) {
+ std::error_code ec{errno, std::system_category()};
+ throw std::runtime_error("Unable to create socket for outbound
proxy connection err = " + ec.message());
+ }
+
+ sockaddr_in addr{};
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(out_port);
+
+ inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
+
+ int res = connect(out_fd, (sockaddr*) &addr, sizeof(addr));
+
+ if (res < 0) {
+ if (errno != EINPROGRESS) {
+ std::error_code ec{errno, std::system_category()};
+ throw std::runtime_error("Unable to connection to server
err = " + ec.message());
+ }
+ }
+
+ std::cout << "Proxy connected to server fd = " << out_fd <<
std::endl;
+ }
+
+ std::shared_ptr<proxy_connection> conn =
std::make_shared<proxy_connection>(in_fd, out_fd);
+
+ connections[in_fd] = conn;
+ connections[out_fd] = conn;
+
+ add_socket_to_epoll(in_fd);
+ add_socket_to_epoll(out_fd);
+
+ std::cout << "Socket pair has been created in_fd = " << in_fd << "
out_fd = " << out_fd << std::endl;
+ }
+}
+
+void kgb_proxy::process_socket_event(const epoll_event& ep_ev) {
+ int fd = ep_ev.data.fd;
+
+ auto it = connections.find(fd);
+ if (it == connections.end()) {
+ throw std::runtime_error("Event for unknown socket occurred fd = " +
std::to_string(fd));
+ }
+
+ auto conn = it->second;
+
+ if (ep_ev.events & EPOLLIN) {
+
+ char buf[BUFF_SIZE];
+
+ int src = fd;
+ int dst = src == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+ while (true) {
+ ssize_t received = recv(src, buf, sizeof(buf), 0);
+
+ if (received < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ break;
+ }
+
+ perror("recv");
+ break;
+
+ }
+
+ if (received == 0) {
+ close(src);
+ close(dst);
+
+ connections.erase(src);
+ connections.erase(dst);
+ break;
+ }
+
+ auto& queue = src == conn->in_sock ? conn->in2out_queue :
conn->out2in_queue;
+ queue.emplace(buf, received);
+
+ enable_writable_notification(dst);
+ }
+ }
+
+ if (ep_ev.events & EPOLLOUT) {
+ int dst = fd;
+ int src = dst == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+ auto& queue = src == conn->in_sock ? conn->in2out_queue :
conn->out2in_queue;
+
+ while (!queue.empty()) {
+
+ const message_chunk& chunk = queue.front();
+ ssize_t sent = send(dst, chunk.m_msg, chunk.m_size, 0);
+
+ if (sent <= 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ break;
+ }
+
+ perror("send");
+ break;
+ }
+
+ queue.pop();
+ }
+
+ disable_writable_notification(dst);
+ }
+}
+
+kgb_proxy::~kgb_proxy() {
+ fire_stop_event();
+
+ m_polling_thread->join();
+
+ close(stop_event_fd);
+ close(server_fd);
+ close(epoll_fd);
+}
+
+void kgb_proxy::start() {
+ epoll_fd = epoll_create1(0);
Review Comment:
The return value of epoll_create1 should be checked for errors. If
epoll_create1 fails, it returns -1, and subsequent operations using epoll_fd
will fail or behave incorrectly.
##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <iostream>
+#include <queue>
+#include <tuple>
+
+#include <asio.hpp>
+#include <asio/ts/buffer.hpp>
+#include <asio/ts/internet.hpp>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct message {
+ char *m_arr;
+ size_t m_size;
+
+ message(char *arr, size_t size)
+ : m_arr(nullptr)
+ , m_size(size) {
+ m_arr = new char[m_size];
+ std::memcpy(m_arr, arr, size);
+ }
+
+ ~message() { delete[] m_arr; }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+ session(tcp::socket in_sock, tcp::socket out_sock)
+ : m_in_sock(std::move(in_sock))
+ , m_out_sock(std::move(out_sock)) { }
+
+ void start() { do_serve(); }
+
+ tcp::socket &get_out_sock() { return m_out_sock; }
+
+ void set_writable(bool writable) {
+ m_in_to_out_writable = writable;
+ m_out_to_in_writable = writable;
+ }
+
+ enum direction { forward, reverse };
+
+private:
+ void do_serve() {
+ do_read(forward);
+ do_read(reverse);
+ }
+
+ void do_read(direction direction) {
+ auto tup = get_sockets_and_queue(direction);
+ tcp::socket &src = std::get<0>(tup);
+ std::queue<message> &queue = std::get<2>(tup);
+ bool &writable = std::get<3>(tup);
+
+ src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+ [this, &queue, direction, &writable](asio::error_code ec, size_t len) {
+ if (ec) {
+ throw std::runtime_error("Error while reading from socket " +
ec.message());
+ }
+
+ // we have one-threaded executor no synchronization is needed
+ queue.emplace(buf, len);
+
+ if (writable) { // there are pending write operation on this socket
+ do_write(direction);
+ }
+
+ do_read(direction);
+ });
Review Comment:
The lambda captures references to queue and writable (&queue, &writable),
which are local references in the do_read function that point to members of
'this'. If 'this' (the session) is destroyed before the async operation
completes, these references will be dangling. The session should be kept alive
by capturing shared_from_this() in the lambda to ensure the object outlives the
async operation.
##########
modules/platforms/cpp/tests/fake_server/connection_test.cpp:
##########
@@ -76,3 +79,42 @@ TEST_F(connection_test, request_timeout) {
EXPECT_EQ(error::code::OPERATION_TIMEOUT, err.get_status_code());
}
}
+
+// TEST_F(connection_test, using_proxy) {
+// fake_server fs{50900, get_logger()};
+// proxy::kgb_proxy proxy{50800, 50900};
+//
+// fs.start();
+// proxy.start();
+//
+// ignite_client_configuration cfg;
+// cfg.set_logger(get_logger());
+// cfg.set_endpoints(get_endpoints());
+//
+// auto cl = ignite_client::start(cfg, 5s);
+//
+// auto cluster_nodes = cl.get_cluster_nodes();
+//
+// ASSERT_EQ(1, cluster_nodes.size());
+// }
+
+TEST_F(connection_test, using_asio) {
+ fake_server fs{50900, get_logger()};
+ fs.start();
+ asio::io_context io_context;
+ proxy::asio_proxy proxy{io_context, static_cast<short>(50800)};
+
+ std::thread t{[&io_context]() {
+ io_context.run();
+ }};
Review Comment:
The thread created for running io_context is not properly joined before the
test ends. When the test finishes, the thread may still be running, which can
lead to undefined behavior. The thread should be joined (or detached with
proper cleanup) before the test exits. Consider using a RAII wrapper or
ensuring proper cleanup in a test teardown.
##########
modules/platforms/cpp/tests/fake_server/connection_test.cpp:
##########
@@ -76,3 +79,42 @@ TEST_F(connection_test, request_timeout) {
EXPECT_EQ(error::code::OPERATION_TIMEOUT, err.get_status_code());
}
}
+
+// TEST_F(connection_test, using_proxy) {
+// fake_server fs{50900, get_logger()};
+// proxy::kgb_proxy proxy{50800, 50900};
+//
+// fs.start();
+// proxy.start();
+//
+// ignite_client_configuration cfg;
+// cfg.set_logger(get_logger());
+// cfg.set_endpoints(get_endpoints());
+//
+// auto cl = ignite_client::start(cfg, 5s);
+//
+// auto cluster_nodes = cl.get_cluster_nodes();
+//
+// ASSERT_EQ(1, cluster_nodes.size());
+// }
+
+TEST_F(connection_test, using_asio) {
+ fake_server fs{50900, get_logger()};
+ fs.start();
+ asio::io_context io_context;
+ proxy::asio_proxy proxy{io_context, static_cast<short>(50800)};
+
+ std::thread t{[&io_context]() {
+ io_context.run();
+ }};
+
+ ignite_client_configuration cfg;
+ cfg.set_logger(get_logger());
+ cfg.set_endpoints(get_endpoints());
+
+ auto cl = ignite_client::start(cfg, 500s);
+
+ auto cluster_nodes = cl.get_cluster_nodes();
+
+ ASSERT_EQ(1, cluster_nodes.size());
Review Comment:
The io_context.run() will continue executing until all work is completed or
the io_context is stopped. When the test ends, there's no mechanism to stop the
io_context, which means the thread will continue running indefinitely. The test
should call io_context.stop() and then join the thread to ensure proper
cleanup. Consider adding a test fixture teardown or ensuring cleanup at the end
of the test.
```suggestion
ASSERT_EQ(1, cluster_nodes.size());
io_context.stop();
t.join();
```
##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.h:
##########
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#pragma once
+#include <cassert>
+#include <cstring>
+#include <memory>
+#include <queue>
+#include <sys/epoll.h>
+#include <thread>
+#include <unordered_map>
+
+namespace ignite::proxy {
+
+struct proxy_connection;
+
+class kgb_proxy {
+public:
+ static constexpr int MAX_EVENTS = 64;
+ static constexpr int BUFF_SIZE = 4096;
+
+ kgb_proxy(int in_port, int out_port)
+ : in_port(in_port)
+ , out_port(out_port) { }
+
+ ~kgb_proxy();
+ void start();
+
+private:
+ void enable_writable_notification(int fd);
+ void disable_writable_notification(int fd);
+ void do_serve();
+ void add_event_fd();
+ void fire_stop_event();
+ void start_server_socket();
+ void add_socket_to_epoll(int fd);
+ void process_incoming_connection();
+ void process_socket_event(const epoll_event &ep_ev);
+ const int in_port;
+ const int out_port;
+ int server_fd{-1};
+ int epoll_fd{-1};
+ int stop_event_fd{-1};
+ std::unique_ptr<std::thread> m_polling_thread{};
+ bool connected{false};
+ std::unordered_map<int, std::shared_ptr<proxy_connection>> connections{};
+};
+
+struct message_chunk {
+ char* m_msg = nullptr;
+ size_t m_size;
+
+ message_chunk(char *msg, size_t size)
+ : m_size(size)
+ {
+ assert(size <= kgb_proxy::BUFF_SIZE);
+
+ m_msg = new char[size];
+
+ std::memcpy(m_msg, msg, size);
+ }
+
+ ~message_chunk() {
+ delete[] m_msg;
+ }
+};
Review Comment:
The message_chunk struct is missing copy constructor and copy assignment
operator, but they should be deleted since the class manages raw memory via
new/delete. Without explicit deletion, the default copy operations will perform
shallow copies, leading to double-free errors. Add: message_chunk(const
message_chunk&) = delete; and message_chunk& operator=(const message_chunk&) =
delete;. Alternatively, implement proper deep copy semantics or use move
semantics only.
##########
modules/platforms/cpp/tests/fake_server/CMakeLists.txt:
##########
@@ -22,6 +22,7 @@ set(SOURCES
fake_server.cpp
tcp_client_channel.cpp
connection_test.cpp
+ proxy/asio_proxy.cpp
Review Comment:
The kgb_proxy.cpp file is not included in the SOURCES list in
CMakeLists.txt, which means it won't be compiled or linked. This makes the
kgb_proxy.h header and kgb_proxy.cpp implementation dead code. Either add
proxy/kgb_proxy.cpp to the SOURCES list if it's intended to be used, or remove
the kgb_proxy files entirely.
```suggestion
proxy/asio_proxy.cpp
proxy/kgb_proxy.cpp
```
##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <iostream>
+#include <queue>
+#include <tuple>
+
+#include <asio.hpp>
+#include <asio/ts/buffer.hpp>
+#include <asio/ts/internet.hpp>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct message {
+ char *m_arr;
+ size_t m_size;
+
+ message(char *arr, size_t size)
+ : m_arr(nullptr)
+ , m_size(size) {
+ m_arr = new char[m_size];
+ std::memcpy(m_arr, arr, size);
+ }
+
+ ~message() { delete[] m_arr; }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+ session(tcp::socket in_sock, tcp::socket out_sock)
+ : m_in_sock(std::move(in_sock))
+ , m_out_sock(std::move(out_sock)) { }
+
+ void start() { do_serve(); }
+
+ tcp::socket &get_out_sock() { return m_out_sock; }
+
+ void set_writable(bool writable) {
+ m_in_to_out_writable = writable;
+ m_out_to_in_writable = writable;
+ }
+
+ enum direction { forward, reverse };
+
+private:
+ void do_serve() {
+ do_read(forward);
+ do_read(reverse);
+ }
+
+ void do_read(direction direction) {
+ auto tup = get_sockets_and_queue(direction);
+ tcp::socket &src = std::get<0>(tup);
+ std::queue<message> &queue = std::get<2>(tup);
+ bool &writable = std::get<3>(tup);
+
+ src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+ [this, &queue, direction, &writable](asio::error_code ec, size_t len) {
+ if (ec) {
+ throw std::runtime_error("Error while reading from socket " +
ec.message());
+ }
+
+ // we have one-threaded executor no synchronization is needed
+ queue.emplace(buf, len);
+
+ if (writable) { // there are pending write operation on this socket
+ do_write(direction);
+ }
+
+ do_read(direction);
+ });
+ }
+
+ void do_write(direction direction) {
+ auto tup = get_sockets_and_queue(direction);
+ tcp::socket &dst = std::get<1>(tup);
+ std::queue<message> &queue = std::get<2>(tup);
+ bool &writable = std::get<3>(tup);
+
+ writable = false; // protects from writing same buffer twice (from
head of queue).
+
+ if (!queue.empty()) {
+ message &msg = queue.front();
+
+ asio::async_write(
+ dst, asio::buffer(msg.m_arr, msg.m_size), [this, &queue,
direction, &writable](asio::error_code ec, size_t) {
+ if (ec) {
+ throw std::runtime_error("Error while writing to
socket " + ec.message());
+ }
+
+ queue.pop();
+
+ if (!queue.empty()) {
+ // makes writes on the same socket strictly ordered
+ do_write(direction);
+ } else {
+ writable = true; // now read operation can initiate
writes
+ }
+ });
+ }
+ }
+
+ std::tuple<tcp::socket &, tcp::socket &, std::queue<message> &, bool&>
get_sockets_and_queue(direction direction) {
+ switch (direction) {
+ case forward:
+ return {m_in_sock, m_out_sock, m_in_to_out,
m_in_to_out_writable};
+ case reverse:
+ return {m_out_sock, m_in_sock, m_out_to_in,
m_out_to_in_writable};
+ }
+
+ throw std::runtime_error("Should be unreachable");
+ }
+
+ tcp::socket m_in_sock;
+ tcp::socket m_out_sock;
+
+ bool m_in_to_out_writable{false};
+ bool m_out_to_in_writable{false};
+
+ std::queue<message> m_in_to_out;
+ std::queue<message> m_out_to_in;
+
+ static constexpr size_t BUFF_SIZE = 4096;
+
+ char buf[BUFF_SIZE];
Review Comment:
The session buffer is reused across multiple async read operations without
proper synchronization. If multiple read operations are in progress (forward
and reverse), they may both write to the same 'buf' array simultaneously,
causing data corruption. Each direction should have its own buffer to prevent
race conditions.
##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <iostream>
+#include <queue>
+#include <tuple>
+
+#include <asio.hpp>
+#include <asio/ts/buffer.hpp>
+#include <asio/ts/internet.hpp>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct message {
+ char *m_arr;
+ size_t m_size;
+
+ message(char *arr, size_t size)
+ : m_arr(nullptr)
+ , m_size(size) {
+ m_arr = new char[m_size];
+ std::memcpy(m_arr, arr, size);
+ }
+
Review Comment:
The message struct is missing copy constructor and copy assignment operator,
but they should be deleted since the class manages raw memory via new/delete.
Without explicit deletion, the default copy operations will perform shallow
copies, leading to double-free errors. Add: message(const message&) = delete;
and message& operator=(const message&) = delete;. Alternatively, implement
proper deep copy semantics or use move semantics only.
```suggestion
message(const message &) = delete;
message &operator=(const message &) = delete;
message(message &&other) noexcept
: m_arr(other.m_arr)
, m_size(other.m_size) {
other.m_arr = nullptr;
other.m_size = 0;
}
message &operator=(message &&other) noexcept {
if (this != &other) {
delete[] m_arr;
m_arr = other.m_arr;
m_size = other.m_size;
other.m_arr = nullptr;
other.m_size = 0;
}
return *this;
}
```
##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+ using network::detail::set_non_blocking_mode;
+
+ if (!set_non_blocking_mode(fd, true)) {
+ throw std::runtime_error("Error making socket non-blocking");
+ }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+ epoll_event events[MAX_EVENTS];
+
+ bool stopped = false;
+ while (!stopped) {
+ int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+ for (int i = 0; i < event_cnt; ++i) {
+ int fd = events[i].data.fd;
+
+ if (fd == stop_event_fd) {
+ uint64_t val;
+ read(stop_event_fd, &val, sizeof(val));
+ stopped = true;
+ } else if (fd == server_fd) {
+ process_incoming_connection();
+ } else {
+ process_socket_event(events[i]);
+ }
+ }
+ }
+}
+
+void kgb_proxy::add_event_fd() {
+ stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+ epoll_event ev{};
+ ev.events = EPOLLIN;
+ ev.data.fd = stop_event_fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+ uint64_t one = 1;
+ write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+ server_fd = socket(AF_INET, SOCK_STREAM, 0);
+ set_socket_non_blocking(server_fd);
+
+ int opt = 1;
+ setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+ sockaddr_in addr{};
+
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(in_port);
+ addr.sin_addr.s_addr = INADDR_ANY;
+
+ bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+ listen(server_fd, 16);
+
+ epoll_event ev{};
+
+ ev.events = EPOLLIN;
+ ev.data.fd = server_fd;
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);
+}
+
+void kgb_proxy::add_socket_to_epoll(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ev{};
+ ev.events = EPOLLIN | EPOLLET;
+ ev.data.fd = fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+void kgb_proxy::process_incoming_connection() {
+ while (true) {
+ int in_fd;
+ {
+ sockaddr_in addr{};
+ socklen_t len = sizeof(addr);
+
+ in_fd = accept(server_fd, (sockaddr*) &addr, &len);
+
+ if (in_fd < 0) {
+ if (errno != EAGAIN) {
+ std::error_code ec{errno, std::system_category()};
+ std::cerr << "Unexpected issue when accepting connection
err=" << ec.message() << "\n";
+ }
+ break;
+ }
+
+ set_socket_non_blocking(in_fd);
+
+ std::cout << "Client connected to proxy fd = " << in_fd <<
std::endl;
+ }
+
+ int out_fd;
+ {
+ out_fd = socket(AF_INET, SOCK_STREAM, 0);
+
+ set_socket_non_blocking(out_fd);
+
+ if (out_fd < 0) {
+ std::error_code ec{errno, std::system_category()};
+ throw std::runtime_error("Unable to create socket for outbound
proxy connection err = " + ec.message());
+ }
+
+ sockaddr_in addr{};
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(out_port);
+
+ inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
+
+ int res = connect(out_fd, (sockaddr*) &addr, sizeof(addr));
+
+ if (res < 0) {
+ if (errno != EINPROGRESS) {
+ std::error_code ec{errno, std::system_category()};
+ throw std::runtime_error("Unable to connection to server
err = " + ec.message());
+ }
+ }
+
+ std::cout << "Proxy connected to server fd = " << out_fd <<
std::endl;
+ }
+
+ std::shared_ptr<proxy_connection> conn =
std::make_shared<proxy_connection>(in_fd, out_fd);
+
+ connections[in_fd] = conn;
+ connections[out_fd] = conn;
+
+ add_socket_to_epoll(in_fd);
+ add_socket_to_epoll(out_fd);
+
+ std::cout << "Socket pair has been created in_fd = " << in_fd << "
out_fd = " << out_fd << std::endl;
+ }
+}
+
+void kgb_proxy::process_socket_event(const epoll_event& ep_ev) {
+ int fd = ep_ev.data.fd;
+
+ auto it = connections.find(fd);
+ if (it == connections.end()) {
+ throw std::runtime_error("Event for unknown socket occurred fd = " +
std::to_string(fd));
+ }
+
+ auto conn = it->second;
+
+ if (ep_ev.events & EPOLLIN) {
+
+ char buf[BUFF_SIZE];
+
+ int src = fd;
+ int dst = src == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+ while (true) {
+ ssize_t received = recv(src, buf, sizeof(buf), 0);
+
+ if (received < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ break;
+ }
+
+ perror("recv");
+ break;
+
+ }
+
+ if (received == 0) {
+ close(src);
+ close(dst);
+
+ connections.erase(src);
+ connections.erase(dst);
+ break;
+ }
+
+ auto& queue = src == conn->in_sock ? conn->in2out_queue :
conn->out2in_queue;
+ queue.emplace(buf, received);
+
+ enable_writable_notification(dst);
+ }
+ }
+
+ if (ep_ev.events & EPOLLOUT) {
+ int dst = fd;
+ int src = dst == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+ auto& queue = src == conn->in_sock ? conn->in2out_queue :
conn->out2in_queue;
+
+ while (!queue.empty()) {
+
+ const message_chunk& chunk = queue.front();
+ ssize_t sent = send(dst, chunk.m_msg, chunk.m_size, 0);
+
+ if (sent <= 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ break;
+ }
+
+ perror("send");
+ break;
+ }
+
+ queue.pop();
+ }
+
+ disable_writable_notification(dst);
Review Comment:
The disable_writable_notification is called unconditionally at the end of
EPOLLOUT handling, even when the queue is not empty and more data needs to be
sent. This could prevent the socket from being notified when it becomes
writable again, potentially causing data to remain stuck in the queue. The
notification should only be disabled when the queue becomes empty.
```suggestion
if (queue.empty()) {
disable_writable_notification(dst);
}
```
##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+ using network::detail::set_non_blocking_mode;
+
+ if (!set_non_blocking_mode(fd, true)) {
+ throw std::runtime_error("Error making socket non-blocking");
+ }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
Review Comment:
The return value of epoll_ctl should be checked for errors. In the existing
codebase (e.g., linux_async_worker_thread.cpp:77-83), epoll_ctl failures are
checked and proper error handling is performed. Ignoring errors could lead to
silent failures where events are not properly registered or modified.
##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <iostream>
+#include <queue>
+#include <tuple>
+
+#include <asio.hpp>
+#include <asio/ts/buffer.hpp>
+#include <asio/ts/internet.hpp>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct message {
+ char *m_arr;
+ size_t m_size;
+
+ message(char *arr, size_t size)
+ : m_arr(nullptr)
+ , m_size(size) {
+ m_arr = new char[m_size];
+ std::memcpy(m_arr, arr, size);
+ }
+
+ ~message() { delete[] m_arr; }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+ session(tcp::socket in_sock, tcp::socket out_sock)
+ : m_in_sock(std::move(in_sock))
+ , m_out_sock(std::move(out_sock)) { }
+
+ void start() { do_serve(); }
+
+ tcp::socket &get_out_sock() { return m_out_sock; }
+
+ void set_writable(bool writable) {
+ m_in_to_out_writable = writable;
+ m_out_to_in_writable = writable;
+ }
+
+ enum direction { forward, reverse };
+
+private:
+ void do_serve() {
+ do_read(forward);
+ do_read(reverse);
+ }
+
+ void do_read(direction direction) {
+ auto tup = get_sockets_and_queue(direction);
+ tcp::socket &src = std::get<0>(tup);
+ std::queue<message> &queue = std::get<2>(tup);
+ bool &writable = std::get<3>(tup);
+
+ src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+ [this, &queue, direction, &writable](asio::error_code ec, size_t len) {
+ if (ec) {
+ throw std::runtime_error("Error while reading from socket " +
ec.message());
+ }
+
+ // we have one-threaded executor no synchronization is needed
+ queue.emplace(buf, len);
+
+ if (writable) { // there are pending write operation on this socket
+ do_write(direction);
+ }
+
+ do_read(direction);
+ });
+ }
+
+ void do_write(direction direction) {
+ auto tup = get_sockets_and_queue(direction);
+ tcp::socket &dst = std::get<1>(tup);
+ std::queue<message> &queue = std::get<2>(tup);
+ bool &writable = std::get<3>(tup);
+
+ writable = false; // protects from writing same buffer twice (from
head of queue).
+
+ if (!queue.empty()) {
+ message &msg = queue.front();
+
+ asio::async_write(
+ dst, asio::buffer(msg.m_arr, msg.m_size), [this, &queue,
direction, &writable](asio::error_code ec, size_t) {
+ if (ec) {
+ throw std::runtime_error("Error while writing to
socket " + ec.message());
+ }
+
+ queue.pop();
+
+ if (!queue.empty()) {
+ // makes writes on the same socket strictly ordered
+ do_write(direction);
+ } else {
+ writable = true; // now read operation can initiate
writes
+ }
+ });
+ }
+ }
+
+ std::tuple<tcp::socket &, tcp::socket &, std::queue<message> &, bool&>
get_sockets_and_queue(direction direction) {
+ switch (direction) {
+ case forward:
+ return {m_in_sock, m_out_sock, m_in_to_out,
m_in_to_out_writable};
+ case reverse:
+ return {m_out_sock, m_in_sock, m_out_to_in,
m_out_to_in_writable};
+ }
+
+ throw std::runtime_error("Should be unreachable");
+ }
+
+ tcp::socket m_in_sock;
+ tcp::socket m_out_sock;
+
+ bool m_in_to_out_writable{false};
+ bool m_out_to_in_writable{false};
+
+ std::queue<message> m_in_to_out;
+ std::queue<message> m_out_to_in;
+
+ static constexpr size_t BUFF_SIZE = 4096;
+
+ char buf[BUFF_SIZE];
+};
+
+class asio_proxy {
+public:
+ asio_proxy(asio::io_context &io_context, short port)
+ : m_io_context(io_context)
+ , m_acceptor(m_io_context, tcp::endpoint(tcp::v4(), port))
+ , m_resolver(m_io_context)
+ , m_in_sock(m_io_context) {
+ do_accept();
+ }
+
+private:
+ void do_accept() {
+ m_acceptor.async_accept(m_in_sock, [this](asio::error_code ec) {
+ if (!ec) {
+ auto ses = m_sessons.emplace_back(
+ std::make_shared<session>(std::move(m_in_sock),
tcp::socket{m_io_context}));
+
+ m_resolver.async_resolve(
+ "127.0.0.1", "50900", [ses](asio::error_code ec,
tcp::resolver::results_type endpoints) {
Review Comment:
The hardcoded port "50900" should be parameterized or made configurable. The
asio_proxy constructor accepts a port parameter for the incoming connection but
hardcodes the outgoing server port. This reduces flexibility and makes the
proxy less reusable. Consider adding an out_port parameter to the constructor.
##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+ using network::detail::set_non_blocking_mode;
+
+ if (!set_non_blocking_mode(fd, true)) {
+ throw std::runtime_error("Error making socket non-blocking");
+ }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+ epoll_event events[MAX_EVENTS];
+
+ bool stopped = false;
+ while (!stopped) {
+ int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+ for (int i = 0; i < event_cnt; ++i) {
+ int fd = events[i].data.fd;
+
+ if (fd == stop_event_fd) {
+ uint64_t val;
+ read(stop_event_fd, &val, sizeof(val));
+ stopped = true;
+ } else if (fd == server_fd) {
+ process_incoming_connection();
+ } else {
+ process_socket_event(events[i]);
+ }
+ }
+ }
+}
+
+void kgb_proxy::add_event_fd() {
+ stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+ epoll_event ev{};
+ ev.events = EPOLLIN;
+ ev.data.fd = stop_event_fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+ uint64_t one = 1;
+ write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+ server_fd = socket(AF_INET, SOCK_STREAM, 0);
+ set_socket_non_blocking(server_fd);
+
+ int opt = 1;
+ setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+ sockaddr_in addr{};
+
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(in_port);
+ addr.sin_addr.s_addr = INADDR_ANY;
+
+ bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+ listen(server_fd, 16);
+
+ epoll_event ev{};
+
+ ev.events = EPOLLIN;
+ ev.data.fd = server_fd;
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);
+}
+
+void kgb_proxy::add_socket_to_epoll(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ev{};
+ ev.events = EPOLLIN | EPOLLET;
+ ev.data.fd = fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
Review Comment:
The return value of epoll_ctl should be checked for errors. In the existing
codebase (e.g., linux_async_worker_thread.cpp:77-83), epoll_ctl failures are
checked and proper error handling is performed.
##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+ using network::detail::set_non_blocking_mode;
+
+ if (!set_non_blocking_mode(fd, true)) {
+ throw std::runtime_error("Error making socket non-blocking");
+ }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+ epoll_event events[MAX_EVENTS];
+
+ bool stopped = false;
+ while (!stopped) {
+ int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+ for (int i = 0; i < event_cnt; ++i) {
+ int fd = events[i].data.fd;
+
+ if (fd == stop_event_fd) {
+ uint64_t val;
+ read(stop_event_fd, &val, sizeof(val));
+ stopped = true;
+ } else if (fd == server_fd) {
+ process_incoming_connection();
+ } else {
+ process_socket_event(events[i]);
+ }
+ }
+ }
+}
+
+void kgb_proxy::add_event_fd() {
+ stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+ epoll_event ev{};
+ ev.events = EPOLLIN;
+ ev.data.fd = stop_event_fd;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
Review Comment:
The return values of socket system calls (eventfd, epoll_ctl) should be
checked for errors before using them. Following the pattern in
linux_async_worker_thread.cpp:65-83, these operations can fail and should be
handled with appropriate error checking and cleanup.
##########
modules/platforms/cpp/cmake/dependencies.cmake:
##########
@@ -76,6 +99,7 @@ else()
fetch_dependency(uni-algo
https://github.com/uni-algo/uni-algo/archive/v1.2.0.tar.gz
6e0cce94a6b45ebee7b904316df9f87f)
if (${ENABLE_TESTS})
fetch_dependency(googletest
https://github.com/google/googletest/archive/refs/tags/v1.14.0.tar.gz
c8340a482851ef6a3fe618a082304cfc)
+
add_asio_dependency(https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-36-0.tar.gz
6699ac1dea111c20d024f25e06e573db)
Review Comment:
The add_asio_dependency function is called with two arguments (URL and MD5
hash) but is defined to take no parameters. This will cause a CMake error.
Remove the arguments from the function call or update the function definition
to accept and use these parameters.
##########
modules/platforms/cpp/tests/fake_server/connection_test.cpp:
##########
@@ -76,3 +79,42 @@ TEST_F(connection_test, request_timeout) {
EXPECT_EQ(error::code::OPERATION_TIMEOUT, err.get_status_code());
}
}
+
+// TEST_F(connection_test, using_proxy) {
+// fake_server fs{50900, get_logger()};
+// proxy::kgb_proxy proxy{50800, 50900};
+//
+// fs.start();
+// proxy.start();
+//
+// ignite_client_configuration cfg;
+// cfg.set_logger(get_logger());
+// cfg.set_endpoints(get_endpoints());
+//
+// auto cl = ignite_client::start(cfg, 5s);
+//
+// auto cluster_nodes = cl.get_cluster_nodes();
+//
+// ASSERT_EQ(1, cluster_nodes.size());
+// }
+
Review Comment:
The kgb_proxy class implementation is added but there is no active test for
it. The test_F(connection_test, using_proxy) is commented out. While asio_proxy
has a working test, leaving kgb_proxy untested could allow bugs to remain
undetected. Either uncomment and fix the test, or remove the kgb_proxy
implementation if it's not intended to be used.
```suggestion
TEST_F(connection_test, using_proxy) {
fake_server fs{50900, get_logger()};
proxy::kgb_proxy proxy{50800, 50900};
fs.start();
proxy.start();
ignite_client_configuration cfg;
cfg.set_logger(get_logger());
cfg.set_endpoints(get_endpoints());
auto cl = ignite_client::start(cfg, 5s);
auto cluster_nodes = cl.get_cluster_nodes();
ASSERT_EQ(1, cluster_nodes.size());
}
```
##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.h:
##########
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#pragma once
+#include <cassert>
+#include <cstring>
+#include <memory>
+#include <queue>
+#include <sys/epoll.h>
Review Comment:
The kgb_proxy class uses Linux-specific headers (sys/epoll.h, sys/eventfd.h)
and system calls (epoll_create1, eventfd, epoll_ctl, epoll_wait) without any
platform abstraction or guards. This code will not compile on non-Linux
platforms (Windows, macOS). Consider adding platform guards or using a
cross-platform abstraction layer, similar to how the codebase has separate
linux/ and macos/ directories for network code.
```suggestion
#include <queue>
#if defined(__linux__)
#include <sys/epoll.h>
#else
struct epoll_event;
#endif
```
##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+ using network::detail::set_non_blocking_mode;
+
+ if (!set_non_blocking_mode(fd, true)) {
+ throw std::runtime_error("Error making socket non-blocking");
+ }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { //
NOLINT(*-make-member-function-const)
+ epoll_event ep_ev{};
+
+ ep_ev.data.fd = fd;
+ ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
Review Comment:
The return value of epoll_ctl should be checked for errors. In the existing
codebase (e.g., linux_async_worker_thread.cpp:77-83), epoll_ctl failures are
checked and proper error handling is performed. Ignoring errors could lead to
silent failures where events are not properly registered or modified.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]