Copilot commented on code in PR #7528:
URL: https://github.com/apache/ignite-3/pull/7528#discussion_r2763532821
##########
modules/platforms/python/cpp_module/node_connection.h:
##########
@@ -59,48 +88,35 @@ class node_connection final {
*
* @return Schema.
*/
- [[nodiscard]] const std::string &get_schema() const { return m_schema; }
+ [[nodiscard]] const std::string &get_schema() const { return
m_configuration.m_schema; }
/**
* Get page size.
*
* @return Page size.
*/
- [[nodiscard]] std::int32_t get_page_size() const { return m_page_size; }
+ [[nodiscard]] std::int32_t get_page_size() const { return
m_configuration.m_page_size; }
/**
* Get timeout.
*
* @return Timeout.
*/
- [[nodiscard]] std::int32_t get_timeout() const { return m_timeout; }
+ [[nodiscard]] std::int32_t get_timeout() const { return
m_configuration.m_timeout; }
/**
* Constructor.
*
- * @param addresses Addresses.
- * @param schema Schema. Can be empty.
- * @param auth_identity Auth identity. Can be empty.
- * @param auth_secret Auth secret. Can be empty.
- * @param page_size Page size.
- * @param timeout Timeout.
- * @param auto_commit Auto commit flag.
- * @param ssl_cfg SSL Configuration.
- */
- node_connection(std::vector<ignite::end_point> addresses, std::string
schema, std::string auth_identity,
- std::string auth_secret, std::int32_t page_size, std::int32_t
timeout, bool auto_commit, ssl_config ssl_cfg)
- : m_addresses(std::move(addresses))
- , m_schema(schema.empty() ? "PUBLIC" : std::move(schema))
- , m_auth_identity(std::move(auth_identity))
- , m_auth_secret(std::move(auth_secret))
- , m_page_size(page_size > 0 ? page_size : 1024)
- , m_timeout(timeout > 0 ? timeout : DEFAULT_TIMEOUT_SECONDS)
- , m_auto_commit(auto_commit)
- , m_ssl_config(std::move(ssl_cfg))
+ * @param cfg Configuration.
+ */
+ node_connection(configuration cfg)
+ : m_configuration(std::move(cfg))
+ , m_auto_commit(cfg.m_auto_commit)
Review Comment:
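The constructor initializes m_auto_commit from cfg.m_auto_commit even though
cfg is passed to std::move(cfg) on line 113, i.e. it reads a moved-from
object. This is not undefined behavior in itself (a moved-from object is still
a valid object), but the value read is only reliable if configuration's move
constructor leaves m_auto_commit intact, so it is a fragile use-after-move.
The m_auto_commit should be initialized using m_configuration.m_auto_commit
instead (this requires m_configuration to be declared before m_auto_commit,
because members are initialized in declaration order), or the order of
initialization should be changed to read cfg.m_auto_commit before the move.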
```suggestion
, m_auto_commit(m_configuration.m_auto_commit)
```
##########
modules/platforms/python/tests/test_connect.py:
##########
@@ -57,3 +59,53 @@ def test_connection_wrong_arg(address, err_msg):
with pytest.raises(pyignite_dbapi.InterfaceError) as err:
pyignite_dbapi.connect(address=address, timeout=1)
assert err.match(err_msg)
+
+
+def test_execute_update_rowcount(table_name, cursor, drop_table_cleanup):
+ cursor.execute(f'create table {table_name}(id int primary key, data
varchar)')
+ for key in range(10):
+ cursor.execute(f"insert into {table_name} values({key},
'data-{key*2}')")
+ assert cursor.rowcount == 1
+
+ cursor.execute(f"update {table_name} set data='Lorem ipsum' where id > 3")
+ assert cursor.rowcount == 6
+
+
[email protected]("interval", [2.0, 20.0, 0.0001])
+async def test_heartbeat_enabled(interval, table_name, drop_table_cleanup):
+ row_count = 10
+ with pyignite_dbapi.connect(address=server_addresses_basic[0],
heartbeat_interval=interval) as conn:
+ with conn.cursor() as cursor:
+ cursor.execute(f'create table {table_name}(id int primary key,
data varchar)')
+ for key in range(row_count):
+ cursor.execute(f"insert into {table_name} values({key},
'data-{key*2}')")
+ assert cursor.rowcount == 1
+
+ data_out = {}
+ for key in range(row_count):
+ cursor.execute(f"select id, data from {table_name} WHERE id =
?", [key])
+ data_out[key] = cursor.fetchone()
+ if len(data_out) == 5:
+ time.sleep(7)
+
+ assert len(data_out) == row_count
+
+
+async def test_heartbeat_disabled(table_name, drop_table_cleanup):
Review Comment:
The test function is declared as async but doesn't use any await statements.
This should be a regular synchronous function (remove the 'async' keyword)
since the test doesn't perform any asynchronous operations.
##########
modules/platforms/python/cpp_module/node_connection.h:
##########
@@ -59,48 +88,35 @@ class node_connection final {
*
* @return Schema.
*/
- [[nodiscard]] const std::string &get_schema() const { return m_schema; }
+ [[nodiscard]] const std::string &get_schema() const { return
m_configuration.m_schema; }
/**
* Get page size.
*
* @return Page size.
*/
- [[nodiscard]] std::int32_t get_page_size() const { return m_page_size; }
+ [[nodiscard]] std::int32_t get_page_size() const { return
m_configuration.m_page_size; }
/**
* Get timeout.
*
* @return Timeout.
*/
- [[nodiscard]] std::int32_t get_timeout() const { return m_timeout; }
+ [[nodiscard]] std::int32_t get_timeout() const { return
m_configuration.m_timeout; }
/**
* Constructor.
*
- * @param addresses Addresses.
- * @param schema Schema. Can be empty.
- * @param auth_identity Auth identity. Can be empty.
- * @param auth_secret Auth secret. Can be empty.
- * @param page_size Page size.
- * @param timeout Timeout.
- * @param auto_commit Auto commit flag.
- * @param ssl_cfg SSL Configuration.
- */
- node_connection(std::vector<ignite::end_point> addresses, std::string
schema, std::string auth_identity,
- std::string auth_secret, std::int32_t page_size, std::int32_t
timeout, bool auto_commit, ssl_config ssl_cfg)
- : m_addresses(std::move(addresses))
- , m_schema(schema.empty() ? "PUBLIC" : std::move(schema))
- , m_auth_identity(std::move(auth_identity))
- , m_auth_secret(std::move(auth_secret))
- , m_page_size(page_size > 0 ? page_size : 1024)
- , m_timeout(timeout > 0 ? timeout : DEFAULT_TIMEOUT_SECONDS)
- , m_auto_commit(auto_commit)
- , m_ssl_config(std::move(ssl_cfg))
+ * @param cfg Configuration.
+ */
+ node_connection(configuration cfg)
+ : m_configuration(std::move(cfg))
+ , m_auto_commit(cfg.m_auto_commit)
+ , m_timer_thread(ignite::detail::thread_timer::start([] (auto&&) { /*
Ignore */ }))
{
std::random_device device;
std::mt19937 generator(device());
- std::uniform_int_distribution<std::uint32_t> distribution(0,
m_addresses.size());
+ std::uniform_int_distribution<std::uint32_t> distribution(0,
m_configuration.m_addresses.size());
Review Comment:
The uniform_int_distribution is initialized with the range [0,
m_configuration.m_addresses.size()]. Both bounds of uniform_int_distribution
are inclusive, so it can generate an index equal to the size of the vector,
which would be out of bounds when used to index m_configuration.m_addresses.
The upper bound should be m_configuration.m_addresses.size() - 1 to generate
valid indices; the vector must be non-empty for that subtraction not to wrap
around.
```suggestion
assert(!m_configuration.m_addresses.empty());
auto max_index =
static_cast<std::uint32_t>(m_configuration.m_addresses.size() - 1);
std::uniform_int_distribution<std::uint32_t> distribution(0,
max_index);
```
##########
modules/platforms/python/tests/test_connect.py:
##########
@@ -57,3 +59,53 @@ def test_connection_wrong_arg(address, err_msg):
with pytest.raises(pyignite_dbapi.InterfaceError) as err:
pyignite_dbapi.connect(address=address, timeout=1)
assert err.match(err_msg)
+
+
+def test_execute_update_rowcount(table_name, cursor, drop_table_cleanup):
+ cursor.execute(f'create table {table_name}(id int primary key, data
varchar)')
+ for key in range(10):
+ cursor.execute(f"insert into {table_name} values({key},
'data-{key*2}')")
+ assert cursor.rowcount == 1
+
+ cursor.execute(f"update {table_name} set data='Lorem ipsum' where id > 3")
+ assert cursor.rowcount == 6
+
+
[email protected]("interval", [2.0, 20.0, 0.0001])
+async def test_heartbeat_enabled(interval, table_name, drop_table_cleanup):
+ row_count = 10
+ with pyignite_dbapi.connect(address=server_addresses_basic[0],
heartbeat_interval=interval) as conn:
+ with conn.cursor() as cursor:
+ cursor.execute(f'create table {table_name}(id int primary key,
data varchar)')
+ for key in range(row_count):
+ cursor.execute(f"insert into {table_name} values({key},
'data-{key*2}')")
+ assert cursor.rowcount == 1
+
+ data_out = {}
+ for key in range(row_count):
+ cursor.execute(f"select id, data from {table_name} WHERE id =
?", [key])
+ data_out[key] = cursor.fetchone()
+ if len(data_out) == 5:
+ time.sleep(7)
+
+ assert len(data_out) == row_count
+
+
+async def test_heartbeat_disabled(table_name, drop_table_cleanup):
+ row_count = 10
+ with pyignite_dbapi.connect(address=server_addresses_basic[0],
heartbeat_interval=None) as conn:
+ with conn.cursor() as cursor:
+ cursor.execute(f'create table {table_name}(id int primary key,
data varchar)')
+ for key in range(row_count):
+ cursor.execute(f"insert into {table_name} values({key},
'data-{key*2}')")
+ assert cursor.rowcount == 1
+
+ data_out = {}
+ with pytest.raises(pyignite_dbapi.OperationalError) as err:
+ for key in range(row_count):
+ cursor.execute(f"select id, data from {table_name} WHERE
id = ?", [key])
+ data_out[key] = cursor.fetchone()
+ if len(data_out) == 5:
+ time.sleep(7)
+
+ assert err.match("Connection closed by the server")
Review Comment:
The pytest.raises context manager expects an exception to be raised within
its scope, but the assertion on line 111 is placed inside the context manager,
which means it will never execute if an exception is raised. The assertion
should be moved outside the with block to verify the error message after the
exception is caught.
```suggestion
assert err.match("Connection closed by the server")
```
##########
modules/platforms/python/cpp_module/py_connection.cpp:
##########
@@ -194,21 +194,34 @@ int register_py_connection_type(PyObject* mod) {
}
PyObject *make_py_connection(std::vector<ignite::end_point> addresses, const
char* schema, const char* identity,
- const char* secret, int page_size, int timeout, bool autocommit,
ssl_config &&ssl_cfg) {
+ const char* secret, int page_size, int timeout, float heartbeat_interval,
bool autocommit, ssl_config &&ssl_cfg) {
if (addresses.empty()) {
PyErr_SetString(py_get_module_interface_error_class(), "No addresses
provided to connect");
return nullptr;
}
- auto node_connection = std::make_unique<class node_connection>(
- addresses,
- schema ? schema : "",
- identity ? identity : "",
- secret ? secret : "",
- page_size ? page_size : 1024,
- timeout,
- autocommit,
- std::move(ssl_cfg));
+ node_connection::configuration cfg{addresses, autocommit, ssl_cfg};
+
+ if (schema)
+ cfg.m_schema = schema;
+
+ if (identity)
+ cfg.m_auth_configuration.m_identity = identity;
+
+ if (secret)
+ cfg.m_auth_configuration.m_secret = secret;
+
+
+ if (page_size)
+ cfg.m_page_size = page_size;
+
+ if (timeout)
+ cfg.m_timeout = timeout;
+
+ if (heartbeat_interval)
Review Comment:
When heartbeat_interval=None is passed from Python (as in
test_heartbeat_disabled on line 96), the value that reaches this function is
presumably 0.0. (Note that PyArg_ParseTupleAndKeywords with format 'd' does not
itself convert None to 0.0 - it raises TypeError for None - so any such mapping
must happen before parsing; please verify against the caller.) The check on
line 221 'if (heartbeat_interval)' evaluates to false for 0.0, which means the
configuration will use DEFAULT_HEARTBEAT_INTERVAL (30 seconds) instead of
disabling heartbeats as intended. To properly support None to disable
heartbeats, the code needs to either use a PyObject* parameter and check for
None explicitly, or use a negative value convention (e.g., -1) to indicate
disabled heartbeats.
```suggestion
if (heartbeat_interval >= 0.0f)
```
##########
modules/platforms/python/cpp_module/node_connection.h:
##########
@@ -603,36 +631,55 @@ class node_connection final {
void on_observable_timestamp(std::int64_t timestamp) {
auto expected = m_observable_timestamp.load();
while (expected < timestamp) {
- auto success =
m_observable_timestamp.compare_exchange_weak(expected, timestamp);
- if (success)
+ if (m_observable_timestamp.compare_exchange_weak(expected,
timestamp))
return;
expected = m_observable_timestamp.load();
}
}
- /** Addresses. */
- const std::vector<ignite::end_point> m_addresses;
+ void send_heartbeat() {
+ auto res =
sync_request_nothrow(ignite::protocol::client_operation::HEARTBEAT,
+ [self_weak = weak_from_this()](const auto&) {
+ if (auto self = self_weak.lock()) {
+ self->plan_heartbeat(self->m_heartbeat_interval);
+ }
+ }
Review Comment:
The lambda passed to sync_request_nothrow is intended to write the heartbeat
request payload, but it's incorrectly trying to schedule the next heartbeat.
This scheduling happens synchronously during request construction (in
make_request on line 421), not after the heartbeat response is received. The
next heartbeat should be scheduled after sync_request_nothrow returns, not
within the writer lambda. The writer lambda should either be empty [](const
auto&) {} or write appropriate heartbeat payload data.
##########
modules/platforms/python/cpp_module/py_connection.cpp:
##########
@@ -194,21 +194,34 @@ int register_py_connection_type(PyObject* mod) {
}
PyObject *make_py_connection(std::vector<ignite::end_point> addresses, const
char* schema, const char* identity,
- const char* secret, int page_size, int timeout, bool autocommit,
ssl_config &&ssl_cfg) {
+ const char* secret, int page_size, int timeout, float heartbeat_interval,
bool autocommit, ssl_config &&ssl_cfg) {
if (addresses.empty()) {
PyErr_SetString(py_get_module_interface_error_class(), "No addresses
provided to connect");
return nullptr;
}
- auto node_connection = std::make_unique<class node_connection>(
- addresses,
- schema ? schema : "",
- identity ? identity : "",
- secret ? secret : "",
- page_size ? page_size : 1024,
- timeout,
- autocommit,
- std::move(ssl_cfg));
+ node_connection::configuration cfg{addresses, autocommit, ssl_cfg};
+
+ if (schema)
Review Comment:
The schema is now initialized to an empty string by default (line 70), which
differs from the documented behavior in __init__.py line 688 that states the
default is 'PUBLIC'. The previous implementation explicitly set schema to
'PUBLIC' if it was empty. This behavioral change should either be reverted (set
default to 'PUBLIC'), or the documentation should be updated to reflect the new
default. Additionally, this change should be tested to ensure an empty schema
works correctly with the server.
```suggestion
// Default schema should be "PUBLIC" when not explicitly provided or
empty,
// to match the documented behavior in the Python API.
cfg.m_schema = "PUBLIC";
if (schema && schema[0] != '\0')
```
##########
modules/platforms/python/tests/test_connect.py:
##########
@@ -57,3 +59,53 @@ def test_connection_wrong_arg(address, err_msg):
with pytest.raises(pyignite_dbapi.InterfaceError) as err:
pyignite_dbapi.connect(address=address, timeout=1)
assert err.match(err_msg)
+
+
+def test_execute_update_rowcount(table_name, cursor, drop_table_cleanup):
+ cursor.execute(f'create table {table_name}(id int primary key, data
varchar)')
+ for key in range(10):
+ cursor.execute(f"insert into {table_name} values({key},
'data-{key*2}')")
+ assert cursor.rowcount == 1
+
+ cursor.execute(f"update {table_name} set data='Lorem ipsum' where id > 3")
+ assert cursor.rowcount == 6
+
+
[email protected]("interval", [2.0, 20.0, 0.0001])
+async def test_heartbeat_enabled(interval, table_name, drop_table_cleanup):
Review Comment:
The test function is declared as async but doesn't use any await statements.
This should be a regular synchronous function (remove the 'async' keyword)
since the test doesn't perform any asynchronous operations.
##########
modules/platforms/python/cpp_module/py_connection.cpp:
##########
@@ -194,21 +194,34 @@ int register_py_connection_type(PyObject* mod) {
}
PyObject *make_py_connection(std::vector<ignite::end_point> addresses, const
char* schema, const char* identity,
- const char* secret, int page_size, int timeout, bool autocommit,
ssl_config &&ssl_cfg) {
+ const char* secret, int page_size, int timeout, float heartbeat_interval,
bool autocommit, ssl_config &&ssl_cfg) {
if (addresses.empty()) {
PyErr_SetString(py_get_module_interface_error_class(), "No addresses
provided to connect");
return nullptr;
}
- auto node_connection = std::make_unique<class node_connection>(
- addresses,
- schema ? schema : "",
- identity ? identity : "",
- secret ? secret : "",
- page_size ? page_size : 1024,
- timeout,
- autocommit,
- std::move(ssl_cfg));
+ node_connection::configuration cfg{addresses, autocommit, ssl_cfg};
+
+ if (schema)
+ cfg.m_schema = schema;
+
+ if (identity)
+ cfg.m_auth_configuration.m_identity = identity;
+
+ if (secret)
+ cfg.m_auth_configuration.m_secret = secret;
+
+
+ if (page_size)
+ cfg.m_page_size = page_size;
+
+ if (timeout)
+ cfg.m_timeout = timeout;
+
+ if (heartbeat_interval)
+ cfg.m_heartbeat_interval =
std::chrono::milliseconds(static_cast<int>(std::round(heartbeat_interval *
1000)));
+
+ auto node_connection = std::make_unique<class node_connection>(cfg);
Review Comment:
The node_connection class uses std::enable_shared_from_this and calls
weak_from_this() in the heartbeat methods (lines 642, 667), but the object is
created with std::make_unique and stored as a raw pointer in py_connection.
This means the shared_ptr control block is never created, and weak_from_this()
will return an empty weak_ptr, causing the heartbeat functionality to fail
silently. The object should be created with std::make_shared instead of
std::make_unique, or the heartbeat mechanism needs to be redesigned to not rely
on shared_from_this.
##########
modules/platforms/python/cpp_module/node_connection.h:
##########
@@ -652,6 +699,18 @@ class node_connection final {
/** Observable timestamp. */
std::atomic_int64_t m_observable_timestamp{0};
- /** SSL Configuration. */
- const ssl_config m_ssl_config;
+ /** Heartbeat interval configured by a user. */
+ std::chrono::milliseconds m_config_heartbeat_interval{0};
+
Review Comment:
The member variable m_config_heartbeat_interval is declared but never used
in the code. If it's intended for future use, consider removing it for now to
keep the code clean. The configured heartbeat interval is already available in
m_configuration.m_heartbeat_interval.
```suggestion
```
##########
modules/platforms/python/cpp_module/node_connection.h:
##########
@@ -540,6 +561,13 @@ class node_connection final {
if (response.error) {
throw ignite::ignite_error(ignite::error::code::HANDSHAKE_HEADER,
"Server rejected handshake with error: " + response.error->what_str());
}
+
+ m_heartbeat_interval = ignite::calculate_heartbeat_interval(
+ m_configuration.m_heartbeat_interval,
std::chrono::milliseconds(response.idle_timeout_ms));
+
+ if (m_heartbeat_interval.count()) {
+ plan_heartbeat(m_heartbeat_interval);
Review Comment:
When the connection is first established, m_last_message_ts is
default-initialized to epoch (time_point{}). When the first heartbeat timeout
fires in on_heartbeat_timeout(), it calculates idle_for as (now - epoch), which
will be a very large duration (time since epoch), causing it to immediately
send a heartbeat even though the connection was just established. Consider
initializing m_last_message_ts to the current time after the handshake
completes (around line 569) to prevent sending an unnecessary immediate
heartbeat.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]