Copilot commented on code in PR #7607:
URL: https://github.com/apache/ignite-3/pull/7607#discussion_r2829772091


##########
modules/platforms/cpp/cmake/dependencies.cmake:
##########
@@ -43,6 +43,29 @@ function(fetch_dependency NAME URL MD5)
     endif()
 endfunction()
 
+function(add_asio_dependency)
+    message(STATUS "Download dependency: asio")
+#    FetchContent_Declare(
+#            asio
+#            GIT_REPOSITORY https://github.com/chriskohlhoff/asio.git
+#            GIT_TAG asio-1-36-0
+#    )

Review Comment:
   Commented-out code should be removed rather than left in the codebase. If 
this alternative implementation using GIT_REPOSITORY is needed for reference, 
consider documenting it in a comment or external documentation instead.
   ```suggestion
   
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+    using network::detail::set_non_blocking_mode;
+
+    if (!set_non_blocking_mode(fd, true)) {
+        throw std::runtime_error("Error making socket non-blocking");
+    }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+    epoll_event events[MAX_EVENTS];
+
+    bool stopped = false;
+    while (!stopped) {
+        int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+        for (int i = 0; i < event_cnt; ++i) {
+            int fd = events[i].data.fd;
+
+            if (fd == stop_event_fd) {
+                uint64_t val;
+                read(stop_event_fd, &val, sizeof(val));
+                stopped = true;
+            } else if (fd == server_fd) {
+                process_incoming_connection();
+            } else {
+                process_socket_event(events[i]);
+            }
+        }
+    }
+}
+
+void kgb_proxy::add_event_fd() {
+    stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+    epoll_event ev{};
+    ev.events = EPOLLIN;
+    ev.data.fd = stop_event_fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+    uint64_t one = 1;
+    write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+    server_fd = socket(AF_INET, SOCK_STREAM, 0);
+    set_socket_non_blocking(server_fd);
+
+    int opt = 1;
+    setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+    sockaddr_in addr{};
+
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(in_port);
+    addr.sin_addr.s_addr = INADDR_ANY;
+
+    bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+    listen(server_fd, 16);
+
+    epoll_event ev{};
+
+    ev.events = EPOLLIN;
+    ev.data.fd = server_fd;
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);
+}
+
+void kgb_proxy::add_socket_to_epoll(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ev{};
+    ev.events = EPOLLIN | EPOLLET;
+    ev.data.fd = fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+void kgb_proxy::process_incoming_connection() {
+    while (true) {
+        int in_fd;
+        {
+            sockaddr_in addr{};
+            socklen_t len = sizeof(addr);
+
+            in_fd = accept(server_fd, (sockaddr*) &addr, &len);
+
+            if (in_fd < 0) {
+                if (errno != EAGAIN) {
+                    std::error_code ec{errno, std::system_category()};
+                    std::cerr << "Unexpected issue when accepting connection 
err=" << ec.message() << "\n";
+                }
+                break;
+            }
+
+            set_socket_non_blocking(in_fd);
+
+            std::cout << "Client connected to proxy fd = " << in_fd << 
std::endl;
+        }
+
+        int out_fd;
+        {
+            out_fd = socket(AF_INET, SOCK_STREAM, 0);
+
+            set_socket_non_blocking(out_fd);
+
+            if (out_fd < 0) {
+                std::error_code ec{errno, std::system_category()};
+                throw std::runtime_error("Unable to create socket for outbound 
proxy connection err = " + ec.message());
+            }
+
+            sockaddr_in addr{};
+            addr.sin_family = AF_INET;
+            addr.sin_port = htons(out_port);
+
+            inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
+
+            int res = connect(out_fd, (sockaddr*) &addr, sizeof(addr));
+
+            if (res < 0) {
+                if (errno != EINPROGRESS) {
+                    std::error_code ec{errno, std::system_category()};
+                    throw std::runtime_error("Unable to connection to server 
err = " + ec.message());
+                }
+            }
+
+            std::cout << "Proxy connected to server fd = " << out_fd << 
std::endl;
+        }
+
+        std::shared_ptr<proxy_connection> conn = 
std::make_shared<proxy_connection>(in_fd, out_fd);
+
+        connections[in_fd] = conn;
+        connections[out_fd] = conn;
+
+        add_socket_to_epoll(in_fd);
+        add_socket_to_epoll(out_fd);
+
+        std::cout << "Socket pair has been created in_fd = " << in_fd << " 
out_fd = " << out_fd << std::endl;
+    }
+}
+
+void kgb_proxy::process_socket_event(const epoll_event& ep_ev) {
+    int fd = ep_ev.data.fd;
+
+    auto it = connections.find(fd);
+    if (it == connections.end()) {
+        throw std::runtime_error("Event for unknown socket occurred fd = " + 
std::to_string(fd));
+    }
+
+    auto conn = it->second;
+
+    if (ep_ev.events & EPOLLIN) {
+
+        char buf[BUFF_SIZE];
+
+        int src = fd;
+        int dst = src == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+        while (true) {
+            ssize_t received = recv(src, buf, sizeof(buf), 0);
+
+            if (received < 0) {
+                if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                    break;
+                }
+
+                perror("recv");
+                break;
+
+            }
+
+            if (received == 0) {
+                close(src);
+                close(dst);
+
+                connections.erase(src);
+                connections.erase(dst);
+                break;
+            }
+
+            auto& queue = src == conn->in_sock ? conn->in2out_queue : 
conn->out2in_queue;
+            queue.emplace(buf, received);
+
+            enable_writable_notification(dst);
+        }
+    }
+
+    if (ep_ev.events & EPOLLOUT) {
+        int dst = fd;
+        int src = dst == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+        auto& queue = src == conn->in_sock ? conn->in2out_queue : 
conn->out2in_queue;
+
+        while (!queue.empty()) {
+
+            const message_chunk& chunk = queue.front();
+            ssize_t sent = send(dst, chunk.m_msg, chunk.m_size, 0);
+
+            if (sent <= 0) {
+                if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                    break;
+                }
+
+                perror("send");
+                break;
+            }
+
+            queue.pop();

Review Comment:
   Potential partial send issue: send() may send fewer bytes than requested. 
The code assumes all bytes are sent in one call (line 249), but send() can 
return a value less than chunk.m_size. This could lead to incomplete message 
transmission. The message_chunk should track how many bytes have been sent and 
retry sending the remaining bytes.



##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+    using network::detail::set_non_blocking_mode;
+
+    if (!set_non_blocking_mode(fd, true)) {
+        throw std::runtime_error("Error making socket non-blocking");
+    }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+    epoll_event events[MAX_EVENTS];
+
+    bool stopped = false;
+    while (!stopped) {
+        int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+        for (int i = 0; i < event_cnt; ++i) {
+            int fd = events[i].data.fd;
+
+            if (fd == stop_event_fd) {
+                uint64_t val;
+                read(stop_event_fd, &val, sizeof(val));
+                stopped = true;
+            } else if (fd == server_fd) {
+                process_incoming_connection();
+            } else {
+                process_socket_event(events[i]);
+            }
+        }
+    }
+}
+
+void kgb_proxy::add_event_fd() {
+    stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+    epoll_event ev{};
+    ev.events = EPOLLIN;
+    ev.data.fd = stop_event_fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+    uint64_t one = 1;
+    write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+    server_fd = socket(AF_INET, SOCK_STREAM, 0);
+    set_socket_non_blocking(server_fd);
+
+    int opt = 1;
+    setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+    sockaddr_in addr{};
+
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(in_port);
+    addr.sin_addr.s_addr = INADDR_ANY;
+
+    bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+    listen(server_fd, 16);
+
+    epoll_event ev{};
+
+    ev.events = EPOLLIN;
+    ev.data.fd = server_fd;
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);
+}
+
+void kgb_proxy::add_socket_to_epoll(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ev{};
+    ev.events = EPOLLIN | EPOLLET;
+    ev.data.fd = fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+void kgb_proxy::process_incoming_connection() {
+    while (true) {
+        int in_fd;
+        {
+            sockaddr_in addr{};
+            socklen_t len = sizeof(addr);
+
+            in_fd = accept(server_fd, (sockaddr*) &addr, &len);
+
+            if (in_fd < 0) {
+                if (errno != EAGAIN) {
+                    std::error_code ec{errno, std::system_category()};
+                    std::cerr << "Unexpected issue when accepting connection 
err=" << ec.message() << "\n";
+                }
+                break;
+            }
+
+            set_socket_non_blocking(in_fd);
+
+            std::cout << "Client connected to proxy fd = " << in_fd << 
std::endl;
+        }
+
+        int out_fd;
+        {
+            out_fd = socket(AF_INET, SOCK_STREAM, 0);
+
+            set_socket_non_blocking(out_fd);
+
+            if (out_fd < 0) {
+                std::error_code ec{errno, std::system_category()};
+                throw std::runtime_error("Unable to create socket for outbound 
proxy connection err = " + ec.message());
+            }
+
+            sockaddr_in addr{};
+            addr.sin_family = AF_INET;
+            addr.sin_port = htons(out_port);
+
+            inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
+
+            int res = connect(out_fd, (sockaddr*) &addr, sizeof(addr));
+
+            if (res < 0) {
+                if (errno != EINPROGRESS) {
+                    std::error_code ec{errno, std::system_category()};
+                    throw std::runtime_error("Unable to connection to server 
err = " + ec.message());
+                }
+            }
+
+            std::cout << "Proxy connected to server fd = " << out_fd << 
std::endl;
+        }
+
+        std::shared_ptr<proxy_connection> conn = 
std::make_shared<proxy_connection>(in_fd, out_fd);
+
+        connections[in_fd] = conn;
+        connections[out_fd] = conn;
+
+        add_socket_to_epoll(in_fd);
+        add_socket_to_epoll(out_fd);
+
+        std::cout << "Socket pair has been created in_fd = " << in_fd << " 
out_fd = " << out_fd << std::endl;
+    }
+}
+
+void kgb_proxy::process_socket_event(const epoll_event& ep_ev) {
+    int fd = ep_ev.data.fd;
+
+    auto it = connections.find(fd);
+    if (it == connections.end()) {
+        throw std::runtime_error("Event for unknown socket occurred fd = " + 
std::to_string(fd));
+    }
+
+    auto conn = it->second;
+
+    if (ep_ev.events & EPOLLIN) {
+
+        char buf[BUFF_SIZE];
+
+        int src = fd;
+        int dst = src == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+        while (true) {
+            ssize_t received = recv(src, buf, sizeof(buf), 0);
+
+            if (received < 0) {
+                if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                    break;
+                }
+
+                perror("recv");
+                break;
+
+            }
+
+            if (received == 0) {
+                close(src);
+                close(dst);
+
+                connections.erase(src);
+                connections.erase(dst);
+                break;

Review Comment:
   After closing the sockets and erasing both entries from the connections map, 
the code only breaks out of the inner recv loop. Because `conn` is a copy of 
the shared_ptr, the connection object itself stays alive, but if the same 
event also has EPOLLOUT set, execution falls through to the EPOLLOUT branch 
and calls send() and epoll_ctl() on file descriptors that have just been 
closed. Return from the function here instead of breaking, so that nothing 
else touches the closed connection.
   ```suggestion
                   return;
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+    using network::detail::set_non_blocking_mode;
+
+    if (!set_non_blocking_mode(fd, true)) {
+        throw std::runtime_error("Error making socket non-blocking");
+    }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+    epoll_event events[MAX_EVENTS];
+
+    bool stopped = false;
+    while (!stopped) {
+        int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+        for (int i = 0; i < event_cnt; ++i) {
+            int fd = events[i].data.fd;
+
+            if (fd == stop_event_fd) {
+                uint64_t val;
+                read(stop_event_fd, &val, sizeof(val));
+                stopped = true;
+            } else if (fd == server_fd) {
+                process_incoming_connection();
+            } else {
+                process_socket_event(events[i]);
+            }
+        }
+    }
+}
+
+void kgb_proxy::add_event_fd() {
+    stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+    epoll_event ev{};
+    ev.events = EPOLLIN;
+    ev.data.fd = stop_event_fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+    uint64_t one = 1;
+    write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+    server_fd = socket(AF_INET, SOCK_STREAM, 0);
+    set_socket_non_blocking(server_fd);
+
+    int opt = 1;
+    setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+    sockaddr_in addr{};
+
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(in_port);
+    addr.sin_addr.s_addr = INADDR_ANY;
+
+    bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+    listen(server_fd, 16);
+
+    epoll_event ev{};
+
+    ev.events = EPOLLIN;
+    ev.data.fd = server_fd;
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);
+}
+
+void kgb_proxy::add_socket_to_epoll(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ev{};
+    ev.events = EPOLLIN | EPOLLET;
+    ev.data.fd = fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+void kgb_proxy::process_incoming_connection() {
+    while (true) {
+        int in_fd;
+        {
+            sockaddr_in addr{};
+            socklen_t len = sizeof(addr);
+
+            in_fd = accept(server_fd, (sockaddr*) &addr, &len);
+
+            if (in_fd < 0) {
+                if (errno != EAGAIN) {
+                    std::error_code ec{errno, std::system_category()};
+                    std::cerr << "Unexpected issue when accepting connection 
err=" << ec.message() << "\n";
+                }
+                break;
+            }
+
+            set_socket_non_blocking(in_fd);
+
+            std::cout << "Client connected to proxy fd = " << in_fd << 
std::endl;
+        }
+
+        int out_fd;
+        {
+            out_fd = socket(AF_INET, SOCK_STREAM, 0);
+
+            set_socket_non_blocking(out_fd);
+
+            if (out_fd < 0) {
+                std::error_code ec{errno, std::system_category()};
+                throw std::runtime_error("Unable to create socket for outbound 
proxy connection err = " + ec.message());
+            }
+
+            sockaddr_in addr{};
+            addr.sin_family = AF_INET;
+            addr.sin_port = htons(out_port);
+
+            inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
+
+            int res = connect(out_fd, (sockaddr*) &addr, sizeof(addr));
+
+            if (res < 0) {
+                if (errno != EINPROGRESS) {
+                    std::error_code ec{errno, std::system_category()};
+                    throw std::runtime_error("Unable to connection to server 
err = " + ec.message());
+                }
+            }
+
+            std::cout << "Proxy connected to server fd = " << out_fd << 
std::endl;
+        }
+
+        std::shared_ptr<proxy_connection> conn = 
std::make_shared<proxy_connection>(in_fd, out_fd);
+
+        connections[in_fd] = conn;
+        connections[out_fd] = conn;
+
+        add_socket_to_epoll(in_fd);
+        add_socket_to_epoll(out_fd);
+
+        std::cout << "Socket pair has been created in_fd = " << in_fd << " 
out_fd = " << out_fd << std::endl;
+    }
+}
+
+void kgb_proxy::process_socket_event(const epoll_event& ep_ev) {
+    int fd = ep_ev.data.fd;
+
+    auto it = connections.find(fd);
+    if (it == connections.end()) {
+        throw std::runtime_error("Event for unknown socket occurred fd = " + 
std::to_string(fd));
+    }
+
+    auto conn = it->second;
+
+    if (ep_ev.events & EPOLLIN) {
+
+        char buf[BUFF_SIZE];
+
+        int src = fd;
+        int dst = src == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+        while (true) {
+            ssize_t received = recv(src, buf, sizeof(buf), 0);
+
+            if (received < 0) {
+                if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                    break;
+                }
+
+                perror("recv");
+                break;
+
+            }
+
+            if (received == 0) {
+                close(src);
+                close(dst);
+
+                connections.erase(src);
+                connections.erase(dst);
+                break;
+            }
+
+            auto& queue = src == conn->in_sock ? conn->in2out_queue : 
conn->out2in_queue;
+            queue.emplace(buf, received);
+
+            enable_writable_notification(dst);
+        }
+    }
+
+    if (ep_ev.events & EPOLLOUT) {
+        int dst = fd;
+        int src = dst == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+        auto& queue = src == conn->in_sock ? conn->in2out_queue : 
conn->out2in_queue;
+
+        while (!queue.empty()) {
+
+            const message_chunk& chunk = queue.front();
+            ssize_t sent = send(dst, chunk.m_msg, chunk.m_size, 0);
+
+            if (sent <= 0) {
+                if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                    break;
+                }
+
+                perror("send");
+                break;
+            }
+
+            queue.pop();
+        }
+
+        disable_writable_notification(dst);
+    }
+}
+
+kgb_proxy::~kgb_proxy() {
+    fire_stop_event();
+
+    m_polling_thread->join();
+
+    close(stop_event_fd);
+    close(server_fd);
+    close(epoll_fd);

Review Comment:
   File descriptors should only be closed if they are valid (>= 0). If start() 
was never called, these file descriptors will have their initial values of -1, 
and closing them will result in errors. Add checks to ensure file descriptors 
are valid before closing them.
   ```suggestion
       if (stop_event_fd >= 0) {
           close(stop_event_fd);
       }
       if (server_fd >= 0) {
           close(server_fd);
       }
       if (epoll_fd >= 0) {
           close(epoll_fd);
       }
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+    using network::detail::set_non_blocking_mode;
+
+    if (!set_non_blocking_mode(fd, true)) {
+        throw std::runtime_error("Error making socket non-blocking");
+    }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+    epoll_event events[MAX_EVENTS];
+
+    bool stopped = false;
+    while (!stopped) {
+        int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);

Review Comment:
   The return value of epoll_wait should be checked for errors. If epoll_wait 
returns -1, errno should be checked and the error handled appropriately (for 
example, retry on EINTR and report or stop on anything else). Currently, a 
negative event_cnt just skips the for loop and the outer while loop calls 
epoll_wait again, so the failure is silently ignored and a persistent error 
turns into a busy loop.



##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+    using network::detail::set_non_blocking_mode;
+
+    if (!set_non_blocking_mode(fd, true)) {
+        throw std::runtime_error("Error making socket non-blocking");
+    }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+    epoll_event events[MAX_EVENTS];
+
+    bool stopped = false;
+    while (!stopped) {
+        int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+        for (int i = 0; i < event_cnt; ++i) {
+            int fd = events[i].data.fd;
+
+            if (fd == stop_event_fd) {
+                uint64_t val;
+                read(stop_event_fd, &val, sizeof(val));
+                stopped = true;
+            } else if (fd == server_fd) {
+                process_incoming_connection();
+            } else {
+                process_socket_event(events[i]);
+            }
+        }
+    }
+}
+
+void kgb_proxy::add_event_fd() {
+    stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+    epoll_event ev{};
+    ev.events = EPOLLIN;
+    ev.data.fd = stop_event_fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+    uint64_t one = 1;
+    write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+    server_fd = socket(AF_INET, SOCK_STREAM, 0);
+    set_socket_non_blocking(server_fd);
+
+    int opt = 1;
+    setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+    sockaddr_in addr{};
+
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(in_port);
+    addr.sin_addr.s_addr = INADDR_ANY;
+
+    bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+    listen(server_fd, 16);
+
+    epoll_event ev{};
+
+    ev.events = EPOLLIN;
+    ev.data.fd = server_fd;
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);

Review Comment:
   The return values of socket, setsockopt, bind, listen, and epoll_ctl should 
be checked for errors. These system calls can fail and should have proper error 
handling. Without checks, failures will go undetected and could cause undefined 
behavior.



##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.h:
##########
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#pragma once
+#include <cassert>
+#include <cstring>
+#include <memory>
+#include <queue>
+#include <sys/epoll.h>
+#include <thread>
+#include <unordered_map>
+
+namespace ignite::proxy {
+
+struct proxy_connection;
+
+class kgb_proxy {
+public:
+    static constexpr int MAX_EVENTS = 64;
+    static constexpr int BUFF_SIZE = 4096;
+
+    kgb_proxy(int in_port, int out_port)
+        : in_port(in_port)
+        , out_port(out_port) { }
+
+    ~kgb_proxy();
+    void start();
+
+private:
+    void enable_writable_notification(int fd);
+    void disable_writable_notification(int fd);
+    void do_serve();
+    void add_event_fd();
+    void fire_stop_event();
+    void start_server_socket();
+    void add_socket_to_epoll(int fd);
+    void process_incoming_connection();
+    void process_socket_event(const epoll_event &ep_ev);
+    const int in_port;
+    const int out_port;
+    int server_fd{-1};
+    int epoll_fd{-1};
+    int stop_event_fd{-1};
+    std::unique_ptr<std::thread> m_polling_thread{};
+    bool connected{false};

Review Comment:
   The member variable 'connected' is declared but never used anywhere in the 
class. Consider removing it to reduce clutter, or implement the intended 
functionality if it was meant to track connection state.
   ```suggestion
   
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <iostream>
+#include <queue>
+#include <tuple>
+
+#include <asio.hpp>
+#include <asio/ts/buffer.hpp>
+#include <asio/ts/internet.hpp>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct message {
+    char *m_arr;
+    size_t m_size;
+
+    message(char *arr, size_t size)
+        : m_arr(nullptr)
+        , m_size(size) {
+        m_arr = new char[m_size];
+        std::memcpy(m_arr, arr, size);
+    }
+
+    ~message() { delete[] m_arr; }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+    session(tcp::socket in_sock, tcp::socket out_sock)
+        : m_in_sock(std::move(in_sock))
+        , m_out_sock(std::move(out_sock)) { }
+
+    void start() { do_serve(); }
+
+    tcp::socket &get_out_sock() { return m_out_sock; }
+
+    void set_writable(bool writable) {
+        m_in_to_out_writable = writable;
+        m_out_to_in_writable = writable;
+    }
+
+    enum direction { forward, reverse };
+
+private:
+    void do_serve() {
+        do_read(forward);
+        do_read(reverse);
+    }
+
+    void do_read(direction direction) {
+        auto tup = get_sockets_and_queue(direction);
+        tcp::socket &src = std::get<0>(tup);
+        std::queue<message> &queue = std::get<2>(tup);
+        bool &writable = std::get<3>(tup);
+
+        src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+    [this, &queue, direction, &writable](asio::error_code ec, size_t len) {
+            if (ec) {
+                throw std::runtime_error("Error while reading from socket " + 
ec.message());
+            }
+
+            // we have one-threaded executor no synchronization is needed
+            queue.emplace(buf, len);
+
+            if (writable) { // there are pending write operation on this socket
+                do_write(direction);
+            }
+
+            do_read(direction);
+        });
+    }
+
+    void do_write(direction direction) {
+        auto tup = get_sockets_and_queue(direction);
+        tcp::socket &dst = std::get<1>(tup);
+        std::queue<message> &queue = std::get<2>(tup);
+        bool &writable = std::get<3>(tup);
+
+        writable = false; // protects from writing same buffer twice (from 
head of queue).
+
+        if (!queue.empty()) {
+            message &msg = queue.front();
+
+            asio::async_write(
+                dst, asio::buffer(msg.m_arr, msg.m_size), [this, &queue, 
direction, &writable](asio::error_code ec, size_t) {
+                    if (ec) {
+                        throw std::runtime_error("Error while writing to 
socket " + ec.message());
+                    }
+
+                    queue.pop();
+
+                    if (!queue.empty()) {
+                        // makes writes on the same socket strictly ordered
+                        do_write(direction);
+                    } else {
+                        writable = true; // now read operation can initiate 
writes
+                    }
+                });
+        }
+    }
+
+    std::tuple<tcp::socket &, tcp::socket &, std::queue<message> &, bool&> 
get_sockets_and_queue(direction direction) {
+        switch (direction) {
+            case forward:
+                return {m_in_sock, m_out_sock, m_in_to_out, 
m_in_to_out_writable};
+            case reverse:
+                return {m_out_sock, m_in_sock, m_out_to_in, 
m_out_to_in_writable};
+        }
+
+        throw std::runtime_error("Should be unreachable");
+    }
+
+    tcp::socket m_in_sock;
+    tcp::socket m_out_sock;
+
+    bool m_in_to_out_writable{false};
+    bool m_out_to_in_writable{false};
+
+    std::queue<message> m_in_to_out;
+    std::queue<message> m_out_to_in;
+
+    static constexpr size_t BUFF_SIZE = 4096;
+
+    char buf[BUFF_SIZE];
+};
+
+class asio_proxy {
+public:
+    asio_proxy(asio::io_context &io_context, short port)
+        : m_io_context(io_context)
+        , m_acceptor(m_io_context, tcp::endpoint(tcp::v4(), port))
+        , m_resolver(m_io_context)
+        , m_in_sock(m_io_context) {
+        do_accept();
+    }
+
+private:
+    void do_accept() {
+        m_acceptor.async_accept(m_in_sock, [this](asio::error_code ec) {
+            if (!ec) {
+                auto ses = m_sessons.emplace_back(
+                    std::make_shared<session>(std::move(m_in_sock), 
tcp::socket{m_io_context}));
+
+                m_resolver.async_resolve(
+                    "127.0.0.1", "50900", [ses](asio::error_code ec, 
tcp::resolver::results_type endpoints) {
+                        if (!ec) {
+                            asio::async_connect(ses->get_out_sock(), endpoints,
+                                [&ses](const asio::error_code &ec, const 
tcp::endpoint & e) {
+                                    if (!ec) {
+                                        ses->set_writable(true);
+                                        ses->start();
+                                    } else {
+                                        std::cout << e.port();
+                                        throw std::runtime_error("Error 
connecting to server " + ec.message());
+                                    }
+                                });
+                        } else {
+                            throw std::runtime_error("Error resolving server's 
address " + ec.message());
+                        }
+                    });
+
+                do_accept();
+
+            } else {
+                throw std::runtime_error("Error accepting incoming connection 
" + ec.message());
+            }
+        });
+    }
+
+    asio::io_context &m_io_context;
+    tcp::acceptor m_acceptor;
+    tcp::resolver m_resolver;
+    tcp::socket m_in_sock;
+
+    std::vector<std::shared_ptr<session>> m_sessons;

Review Comment:
   Typo in variable name: m_sessons should be m_sessions (missing 'i').



##########
modules/platforms/cpp/tests/fake_server/connection_test.cpp:
##########
@@ -76,3 +79,42 @@ TEST_F(connection_test, request_timeout) {
         EXPECT_EQ(error::code::OPERATION_TIMEOUT, err.get_status_code());
     }
 }
+
+// TEST_F(connection_test, using_proxy) {
+//     fake_server fs{50900, get_logger()};
+//     proxy::kgb_proxy proxy{50800, 50900};
+//
+//     fs.start();
+//     proxy.start();
+//
+//     ignite_client_configuration cfg;
+//     cfg.set_logger(get_logger());
+//     cfg.set_endpoints(get_endpoints());
+//
+//     auto cl = ignite_client::start(cfg, 5s);
+//
+//     auto cluster_nodes = cl.get_cluster_nodes();
+//
+//     ASSERT_EQ(1, cluster_nodes.size());
+// }
+
+TEST_F(connection_test, using_asio) {
+    fake_server fs{50900, get_logger()};
+    fs.start();
+    asio::io_context io_context;
+    proxy::asio_proxy proxy{io_context, static_cast<short>(50800)};
+
+    std::thread t{[&io_context]() {
+        io_context.run();
+    }};
+
+    ignite_client_configuration cfg;
+    cfg.set_logger(get_logger());
+    cfg.set_endpoints(get_endpoints());
+
+    auto cl = ignite_client::start(cfg, 500s);

Review Comment:
   The timeout value of 500 seconds seems excessively long for a unit test. 
This is inconsistent with other tests in the same file that use a 5s timeout 
(lines 50 and 94). Consider using a more reasonable timeout value consistent 
with other tests.
   ```suggestion
       auto cl = ignite_client::start(cfg, 5s);
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <iostream>
+#include <queue>
+#include <tuple>
+
+#include <asio.hpp>
+#include <asio/ts/buffer.hpp>
+#include <asio/ts/internet.hpp>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct message {
+    char *m_arr;
+    size_t m_size;
+
+    message(char *arr, size_t size)
+        : m_arr(nullptr)
+        , m_size(size) {
+        m_arr = new char[m_size];
+        std::memcpy(m_arr, arr, size);
+    }
+
+    ~message() { delete[] m_arr; }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+    session(tcp::socket in_sock, tcp::socket out_sock)
+        : m_in_sock(std::move(in_sock))
+        , m_out_sock(std::move(out_sock)) { }
+
+    void start() { do_serve(); }
+
+    tcp::socket &get_out_sock() { return m_out_sock; }
+
+    void set_writable(bool writable) {
+        m_in_to_out_writable = writable;
+        m_out_to_in_writable = writable;
+    }
+
+    enum direction { forward, reverse };
+
+private:
+    void do_serve() {
+        do_read(forward);
+        do_read(reverse);
+    }
+
+    void do_read(direction direction) {
+        auto tup = get_sockets_and_queue(direction);
+        tcp::socket &src = std::get<0>(tup);
+        std::queue<message> &queue = std::get<2>(tup);
+        bool &writable = std::get<3>(tup);
+
+        src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+    [this, &queue, direction, &writable](asio::error_code ec, size_t len) {
+            if (ec) {
+                throw std::runtime_error("Error while reading from socket " + 
ec.message());
+            }
+
+            // we have one-threaded executor no synchronization is needed
+            queue.emplace(buf, len);
+
+            if (writable) { // there are pending write operation on this socket
+                do_write(direction);
+            }
+
+            do_read(direction);
+        });
+    }
+
+    void do_write(direction direction) {
+        auto tup = get_sockets_and_queue(direction);
+        tcp::socket &dst = std::get<1>(tup);
+        std::queue<message> &queue = std::get<2>(tup);
+        bool &writable = std::get<3>(tup);
+
+        writable = false; // protects from writing same buffer twice (from 
head of queue).
+
+        if (!queue.empty()) {
+            message &msg = queue.front();
+
+            asio::async_write(
+                dst, asio::buffer(msg.m_arr, msg.m_size), [this, &queue, 
direction, &writable](asio::error_code ec, size_t) {
+                    if (ec) {
+                        throw std::runtime_error("Error while writing to 
socket " + ec.message());
+                    }
+
+                    queue.pop();
+
+                    if (!queue.empty()) {
+                        // makes writes on the same socket strictly ordered
+                        do_write(direction);
+                    } else {
+                        writable = true; // now read operation can initiate 
writes
+                    }
+                });

Review Comment:
   Similar to the issue in do_read, the lambda captures references (&queue, 
&writable) which point to members of 'this'. If the session is destroyed before 
the async operation completes, these references will be dangling. The session 
should be kept alive by capturing shared_from_this() in the lambda.



##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+    using network::detail::set_non_blocking_mode;
+
+    if (!set_non_blocking_mode(fd, true)) {
+        throw std::runtime_error("Error making socket non-blocking");
+    }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+    epoll_event events[MAX_EVENTS];
+
+    bool stopped = false;
+    while (!stopped) {
+        int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+        for (int i = 0; i < event_cnt; ++i) {
+            int fd = events[i].data.fd;
+
+            if (fd == stop_event_fd) {
+                uint64_t val;
+                read(stop_event_fd, &val, sizeof(val));
+                stopped = true;
+            } else if (fd == server_fd) {
+                process_incoming_connection();
+            } else {
+                process_socket_event(events[i]);
+            }
+        }
+    }
+}
+
+void kgb_proxy::add_event_fd() {
+    stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+    epoll_event ev{};
+    ev.events = EPOLLIN;
+    ev.data.fd = stop_event_fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+    uint64_t one = 1;
+    write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+    server_fd = socket(AF_INET, SOCK_STREAM, 0);
+    set_socket_non_blocking(server_fd);
+
+    int opt = 1;
+    setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+    sockaddr_in addr{};
+
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(in_port);
+    addr.sin_addr.s_addr = INADDR_ANY;
+
+    bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+    listen(server_fd, 16);
+
+    epoll_event ev{};
+
+    ev.events = EPOLLIN;
+    ev.data.fd = server_fd;
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);
+}
+
+void kgb_proxy::add_socket_to_epoll(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ev{};
+    ev.events = EPOLLIN | EPOLLET;
+    ev.data.fd = fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+void kgb_proxy::process_incoming_connection() {
+    while (true) {
+        int in_fd;
+        {
+            sockaddr_in addr{};
+            socklen_t len = sizeof(addr);
+
+            in_fd = accept(server_fd, (sockaddr*) &addr, &len);
+
+            if (in_fd < 0) {
+                if (errno != EAGAIN) {
+                    std::error_code ec{errno, std::system_category()};
+                    std::cerr << "Unexpected issue when accepting connection 
err=" << ec.message() << "\n";
+                }
+                break;
+            }
+
+            set_socket_non_blocking(in_fd);
+
+            std::cout << "Client connected to proxy fd = " << in_fd << 
std::endl;
+        }
+
+        int out_fd;
+        {
+            out_fd = socket(AF_INET, SOCK_STREAM, 0);
+
+            set_socket_non_blocking(out_fd);
+
+            if (out_fd < 0) {
+                std::error_code ec{errno, std::system_category()};
+                throw std::runtime_error("Unable to create socket for outbound 
proxy connection err = " + ec.message());
+            }
+
+            sockaddr_in addr{};
+            addr.sin_family = AF_INET;
+            addr.sin_port = htons(out_port);
+
+            inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
+
+            int res = connect(out_fd, (sockaddr*) &addr, sizeof(addr));
+
+            if (res < 0) {
+                if (errno != EINPROGRESS) {
+                    std::error_code ec{errno, std::system_category()};
+                    throw std::runtime_error("Unable to connection to server 
err = " + ec.message());
+                }
+            }
+
+            std::cout << "Proxy connected to server fd = " << out_fd << 
std::endl;
+        }
+
+        std::shared_ptr<proxy_connection> conn = 
std::make_shared<proxy_connection>(in_fd, out_fd);
+
+        connections[in_fd] = conn;
+        connections[out_fd] = conn;
+
+        add_socket_to_epoll(in_fd);
+        add_socket_to_epoll(out_fd);
+
+        std::cout << "Socket pair has been created in_fd = " << in_fd << " 
out_fd = " << out_fd << std::endl;
+    }
+}
+
+void kgb_proxy::process_socket_event(const epoll_event& ep_ev) {
+    int fd = ep_ev.data.fd;
+
+    auto it = connections.find(fd);
+    if (it == connections.end()) {
+        throw std::runtime_error("Event for unknown socket occurred fd = " + 
std::to_string(fd));
+    }
+
+    auto conn = it->second;
+
+    if (ep_ev.events & EPOLLIN) {
+
+        char buf[BUFF_SIZE];
+
+        int src = fd;
+        int dst = src == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+        while (true) {
+            ssize_t received = recv(src, buf, sizeof(buf), 0);
+
+            if (received < 0) {
+                if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                    break;
+                }
+
+                perror("recv");
+                break;
+
+            }
+
+            if (received == 0) {
+                close(src);
+                close(dst);
+
+                connections.erase(src);
+                connections.erase(dst);
+                break;
+            }
+
+            auto& queue = src == conn->in_sock ? conn->in2out_queue : 
conn->out2in_queue;
+            queue.emplace(buf, received);
+
+            enable_writable_notification(dst);
+        }
+    }
+
+    if (ep_ev.events & EPOLLOUT) {
+        int dst = fd;
+        int src = dst == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+        auto& queue = src == conn->in_sock ? conn->in2out_queue : 
conn->out2in_queue;
+
+        while (!queue.empty()) {
+
+            const message_chunk& chunk = queue.front();
+            ssize_t sent = send(dst, chunk.m_msg, chunk.m_size, 0);
+
+            if (sent <= 0) {
+                if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                    break;
+                }
+
+                perror("send");
+                break;
+            }
+
+            queue.pop();
+        }
+
+        disable_writable_notification(dst);
+    }
+}
+
+kgb_proxy::~kgb_proxy() {
+    fire_stop_event();
+
+    m_polling_thread->join();

Review Comment:
   The destructor assumes m_polling_thread is not null and attempts to join it 
without checking. If start() was never called, m_polling_thread will be nullptr 
and dereferencing it will cause a crash. Add a null check before calling join().
   ```suggestion
       if (m_polling_thread && m_polling_thread->joinable()) {
           m_polling_thread->join();
       }
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <iostream>
+#include <queue>
+#include <tuple>
+
+#include <asio.hpp>
+#include <asio/ts/buffer.hpp>
+#include <asio/ts/internet.hpp>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct message {
+    char *m_arr;
+    size_t m_size;
+
+    message(char *arr, size_t size)
+        : m_arr(nullptr)
+        , m_size(size) {
+        m_arr = new char[m_size];
+        std::memcpy(m_arr, arr, size);
+    }
+
+    ~message() { delete[] m_arr; }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+    session(tcp::socket in_sock, tcp::socket out_sock)
+        : m_in_sock(std::move(in_sock))
+        , m_out_sock(std::move(out_sock)) { }
+
+    void start() { do_serve(); }
+
+    tcp::socket &get_out_sock() { return m_out_sock; }
+
+    void set_writable(bool writable) {
+        m_in_to_out_writable = writable;
+        m_out_to_in_writable = writable;
+    }
+
+    enum direction { forward, reverse };
+
+private:
+    void do_serve() {
+        do_read(forward);
+        do_read(reverse);
+    }
+
+    void do_read(direction direction) {
+        auto tup = get_sockets_and_queue(direction);
+        tcp::socket &src = std::get<0>(tup);
+        std::queue<message> &queue = std::get<2>(tup);
+        bool &writable = std::get<3>(tup);
+
+        src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+    [this, &queue, direction, &writable](asio::error_code ec, size_t len) {
+            if (ec) {
+                throw std::runtime_error("Error while reading from socket " + 
ec.message());
+            }
+
+            // we have one-threaded executor no synchronization is needed
+            queue.emplace(buf, len);
+
+            if (writable) { // there are pending write operation on this socket
+                do_write(direction);
+            }
+
+            do_read(direction);
+        });
+    }
+
+    void do_write(direction direction) {
+        auto tup = get_sockets_and_queue(direction);
+        tcp::socket &dst = std::get<1>(tup);
+        std::queue<message> &queue = std::get<2>(tup);
+        bool &writable = std::get<3>(tup);
+
+        writable = false; // protects from writing same buffer twice (from 
head of queue).
+
+        if (!queue.empty()) {
+            message &msg = queue.front();
+
+            asio::async_write(
+                dst, asio::buffer(msg.m_arr, msg.m_size), [this, &queue, 
direction, &writable](asio::error_code ec, size_t) {
+                    if (ec) {
+                        throw std::runtime_error("Error while writing to 
socket " + ec.message());
+                    }
+
+                    queue.pop();
+
+                    if (!queue.empty()) {
+                        // makes writes on the same socket strictly ordered
+                        do_write(direction);
+                    } else {
+                        writable = true; // now read operation can initiate 
writes
+                    }
+                });
+        }
+    }
+
+    std::tuple<tcp::socket &, tcp::socket &, std::queue<message> &, bool&> 
get_sockets_and_queue(direction direction) {
+        switch (direction) {
+            case forward:
+                return {m_in_sock, m_out_sock, m_in_to_out, 
m_in_to_out_writable};
+            case reverse:
+                return {m_out_sock, m_in_sock, m_out_to_in, 
m_out_to_in_writable};
+        }
+
+        throw std::runtime_error("Should be unreachable");
+    }
+
+    tcp::socket m_in_sock;
+    tcp::socket m_out_sock;
+
+    bool m_in_to_out_writable{false};
+    bool m_out_to_in_writable{false};
+
+    std::queue<message> m_in_to_out;
+    std::queue<message> m_out_to_in;
+
+    static constexpr size_t BUFF_SIZE = 4096;
+
+    char buf[BUFF_SIZE];
+};
+
+class asio_proxy {
+public:
+    asio_proxy(asio::io_context &io_context, short port)
+        : m_io_context(io_context)
+        , m_acceptor(m_io_context, tcp::endpoint(tcp::v4(), port))
+        , m_resolver(m_io_context)
+        , m_in_sock(m_io_context) {
+        do_accept();
+    }
+
+private:
+    void do_accept() {
+        m_acceptor.async_accept(m_in_sock, [this](asio::error_code ec) {
+            if (!ec) {
+                auto ses = m_sessons.emplace_back(

Review Comment:
   Typo in variable name: m_sessons should be m_sessions (missing 'i'). This is 
the same variable referenced in line 193.



##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+    using network::detail::set_non_blocking_mode;
+
+    if (!set_non_blocking_mode(fd, true)) {
+        throw std::runtime_error("Error making socket non-blocking");
+    }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+    epoll_event events[MAX_EVENTS];
+
+    bool stopped = false;
+    while (!stopped) {
+        int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+        for (int i = 0; i < event_cnt; ++i) {
+            int fd = events[i].data.fd;
+
+            if (fd == stop_event_fd) {
+                uint64_t val;
+                read(stop_event_fd, &val, sizeof(val));
+                stopped = true;
+            } else if (fd == server_fd) {
+                process_incoming_connection();
+            } else {
+                process_socket_event(events[i]);
+            }
+        }
+    }
+}
+
+void kgb_proxy::add_event_fd() {
+    stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+    epoll_event ev{};
+    ev.events = EPOLLIN;
+    ev.data.fd = stop_event_fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+    uint64_t one = 1;
+    write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+    server_fd = socket(AF_INET, SOCK_STREAM, 0);
+    set_socket_non_blocking(server_fd);
+
+    int opt = 1;
+    setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+    sockaddr_in addr{};
+
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(in_port);
+    addr.sin_addr.s_addr = INADDR_ANY;
+
+    bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+    listen(server_fd, 16);
+
+    epoll_event ev{};
+
+    ev.events = EPOLLIN;
+    ev.data.fd = server_fd;
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);
+}
+
+void kgb_proxy::add_socket_to_epoll(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ev{};
+    ev.events = EPOLLIN | EPOLLET;
+    ev.data.fd = fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+void kgb_proxy::process_incoming_connection() {
+    while (true) {
+        int in_fd;
+        {
+            sockaddr_in addr{};
+            socklen_t len = sizeof(addr);
+
+            in_fd = accept(server_fd, (sockaddr*) &addr, &len);
+
+            if (in_fd < 0) {
+                if (errno != EAGAIN) {
+                    std::error_code ec{errno, std::system_category()};
+                    std::cerr << "Unexpected issue when accepting connection 
err=" << ec.message() << "\n";
+                }
+                break;
+            }
+
+            set_socket_non_blocking(in_fd);
+
+            std::cout << "Client connected to proxy fd = " << in_fd << 
std::endl;
+        }
+
+        int out_fd;
+        {
+            out_fd = socket(AF_INET, SOCK_STREAM, 0);
+
+            set_socket_non_blocking(out_fd);
+
+            if (out_fd < 0) {
+                std::error_code ec{errno, std::system_category()};
+                throw std::runtime_error("Unable to create socket for outbound 
proxy connection err = " + ec.message());
+            }
+
+            sockaddr_in addr{};
+            addr.sin_family = AF_INET;
+            addr.sin_port = htons(out_port);
+
+            inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
+
+            int res = connect(out_fd, (sockaddr*) &addr, sizeof(addr));
+
+            if (res < 0) {
+                if (errno != EINPROGRESS) {
+                    std::error_code ec{errno, std::system_category()};
+                    throw std::runtime_error("Unable to connection to server 
err = " + ec.message());
+                }
+            }
+
+            std::cout << "Proxy connected to server fd = " << out_fd << 
std::endl;
+        }
+
+        std::shared_ptr<proxy_connection> conn = 
std::make_shared<proxy_connection>(in_fd, out_fd);
+
+        connections[in_fd] = conn;
+        connections[out_fd] = conn;
+
+        add_socket_to_epoll(in_fd);
+        add_socket_to_epoll(out_fd);
+
+        std::cout << "Socket pair has been created in_fd = " << in_fd << " 
out_fd = " << out_fd << std::endl;
+    }
+}
+
+void kgb_proxy::process_socket_event(const epoll_event& ep_ev) {
+    int fd = ep_ev.data.fd;
+
+    auto it = connections.find(fd);
+    if (it == connections.end()) {
+        throw std::runtime_error("Event for unknown socket occurred fd = " + 
std::to_string(fd));
+    }
+
+    auto conn = it->second;
+
+    if (ep_ev.events & EPOLLIN) {
+
+        char buf[BUFF_SIZE];
+
+        int src = fd;
+        int dst = src == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+        while (true) {
+            ssize_t received = recv(src, buf, sizeof(buf), 0);
+
+            if (received < 0) {
+                if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                    break;
+                }
+
+                perror("recv");
+                break;
+
+            }
+
+            if (received == 0) {
+                close(src);
+                close(dst);
+
+                connections.erase(src);
+                connections.erase(dst);
+                break;
+            }
+
+            auto& queue = src == conn->in_sock ? conn->in2out_queue : 
conn->out2in_queue;
+            queue.emplace(buf, received);
+
+            enable_writable_notification(dst);
+        }
+    }
+
+    if (ep_ev.events & EPOLLOUT) {
+        int dst = fd;
+        int src = dst == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+        auto& queue = src == conn->in_sock ? conn->in2out_queue : 
conn->out2in_queue;
+
+        while (!queue.empty()) {
+
+            const message_chunk& chunk = queue.front();
+            ssize_t sent = send(dst, chunk.m_msg, chunk.m_size, 0);
+
+            if (sent <= 0) {
+                if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                    break;
+                }
+
+                perror("send");
+                break;
+            }
+
+            queue.pop();
+        }
+
+        disable_writable_notification(dst);
+    }
+}
+
+kgb_proxy::~kgb_proxy() {
+    fire_stop_event();
+
+    m_polling_thread->join();
+
+    close(stop_event_fd);
+    close(server_fd);
+    close(epoll_fd);
+}
+
+void kgb_proxy::start() {
+    epoll_fd = epoll_create1(0);

Review Comment:
   The return value of epoll_create1 should be checked for errors. If 
epoll_create1 fails, it returns -1, and subsequent operations using epoll_fd 
will fail or behave incorrectly.



##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <iostream>
+#include <queue>
+#include <tuple>
+
+#include <asio.hpp>
+#include <asio/ts/buffer.hpp>
+#include <asio/ts/internet.hpp>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct message {
+    char *m_arr;
+    size_t m_size;
+
+    message(char *arr, size_t size)
+        : m_arr(nullptr)
+        , m_size(size) {
+        m_arr = new char[m_size];
+        std::memcpy(m_arr, arr, size);
+    }
+
+    ~message() { delete[] m_arr; }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+    session(tcp::socket in_sock, tcp::socket out_sock)
+        : m_in_sock(std::move(in_sock))
+        , m_out_sock(std::move(out_sock)) { }
+
+    void start() { do_serve(); }
+
+    tcp::socket &get_out_sock() { return m_out_sock; }
+
+    void set_writable(bool writable) {
+        m_in_to_out_writable = writable;
+        m_out_to_in_writable = writable;
+    }
+
+    enum direction { forward, reverse };
+
+private:
+    void do_serve() {
+        do_read(forward);
+        do_read(reverse);
+    }
+
+    void do_read(direction direction) {
+        auto tup = get_sockets_and_queue(direction);
+        tcp::socket &src = std::get<0>(tup);
+        std::queue<message> &queue = std::get<2>(tup);
+        bool &writable = std::get<3>(tup);
+
+        src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+    [this, &queue, direction, &writable](asio::error_code ec, size_t len) {
+            if (ec) {
+                throw std::runtime_error("Error while reading from socket " + 
ec.message());
+            }
+
+            // we have one-threaded executor no synchronization is needed
+            queue.emplace(buf, len);
+
+            if (writable) { // there are pending write operation on this socket
+                do_write(direction);
+            }
+
+            do_read(direction);
+        });

Review Comment:
   The lambda captures references to queue and writable (&queue, &writable), 
which are local references in the do_read function that point to members of 
'this'. If 'this' (the session) is destroyed before the async operation 
completes, these references will be dangling. The session should be kept alive 
by capturing shared_from_this() in the lambda to ensure the object outlives the 
async operation.



##########
modules/platforms/cpp/tests/fake_server/connection_test.cpp:
##########
@@ -76,3 +79,42 @@ TEST_F(connection_test, request_timeout) {
         EXPECT_EQ(error::code::OPERATION_TIMEOUT, err.get_status_code());
     }
 }
+
+// TEST_F(connection_test, using_proxy) {
+//     fake_server fs{50900, get_logger()};
+//     proxy::kgb_proxy proxy{50800, 50900};
+//
+//     fs.start();
+//     proxy.start();
+//
+//     ignite_client_configuration cfg;
+//     cfg.set_logger(get_logger());
+//     cfg.set_endpoints(get_endpoints());
+//
+//     auto cl = ignite_client::start(cfg, 5s);
+//
+//     auto cluster_nodes = cl.get_cluster_nodes();
+//
+//     ASSERT_EQ(1, cluster_nodes.size());
+// }
+
+TEST_F(connection_test, using_asio) {
+    fake_server fs{50900, get_logger()};
+    fs.start();
+    asio::io_context io_context;
+    proxy::asio_proxy proxy{io_context, static_cast<short>(50800)};
+
+    std::thread t{[&io_context]() {
+        io_context.run();
+    }};

Review Comment:
   The thread created for running io_context is never joined before the test 
ends. Destroying a std::thread that is still joinable calls std::terminate, so 
the test process aborts when 't' goes out of scope. The thread should be joined 
(or detached with proper cleanup) before the test exits. Consider using a RAII 
wrapper or ensuring proper cleanup in a test teardown.



##########
modules/platforms/cpp/tests/fake_server/connection_test.cpp:
##########
@@ -76,3 +79,42 @@ TEST_F(connection_test, request_timeout) {
         EXPECT_EQ(error::code::OPERATION_TIMEOUT, err.get_status_code());
     }
 }
+
+// TEST_F(connection_test, using_proxy) {
+//     fake_server fs{50900, get_logger()};
+//     proxy::kgb_proxy proxy{50800, 50900};
+//
+//     fs.start();
+//     proxy.start();
+//
+//     ignite_client_configuration cfg;
+//     cfg.set_logger(get_logger());
+//     cfg.set_endpoints(get_endpoints());
+//
+//     auto cl = ignite_client::start(cfg, 5s);
+//
+//     auto cluster_nodes = cl.get_cluster_nodes();
+//
+//     ASSERT_EQ(1, cluster_nodes.size());
+// }
+
+TEST_F(connection_test, using_asio) {
+    fake_server fs{50900, get_logger()};
+    fs.start();
+    asio::io_context io_context;
+    proxy::asio_proxy proxy{io_context, static_cast<short>(50800)};
+
+    std::thread t{[&io_context]() {
+        io_context.run();
+    }};
+
+    ignite_client_configuration cfg;
+    cfg.set_logger(get_logger());
+    cfg.set_endpoints(get_endpoints());
+
+    auto cl = ignite_client::start(cfg, 500s);
+
+    auto cluster_nodes = cl.get_cluster_nodes();
+
+    ASSERT_EQ(1, cluster_nodes.size());

Review Comment:
   The io_context.run() will continue executing until all work is completed or 
the io_context is stopped. When the test ends, there's no mechanism to stop the 
io_context, which means the thread will continue running indefinitely. The test 
should call io_context.stop() and then join the thread to ensure proper 
cleanup. Consider adding a test fixture teardown or ensuring cleanup at the end 
of the test.
   ```suggestion
       ASSERT_EQ(1, cluster_nodes.size());
   
       io_context.stop();
       t.join();
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.h:
##########
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#pragma once
+#include <cassert>
+#include <cstring>
+#include <memory>
+#include <queue>
+#include <sys/epoll.h>
+#include <thread>
+#include <unordered_map>
+
+namespace ignite::proxy {
+
+struct proxy_connection;
+
+class kgb_proxy {
+public:
+    static constexpr int MAX_EVENTS = 64;
+    static constexpr int BUFF_SIZE = 4096;
+
+    kgb_proxy(int in_port, int out_port)
+        : in_port(in_port)
+        , out_port(out_port) { }
+
+    ~kgb_proxy();
+    void start();
+
+private:
+    void enable_writable_notification(int fd);
+    void disable_writable_notification(int fd);
+    void do_serve();
+    void add_event_fd();
+    void fire_stop_event();
+    void start_server_socket();
+    void add_socket_to_epoll(int fd);
+    void process_incoming_connection();
+    void process_socket_event(const epoll_event &ep_ev);
+    const int in_port;
+    const int out_port;
+    int server_fd{-1};
+    int epoll_fd{-1};
+    int stop_event_fd{-1};
+    std::unique_ptr<std::thread> m_polling_thread{};
+    bool connected{false};
+    std::unordered_map<int, std::shared_ptr<proxy_connection>> connections{};
+};
+
+struct message_chunk {
+    char* m_msg = nullptr;
+    size_t m_size;
+
+    message_chunk(char *msg, size_t size)
+        : m_size(size)
+    {
+        assert(size <= kgb_proxy::BUFF_SIZE);
+
+        m_msg = new char[size];
+
+        std::memcpy(m_msg, msg, size);
+    }
+
+    ~message_chunk() {
+        delete[] m_msg;
+    }
+};

Review Comment:
   The message_chunk struct manages raw memory via new/delete but declares 
neither a copy constructor nor a copy assignment operator. The implicitly 
generated copy operations perform shallow copies, leading to double-free 
errors. Delete them explicitly: message_chunk(const message_chunk&) = delete; 
and message_chunk& operator=(const message_chunk&) = delete;. Alternatively, 
implement proper deep-copy semantics or make the type move-only.



##########
modules/platforms/cpp/tests/fake_server/CMakeLists.txt:
##########
@@ -22,6 +22,7 @@ set(SOURCES
     fake_server.cpp
     tcp_client_channel.cpp
     connection_test.cpp
+    proxy/asio_proxy.cpp

Review Comment:
   The kgb_proxy.cpp file is not included in the SOURCES list in 
CMakeLists.txt, which means it won't be compiled or linked. This makes the 
kgb_proxy.h header and kgb_proxy.cpp implementation dead code. Either add 
proxy/kgb_proxy.cpp to the SOURCES list if it's intended to be used, or remove 
the kgb_proxy files entirely.
   ```suggestion
       proxy/asio_proxy.cpp
       proxy/kgb_proxy.cpp
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <iostream>
+#include <queue>
+#include <tuple>
+
+#include <asio.hpp>
+#include <asio/ts/buffer.hpp>
+#include <asio/ts/internet.hpp>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct message {
+    char *m_arr;
+    size_t m_size;
+
+    message(char *arr, size_t size)
+        : m_arr(nullptr)
+        , m_size(size) {
+        m_arr = new char[m_size];
+        std::memcpy(m_arr, arr, size);
+    }
+
+    ~message() { delete[] m_arr; }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+    session(tcp::socket in_sock, tcp::socket out_sock)
+        : m_in_sock(std::move(in_sock))
+        , m_out_sock(std::move(out_sock)) { }
+
+    void start() { do_serve(); }
+
+    tcp::socket &get_out_sock() { return m_out_sock; }
+
+    void set_writable(bool writable) {
+        m_in_to_out_writable = writable;
+        m_out_to_in_writable = writable;
+    }
+
+    enum direction { forward, reverse };
+
+private:
+    void do_serve() {
+        do_read(forward);
+        do_read(reverse);
+    }
+
+    void do_read(direction direction) {
+        auto tup = get_sockets_and_queue(direction);
+        tcp::socket &src = std::get<0>(tup);
+        std::queue<message> &queue = std::get<2>(tup);
+        bool &writable = std::get<3>(tup);
+
+        src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+    [this, &queue, direction, &writable](asio::error_code ec, size_t len) {
+            if (ec) {
+                throw std::runtime_error("Error while reading from socket " + 
ec.message());
+            }
+
+            // we have one-threaded executor no synchronization is needed
+            queue.emplace(buf, len);
+
+            if (writable) { // there are pending write operation on this socket
+                do_write(direction);
+            }
+
+            do_read(direction);
+        });
+    }
+
+    void do_write(direction direction) {
+        auto tup = get_sockets_and_queue(direction);
+        tcp::socket &dst = std::get<1>(tup);
+        std::queue<message> &queue = std::get<2>(tup);
+        bool &writable = std::get<3>(tup);
+
+        writable = false; // protects from writing same buffer twice (from 
head of queue).
+
+        if (!queue.empty()) {
+            message &msg = queue.front();
+
+            asio::async_write(
+                dst, asio::buffer(msg.m_arr, msg.m_size), [this, &queue, 
direction, &writable](asio::error_code ec, size_t) {
+                    if (ec) {
+                        throw std::runtime_error("Error while writing to 
socket " + ec.message());
+                    }
+
+                    queue.pop();
+
+                    if (!queue.empty()) {
+                        // makes writes on the same socket strictly ordered
+                        do_write(direction);
+                    } else {
+                        writable = true; // now read operation can initiate 
writes
+                    }
+                });
+        }
+    }
+
+    std::tuple<tcp::socket &, tcp::socket &, std::queue<message> &, bool&> 
get_sockets_and_queue(direction direction) {
+        switch (direction) {
+            case forward:
+                return {m_in_sock, m_out_sock, m_in_to_out, 
m_in_to_out_writable};
+            case reverse:
+                return {m_out_sock, m_in_sock, m_out_to_in, 
m_out_to_in_writable};
+        }
+
+        throw std::runtime_error("Should be unreachable");
+    }
+
+    tcp::socket m_in_sock;
+    tcp::socket m_out_sock;
+
+    bool m_in_to_out_writable{false};
+    bool m_out_to_in_writable{false};
+
+    std::queue<message> m_in_to_out;
+    std::queue<message> m_out_to_in;
+
+    static constexpr size_t BUFF_SIZE = 4096;
+
+    char buf[BUFF_SIZE];

Review Comment:
   The session buffer is reused across multiple async read operations without 
proper synchronization. If multiple read operations are in progress (forward 
and reverse), they may both write to the same 'buf' array simultaneously, 
causing data corruption. Each direction should have its own buffer to prevent 
race conditions.



##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <iostream>
+#include <queue>
+#include <tuple>
+
+#include <asio.hpp>
+#include <asio/ts/buffer.hpp>
+#include <asio/ts/internet.hpp>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct message {
+    char *m_arr;
+    size_t m_size;
+
+    message(char *arr, size_t size)
+        : m_arr(nullptr)
+        , m_size(size) {
+        m_arr = new char[m_size];
+        std::memcpy(m_arr, arr, size);
+    }
+

Review Comment:
   The message struct manages raw memory via new/delete but declares neither a 
copy constructor nor a copy assignment operator. The implicitly generated copy 
operations perform shallow copies, leading to double-free errors. Delete them 
explicitly: message(const message&) = delete; and message& operator=(const 
message&) = delete;. Alternatively, implement proper deep-copy semantics or 
make the type move-only.
   ```suggestion
   
       message(const message &) = delete;
       message &operator=(const message &) = delete;
   
       message(message &&other) noexcept
           : m_arr(other.m_arr)
           , m_size(other.m_size) {
           other.m_arr = nullptr;
           other.m_size = 0;
       }
   
       message &operator=(message &&other) noexcept {
           if (this != &other) {
               delete[] m_arr;
               m_arr = other.m_arr;
               m_size = other.m_size;
               other.m_arr = nullptr;
               other.m_size = 0;
           }
           return *this;
       }
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+    using network::detail::set_non_blocking_mode;
+
+    if (!set_non_blocking_mode(fd, true)) {
+        throw std::runtime_error("Error making socket non-blocking");
+    }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+    epoll_event events[MAX_EVENTS];
+
+    bool stopped = false;
+    while (!stopped) {
+        int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+        for (int i = 0; i < event_cnt; ++i) {
+            int fd = events[i].data.fd;
+
+            if (fd == stop_event_fd) {
+                uint64_t val;
+                read(stop_event_fd, &val, sizeof(val));
+                stopped = true;
+            } else if (fd == server_fd) {
+                process_incoming_connection();
+            } else {
+                process_socket_event(events[i]);
+            }
+        }
+    }
+}
+
+void kgb_proxy::add_event_fd() {
+    stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+    epoll_event ev{};
+    ev.events = EPOLLIN;
+    ev.data.fd = stop_event_fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+    uint64_t one = 1;
+    write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+    server_fd = socket(AF_INET, SOCK_STREAM, 0);
+    set_socket_non_blocking(server_fd);
+
+    int opt = 1;
+    setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+    sockaddr_in addr{};
+
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(in_port);
+    addr.sin_addr.s_addr = INADDR_ANY;
+
+    bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+    listen(server_fd, 16);
+
+    epoll_event ev{};
+
+    ev.events = EPOLLIN;
+    ev.data.fd = server_fd;
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);
+}
+
+void kgb_proxy::add_socket_to_epoll(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ev{};
+    ev.events = EPOLLIN | EPOLLET;
+    ev.data.fd = fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+void kgb_proxy::process_incoming_connection() {
+    while (true) {
+        int in_fd;
+        {
+            sockaddr_in addr{};
+            socklen_t len = sizeof(addr);
+
+            in_fd = accept(server_fd, (sockaddr*) &addr, &len);
+
+            if (in_fd < 0) {
+                if (errno != EAGAIN) {
+                    std::error_code ec{errno, std::system_category()};
+                    std::cerr << "Unexpected issue when accepting connection 
err=" << ec.message() << "\n";
+                }
+                break;
+            }
+
+            set_socket_non_blocking(in_fd);
+
+            std::cout << "Client connected to proxy fd = " << in_fd << 
std::endl;
+        }
+
+        int out_fd;
+        {
+            out_fd = socket(AF_INET, SOCK_STREAM, 0);
+
+            set_socket_non_blocking(out_fd);
+
+            if (out_fd < 0) {
+                std::error_code ec{errno, std::system_category()};
+                throw std::runtime_error("Unable to create socket for outbound 
proxy connection err = " + ec.message());
+            }
+
+            sockaddr_in addr{};
+            addr.sin_family = AF_INET;
+            addr.sin_port = htons(out_port);
+
+            inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
+
+            int res = connect(out_fd, (sockaddr*) &addr, sizeof(addr));
+
+            if (res < 0) {
+                if (errno != EINPROGRESS) {
+                    std::error_code ec{errno, std::system_category()};
+                    throw std::runtime_error("Unable to connection to server 
err = " + ec.message());
+                }
+            }
+
+            std::cout << "Proxy connected to server fd = " << out_fd << 
std::endl;
+        }
+
+        std::shared_ptr<proxy_connection> conn = 
std::make_shared<proxy_connection>(in_fd, out_fd);
+
+        connections[in_fd] = conn;
+        connections[out_fd] = conn;
+
+        add_socket_to_epoll(in_fd);
+        add_socket_to_epoll(out_fd);
+
+        std::cout << "Socket pair has been created in_fd = " << in_fd << " 
out_fd = " << out_fd << std::endl;
+    }
+}
+
+void kgb_proxy::process_socket_event(const epoll_event& ep_ev) {
+    int fd = ep_ev.data.fd;
+
+    auto it = connections.find(fd);
+    if (it == connections.end()) {
+        throw std::runtime_error("Event for unknown socket occurred fd = " + 
std::to_string(fd));
+    }
+
+    auto conn = it->second;
+
+    if (ep_ev.events & EPOLLIN) {
+
+        char buf[BUFF_SIZE];
+
+        int src = fd;
+        int dst = src == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+        while (true) {
+            ssize_t received = recv(src, buf, sizeof(buf), 0);
+
+            if (received < 0) {
+                if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                    break;
+                }
+
+                perror("recv");
+                break;
+
+            }
+
+            if (received == 0) {
+                close(src);
+                close(dst);
+
+                connections.erase(src);
+                connections.erase(dst);
+                break;
+            }
+
+            auto& queue = src == conn->in_sock ? conn->in2out_queue : 
conn->out2in_queue;
+            queue.emplace(buf, received);
+
+            enable_writable_notification(dst);
+        }
+    }
+
+    if (ep_ev.events & EPOLLOUT) {
+        int dst = fd;
+        int src = dst == conn->in_sock ? conn->out_sock : conn->in_sock;
+
+        auto& queue = src == conn->in_sock ? conn->in2out_queue : 
conn->out2in_queue;
+
+        while (!queue.empty()) {
+
+            const message_chunk& chunk = queue.front();
+            ssize_t sent = send(dst, chunk.m_msg, chunk.m_size, 0);
+
+            if (sent <= 0) {
+                if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                    break;
+                }
+
+                perror("send");
+                break;
+            }
+
+            queue.pop();
+        }
+
+        disable_writable_notification(dst);

Review Comment:
   The disable_writable_notification is called unconditionally at the end of 
EPOLLOUT handling, even when the queue is not empty and more data needs to be 
sent. This could prevent the socket from being notified when it becomes 
writable again, potentially causing data to remain stuck in the queue. The 
notification should only be disabled when the queue becomes empty.
   ```suggestion
           if (queue.empty()) {
               disable_writable_notification(dst);
           }
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+    using network::detail::set_non_blocking_mode;
+
+    if (!set_non_blocking_mode(fd, true)) {
+        throw std::runtime_error("Error making socket non-blocking");
+    }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);

Review Comment:
   The return value of epoll_ctl should be checked for errors. In the existing 
codebase (e.g., linux_async_worker_thread.cpp:77-83), epoll_ctl failures are 
checked and proper error handling is performed. Ignoring errors could lead to 
silent failures where events are not properly registered or modified.



##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <iostream>
+#include <queue>
+#include <tuple>
+
+#include <asio.hpp>
+#include <asio/ts/buffer.hpp>
+#include <asio/ts/internet.hpp>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct message {
+    char *m_arr;
+    size_t m_size;
+
+    message(char *arr, size_t size)
+        : m_arr(nullptr)
+        , m_size(size) {
+        m_arr = new char[m_size];
+        std::memcpy(m_arr, arr, size);
+    }
+
+    ~message() { delete[] m_arr; }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+    session(tcp::socket in_sock, tcp::socket out_sock)
+        : m_in_sock(std::move(in_sock))
+        , m_out_sock(std::move(out_sock)) { }
+
+    void start() { do_serve(); }
+
+    tcp::socket &get_out_sock() { return m_out_sock; }
+
+    void set_writable(bool writable) {
+        m_in_to_out_writable = writable;
+        m_out_to_in_writable = writable;
+    }
+
+    enum direction { forward, reverse };
+
+private:
+    void do_serve() {
+        do_read(forward);
+        do_read(reverse);
+    }
+
+    void do_read(direction direction) {
+        auto tup = get_sockets_and_queue(direction);
+        tcp::socket &src = std::get<0>(tup);
+        std::queue<message> &queue = std::get<2>(tup);
+        bool &writable = std::get<3>(tup);
+
+        src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+    [this, &queue, direction, &writable](asio::error_code ec, size_t len) {
+            if (ec) {
+                throw std::runtime_error("Error while reading from socket " + 
ec.message());
+            }
+
+            // we have one-threaded executor no synchronization is needed
+            queue.emplace(buf, len);
+
+            if (writable) { // there are pending write operation on this socket
+                do_write(direction);
+            }
+
+            do_read(direction);
+        });
+    }
+
+    void do_write(direction direction) {
+        auto tup = get_sockets_and_queue(direction);
+        tcp::socket &dst = std::get<1>(tup);
+        std::queue<message> &queue = std::get<2>(tup);
+        bool &writable = std::get<3>(tup);
+
+        writable = false; // protects from writing same buffer twice (from 
head of queue).
+
+        if (!queue.empty()) {
+            message &msg = queue.front();
+
+            asio::async_write(
+                dst, asio::buffer(msg.m_arr, msg.m_size), [this, &queue, 
direction, &writable](asio::error_code ec, size_t) {
+                    if (ec) {
+                        throw std::runtime_error("Error while writing to 
socket " + ec.message());
+                    }
+
+                    queue.pop();
+
+                    if (!queue.empty()) {
+                        // makes writes on the same socket strictly ordered
+                        do_write(direction);
+                    } else {
+                        writable = true; // now read operation can initiate 
writes
+                    }
+                });
+        }
+    }
+
+    std::tuple<tcp::socket &, tcp::socket &, std::queue<message> &, bool&> 
get_sockets_and_queue(direction direction) {
+        switch (direction) {
+            case forward:
+                return {m_in_sock, m_out_sock, m_in_to_out, 
m_in_to_out_writable};
+            case reverse:
+                return {m_out_sock, m_in_sock, m_out_to_in, 
m_out_to_in_writable};
+        }
+
+        throw std::runtime_error("Should be unreachable");
+    }
+
+    tcp::socket m_in_sock;
+    tcp::socket m_out_sock;
+
+    bool m_in_to_out_writable{false};
+    bool m_out_to_in_writable{false};
+
+    std::queue<message> m_in_to_out;
+    std::queue<message> m_out_to_in;
+
+    static constexpr size_t BUFF_SIZE = 4096;
+
+    char buf[BUFF_SIZE];
+};
+
+class asio_proxy {
+public:
+    asio_proxy(asio::io_context &io_context, short port)
+        : m_io_context(io_context)
+        , m_acceptor(m_io_context, tcp::endpoint(tcp::v4(), port))
+        , m_resolver(m_io_context)
+        , m_in_sock(m_io_context) {
+        do_accept();
+    }
+
+private:
+    void do_accept() {
+        m_acceptor.async_accept(m_in_sock, [this](asio::error_code ec) {
+            if (!ec) {
+                auto ses = m_sessons.emplace_back(
+                    std::make_shared<session>(std::move(m_in_sock), 
tcp::socket{m_io_context}));
+
+                m_resolver.async_resolve(
+                    "127.0.0.1", "50900", [ses](asio::error_code ec, 
tcp::resolver::results_type endpoints) {

Review Comment:
   The hardcoded port "50900" should be parameterized or made configurable. The 
asio_proxy constructor accepts a port parameter for the incoming connection but 
hardcodes the outgoing server port. This reduces flexibility and makes the 
proxy less reusable. Consider adding an out_port parameter to the constructor.



##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+    using network::detail::set_non_blocking_mode;
+
+    if (!set_non_blocking_mode(fd, true)) {
+        throw std::runtime_error("Error making socket non-blocking");
+    }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+    epoll_event events[MAX_EVENTS];
+
+    bool stopped = false;
+    while (!stopped) {
+        int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+        for (int i = 0; i < event_cnt; ++i) {
+            int fd = events[i].data.fd;
+
+            if (fd == stop_event_fd) {
+                uint64_t val;
+                read(stop_event_fd, &val, sizeof(val));
+                stopped = true;
+            } else if (fd == server_fd) {
+                process_incoming_connection();
+            } else {
+                process_socket_event(events[i]);
+            }
+        }
+    }
+}
+
+void kgb_proxy::add_event_fd() {
+    stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+    epoll_event ev{};
+    ev.events = EPOLLIN;
+    ev.data.fd = stop_event_fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);
+}
+
+void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const)
+    uint64_t one = 1;
+    write(stop_event_fd, &one, sizeof(one));
+}
+
+void kgb_proxy::start_server_socket() {
+    server_fd = socket(AF_INET, SOCK_STREAM, 0);
+    set_socket_non_blocking(server_fd);
+
+    int opt = 1;
+    setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+    sockaddr_in addr{};
+
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(in_port);
+    addr.sin_addr.s_addr = INADDR_ANY;
+
+    bind(server_fd, (sockaddr *) &addr, sizeof(addr));
+    listen(server_fd, 16);
+
+    epoll_event ev{};
+
+    ev.events = EPOLLIN;
+    ev.data.fd = server_fd;
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);
+}
+
+void kgb_proxy::add_socket_to_epoll(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ev{};
+    ev.events = EPOLLIN | EPOLLET;
+    ev.data.fd = fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);

Review Comment:
   The return value of epoll_ctl should be checked for errors. In the existing 
codebase (e.g., linux_async_worker_thread.cpp:77-83), epoll_ctl failures are 
checked and proper error handling is performed.



##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+    using network::detail::set_non_blocking_mode;
+
+    if (!set_non_blocking_mode(fd, true)) {
+        throw std::runtime_error("Error making socket non-blocking");
+    }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::disable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);
+}
+
+void kgb_proxy::do_serve() {
+    epoll_event events[MAX_EVENTS];
+
+    bool stopped = false;
+    while (!stopped) {
+        int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+
+        for (int i = 0; i < event_cnt; ++i) {
+            int fd = events[i].data.fd;
+
+            if (fd == stop_event_fd) {
+                uint64_t val;
+                read(stop_event_fd, &val, sizeof(val));
+                stopped = true;
+            } else if (fd == server_fd) {
+                process_incoming_connection();
+            } else {
+                process_socket_event(events[i]);
+            }
+        }
+    }
+}
+
+void kgb_proxy::add_event_fd() {
+    stop_event_fd = eventfd(0, EFD_NONBLOCK);
+
+    epoll_event ev{};
+    ev.events = EPOLLIN;
+    ev.data.fd = stop_event_fd;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev);

Review Comment:
   The return values of the system calls (eventfd, epoll_ctl) should be 
checked for errors before using them. Following the pattern in 
linux_async_worker_thread.cpp:65-83, these operations can fail and should be 
handled with appropriate error checking and cleanup.



##########
modules/platforms/cpp/cmake/dependencies.cmake:
##########
@@ -76,6 +99,7 @@ else()
     fetch_dependency(uni-algo 
https://github.com/uni-algo/uni-algo/archive/v1.2.0.tar.gz 
6e0cce94a6b45ebee7b904316df9f87f)
     if (${ENABLE_TESTS})
         fetch_dependency(googletest 
https://github.com/google/googletest/archive/refs/tags/v1.14.0.tar.gz 
c8340a482851ef6a3fe618a082304cfc)
+        
add_asio_dependency(https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-36-0.tar.gz
 6699ac1dea111c20d024f25e06e573db)

Review Comment:
   The add_asio_dependency function is called with two arguments (URL and MD5 
   hash) but is defined to take no parameters. CMake does not reject such a 
   call: the extra arguments are only reachable through ARGN/ARGV, so unless 
   the function reads them they are silently ignored. Either remove the 
   arguments from the function call or update the function definition to 
   accept and use these parameters.



##########
modules/platforms/cpp/tests/fake_server/connection_test.cpp:
##########
@@ -76,3 +79,42 @@ TEST_F(connection_test, request_timeout) {
         EXPECT_EQ(error::code::OPERATION_TIMEOUT, err.get_status_code());
     }
 }
+
+// TEST_F(connection_test, using_proxy) {
+//     fake_server fs{50900, get_logger()};
+//     proxy::kgb_proxy proxy{50800, 50900};
+//
+//     fs.start();
+//     proxy.start();
+//
+//     ignite_client_configuration cfg;
+//     cfg.set_logger(get_logger());
+//     cfg.set_endpoints(get_endpoints());
+//
+//     auto cl = ignite_client::start(cfg, 5s);
+//
+//     auto cluster_nodes = cl.get_cluster_nodes();
+//
+//     ASSERT_EQ(1, cluster_nodes.size());
+// }
+

Review Comment:
   The kgb_proxy class implementation is added but there is no active test for 
   it. The TEST_F(connection_test, using_proxy) is commented out. While asio_proxy 
has a working test, leaving kgb_proxy untested could allow bugs to remain 
undetected. Either uncomment and fix the test, or remove the kgb_proxy 
implementation if it's not intended to be used.
   ```suggestion
   TEST_F(connection_test, using_proxy) {
       fake_server fs{50900, get_logger()};
       proxy::kgb_proxy proxy{50800, 50900};
   
       fs.start();
       proxy.start();
   
       ignite_client_configuration cfg;
       cfg.set_logger(get_logger());
       cfg.set_endpoints(get_endpoints());
   
       auto cl = ignite_client::start(cfg, 5s);
   
       auto cluster_nodes = cl.get_cluster_nodes();
   
       ASSERT_EQ(1, cluster_nodes.size());
   }
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.h:
##########
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#pragma once
+#include <cassert>
+#include <cstring>
+#include <memory>
+#include <queue>
+#include <sys/epoll.h>

Review Comment:
   The kgb_proxy class uses Linux-specific headers (sys/epoll.h, sys/eventfd.h) 
and system calls (epoll_create1, eventfd, epoll_ctl, epoll_wait) without any 
platform abstraction or guards. This code will not compile on non-Linux 
platforms (Windows, macOS). Consider adding platform guards or using a 
cross-platform abstraction layer, similar to how the codebase has separate 
linux/ and macos/ directories for network code.
   ```suggestion
   #include <queue>
   
   #if defined(__linux__)
   #include <sys/epoll.h>
   #else
   struct epoll_event;
   #endif
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#include "kgb_proxy.h"
+
+#include <ignite/network/detail/linux/sockets.h>
+
+#include <fcntl.h>
+#include <iostream>
+#include <cstring>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+namespace ignite::proxy {
+
+void set_socket_non_blocking(int fd) {
+    using network::detail::set_non_blocking_mode;
+
+    if (!set_non_blocking_mode(fd, true)) {
+        throw std::runtime_error("Error making socket non-blocking");
+    }
+}
+
+void kgb_proxy::enable_writable_notification(int fd) { // 
NOLINT(*-make-member-function-const)
+    epoll_event ep_ev{};
+
+    ep_ev.data.fd = fd;
+    ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev);

Review Comment:
   The return value of epoll_ctl should be checked for errors. In the existing 
codebase (e.g., linux_async_worker_thread.cpp:77-83), epoll_ctl failures are 
checked and proper error handling is performed. Ignoring errors could lead to 
silent failures where events are not properly registered or modified.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to