Copilot commented on code in PR #17035:
URL: https://github.com/apache/iotdb/pull/17035#discussion_r2703125637
##########
iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py:
##########
@@ -339,6 +404,9 @@ def result_set_to_pandas(self):
result[i].append(data_array)
+ return result
+
+ def _build_dataframe(self, result):
Review Comment:
The private method `_build_dataframe` lacks a docstring. Consider adding
documentation to explain what this method does: it takes a dictionary mapping
column indices to data arrays and builds a pandas DataFrame with proper column
names and types.
```suggestion
    def _build_dataframe(self, result):
        """
        Build a pandas DataFrame from column-wise query results.

        Parameters
        ----------
        result : dict[int, list]
            Mapping from column index (0-based) to a list of column chunks,
            where each chunk is either a NumPy array or a pandas.Series
            representing a batch of values for that column.

        Returns
        -------
        pandas.DataFrame
            A DataFrame where each key in ``result`` becomes a column. Columns
            are ordered and renamed according to ``self.__column_name_list``,
            and values are concatenated across chunks with appropriate pandas
            nullable dtypes applied for Int32, Int64, and boolean columns.
        """
```
##########
iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py:
##########
@@ -243,11 +246,73 @@ def _has_next_result_set(self):
return True
return False
+ def _has_buffered_data(self) -> bool:
+ """
+ Check if there is buffered data for streaming DataFrame interface.
+ :return: True if there is buffered data, False otherwise
+ """
+ return self.__df_buffer is not None and len(self.__df_buffer) > 0
+
+ def next_dataframe(self) -> Optional[pd.DataFrame]:
+ """
+ Get the next DataFrame from the result set with exactly fetch_size rows.
+ The last DataFrame may have fewer rows.
+ :return: the next DataFrame with fetch_size rows, or None if no more data
+ """
+ # Accumulate data until we have at least fetch_size rows or no more data
+ while True:
+ buffer_len = 0 if self.__df_buffer is None else len(self.__df_buffer)
+ if buffer_len >= self.__fetch_size:
+ # We have enough rows, return a chunk
+ break
+ if not self._has_next_result_set():
+ # No more data to fetch
+ break
+ # Process and accumulate
+ result = self._process_buffer()
+ new_df = self._build_dataframe(result)
+ if self.__df_buffer is None:
+ self.__df_buffer = new_df
+ else:
+ self.__df_buffer = pd.concat(
+ [self.__df_buffer, new_df], ignore_index=True
+ )
+
+ if self.__df_buffer is None or len(self.__df_buffer) == 0:
+ return None
+
+ if len(self.__df_buffer) <= self.__fetch_size:
+ # Return all remaining rows
+ result_df = self.__df_buffer
+ self.__df_buffer = None
+ return result_df
+ else:
+ # Slice off fetch_size rows
+ result_df = self.__df_buffer.iloc[: self.__fetch_size].reset_index(
+ drop=True
+ )
+ self.__df_buffer = self.__df_buffer.iloc[self.__fetch_size :].reset_index(
+ drop=True
+ )
+ return result_df
+
def result_set_to_pandas(self):
result = {}
for i in range(len(self.__column_index_2_tsblock_column_index_list)):
result[i] = []
while self._has_next_result_set():
+ batch_result = self._process_buffer()
+ for k, v in batch_result.items():
+ result[k].extend(v)
+
+ return self._build_dataframe(result)
+
+ def _process_buffer(self):
Review Comment:
The private method `_process_buffer` lacks a docstring. Consider adding
documentation to explain what this method does: it processes all items in the
current query result buffer and returns a dictionary mapping column indices to
lists of data arrays.
```suggestion
    def _process_buffer(self):
        """
        Process all remaining serialized query results currently buffered in
        ``self.__query_result`` and organize them by logical column index.

        The method iterates over each remaining TSBlock in ``self.__query_result``,
        deserializes it into a timestamp array, value arrays, and null indicators,
        and appends these arrays into a result dictionary keyed by the logical
        column index used by this dataset.

        When timestamps are not ignored (``self.ignore_timestamp`` is falsy),
        the time column is placed at index ``0`` in the result dictionary.
        For each non-negative entry in
        ``self.__column_index_2_tsblock_column_index_list``, the corresponding
        value arrays are appended to the list under that column index.

        As a side effect, this method advances ``self.__query_result_index``
        and clears consumed entries in ``self.__query_result``.

        :return: A dictionary mapping logical column indices (int) to lists of
            NumPy arrays containing the batched column data.
        """
```
##########
iotdb-client/client-py/tests/integration/test_dataframe.py:
##########
@@ -42,6 +42,56 @@ def test_simple_query():
assert_array_equal(df.values, [[123.0, 15.0]])
+def test_stream_query():
+ with IoTDBContainer("iotdb:dev") as db:
+ db: IoTDBContainer
+ session = Session(
+ db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=1
+ )
+ session.open(False)
+ session.execute_non_query_statement("CREATE DATABASE root.device0")
+
+ # Write data
+ session.insert_str_record("root.device0", 123, "pressure", "15.0")
+ session.insert_str_record("root.device0", 124, "pressure", "15.0")
+ session.insert_str_record("root.device0", 125, "pressure", "15.0")
+
+ # Read
+ session_data_set = session.execute_query_statement("SELECT * FROM root.device0")
+ index = 0
+ while session_data_set.has_next_df():
+ df = session_data_set.next_df()
+ assert list(df.columns) == ["Time", "root.device0.pressure"]
+ assert_array_equal(df.values, [[123.0 + index, 15.0]])
+ index += 1
+ session.close()
+ assert index == 3
+
+
+def test_stream_query_with_illegal_fetch_size():
+ with IoTDBContainer("iotdb:dev") as db:
+ db: IoTDBContainer
+ session = Session(
+ db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=-1
+ )
+ session.open(False)
+ session.execute_non_query_statement("CREATE DATABASE root.device0")
+
+ # Write data
+ session.insert_str_record("root.device0", 123, "pressure", "15.0")
+ session.insert_str_record("root.device0", 124, "pressure", "15.0")
+ session.insert_str_record("root.device0", 125, "pressure", "15.0")
+
+ # Read
+ session_data_set = session.execute_query_statement("SELECT * FROM root.device0")
+
+ while session_data_set.has_next_df():
+ df = session_data_set.next_df()
+ assert list(df.columns) == ["Time", "root.device0.pressure"]
+ assert_array_equal(df.values, [[123.0, 15.0], [124.0, 15.0], [125.0, 15.0]])
+ session.close()
Review Comment:
The test expects the loop to execute exactly once (since all 3 rows should
fit in a single DataFrame with default fetch_size of 5000), but there's no
assertion to verify this behavior. Consider adding an assertion after the loop
to ensure it executed exactly once, similar to how `test_stream_query` verifies
that `index == 3`.
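One way to phrase that check, sketched below for the tail of the test (`chunk_count` is an illustrative name; the single-chunk expectation assumes the illegal fetch size falls back to the default):
```python
        # Read back and count how many DataFrames the stream yields.
        # Assuming the illegal fetch_size falls back to the default, all three
        # rows should arrive in one chunk, so the loop should run exactly once.
        session_data_set = session.execute_query_statement("SELECT * FROM root.device0")
        chunk_count = 0
        while session_data_set.has_next_df():
            df = session_data_set.next_df()
            assert list(df.columns) == ["Time", "root.device0.pressure"]
            assert_array_equal(
                df.values, [[123.0, 15.0], [124.0, 15.0], [125.0, 15.0]]
            )
            chunk_count += 1
        session.close()
        assert chunk_count == 1
```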
##########
iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py:
##########
@@ -243,11 +246,73 @@ def _has_next_result_set(self):
return True
return False
+ def _has_buffered_data(self) -> bool:
+ """
+ Check if there is buffered data for streaming DataFrame interface.
+ :return: True if there is buffered data, False otherwise
+ """
+ return self.__df_buffer is not None and len(self.__df_buffer) > 0
+
+ def next_dataframe(self) -> Optional[pd.DataFrame]:
+ """
+ Get the next DataFrame from the result set with exactly fetch_size rows.
+ The last DataFrame may have fewer rows.
+ :return: the next DataFrame with fetch_size rows, or None if no more data
+ """
+ # Accumulate data until we have at least fetch_size rows or no more data
+ while True:
+ buffer_len = 0 if self.__df_buffer is None else len(self.__df_buffer)
+ if buffer_len >= self.__fetch_size:
+ # We have enough rows, return a chunk
+ break
+ if not self._has_next_result_set():
+ # No more data to fetch
+ break
+ # Process and accumulate
+ result = self._process_buffer()
+ new_df = self._build_dataframe(result)
+ if self.__df_buffer is None:
+ self.__df_buffer = new_df
+ else:
+ self.__df_buffer = pd.concat(
+ [self.__df_buffer, new_df], ignore_index=True
+ )
+
+ if self.__df_buffer is None or len(self.__df_buffer) == 0:
+ return None
+
+ if len(self.__df_buffer) <= self.__fetch_size:
+ # Return all remaining rows
+ result_df = self.__df_buffer
+ self.__df_buffer = None
+ return result_df
+ else:
+ # Slice off fetch_size rows
+ result_df = self.__df_buffer.iloc[: self.__fetch_size].reset_index(
+ drop=True
+ )
+ self.__df_buffer = self.__df_buffer.iloc[self.__fetch_size :].reset_index(
+ drop=True
+ )
+ return result_df
Review Comment:
The `next_dataframe` method may accumulate more data than necessary in the
buffer. When the buffer has fewer than `fetch_size` rows, the method fetches an
entire batch (which could be `fetch_size` rows from the server), processes all
of it, and concatenates it to the buffer. This could result in the buffer
having significantly more than `fetch_size` rows after the first iteration.
For example, if the buffer has 1 row and `fetch_size` is 1000, after one
iteration it could have 1001 rows, and then only 1000 would be returned,
leaving 1 row in the buffer. This pattern repeats, causing unnecessary
concatenation operations. Consider checking if the buffer has enough data
before processing the entire batch, or process batches in smaller chunks.
```suggestion
        # Fast path: buffer already has enough rows to serve a full chunk
        if self.__df_buffer is not None and len(self.__df_buffer) >= self.__fetch_size:
            result_df = self.__df_buffer.iloc[: self.__fetch_size].reset_index(
                drop=True
            )
            self.__df_buffer = self.__df_buffer.iloc[self.__fetch_size :].reset_index(
                drop=True
            )
            return result_df

        # Start building the current chunk from any existing buffered rows
        current_chunk = None
        if self.__df_buffer is not None and len(self.__df_buffer) > 0:
            current_chunk = self.__df_buffer
            self.__df_buffer = None

        # Accumulate data until we reach fetch_size rows or run out of data
        while True:
            current_len = 0 if current_chunk is None else len(current_chunk)
            if current_len >= self.__fetch_size:
                break
            if not self._has_next_result_set():
                # No more data to fetch
                break
            # Fetch and process next batch
            result = self._process_buffer()
            new_df = self._build_dataframe(result)
            if new_df is None or len(new_df) == 0:
                # Nothing useful in this batch
                continue
            rows_needed = self.__fetch_size - current_len
            if len(new_df) <= rows_needed:
                # We can consume the entire batch into the current chunk
                if current_chunk is None:
                    current_chunk = new_df
                else:
                    current_chunk = pd.concat(
                        [current_chunk, new_df], ignore_index=True
                    )
            else:
                # Only consume as many rows as needed; buffer the remainder
                to_use = new_df.iloc[:rows_needed].reset_index(drop=True)
                remainder = new_df.iloc[rows_needed:].reset_index(drop=True)
                if current_chunk is None:
                    current_chunk = to_use
                else:
                    current_chunk = pd.concat(
                        [current_chunk, to_use], ignore_index=True
                    )
                # Save the unused rows for the next call
                self.__df_buffer = remainder
                break

        if current_chunk is None or len(current_chunk) == 0:
            # No data available
            return None

        # current_chunk will have at most fetch_size rows here.
        # If it's smaller, this is the last (partial) chunk.
        return current_chunk.reset_index(drop=True)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]