Copilot commented on code in PR #17035:
URL: https://github.com/apache/iotdb/pull/17035#discussion_r2703125637


##########
iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py:
##########
@@ -339,6 +404,9 @@ def result_set_to_pandas(self):
 
                 result[i].append(data_array)
 
+        return result
+
+    def _build_dataframe(self, result):

Review Comment:
   The private method `_build_dataframe` lacks a docstring. Consider adding 
documentation to explain what this method does: it takes a dictionary mapping 
column indices to data arrays and builds a pandas DataFrame with proper column 
names and types.
   ```suggestion
       def _build_dataframe(self, result):
           """
           Build a pandas DataFrame from column-wise query results.
   
           Parameters
           ----------
           result : dict[int, list]
               Mapping from column index (0-based) to a list of column chunks,
               where each chunk is either a NumPy array or a pandas.Series
               representing a batch of values for that column.
   
           Returns
           -------
           pandas.DataFrame
               A DataFrame where each key in ``result`` becomes a column. Columns
               are ordered and renamed according to ``self.__column_name_list``,
               and values are concatenated across chunks with appropriate pandas
               nullable dtypes applied for Int32, Int64, and boolean columns.
           """
   ```



##########
iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py:
##########
@@ -243,11 +246,73 @@ def _has_next_result_set(self):
             return True
         return False
 
+    def _has_buffered_data(self) -> bool:
+        """
+        Check if there is buffered data for streaming DataFrame interface.
+        :return: True if there is buffered data, False otherwise
+        """
+        return self.__df_buffer is not None and len(self.__df_buffer) > 0
+
+    def next_dataframe(self) -> Optional[pd.DataFrame]:
+        """
+        Get the next DataFrame from the result set with exactly fetch_size rows.
+        The last DataFrame may have fewer rows.
+        :return: the next DataFrame with fetch_size rows, or None if no more data
+        """
+        # Accumulate data until we have at least fetch_size rows or no more data
+        while True:
+            buffer_len = 0 if self.__df_buffer is None else len(self.__df_buffer)
+            if buffer_len >= self.__fetch_size:
+                # We have enough rows, return a chunk
+                break
+            if not self._has_next_result_set():
+                # No more data to fetch
+                break
+            # Process and accumulate
+            result = self._process_buffer()
+            new_df = self._build_dataframe(result)
+            if self.__df_buffer is None:
+                self.__df_buffer = new_df
+            else:
+                self.__df_buffer = pd.concat(
+                    [self.__df_buffer, new_df], ignore_index=True
+                )
+
+        if self.__df_buffer is None or len(self.__df_buffer) == 0:
+            return None
+
+        if len(self.__df_buffer) <= self.__fetch_size:
+            # Return all remaining rows
+            result_df = self.__df_buffer
+            self.__df_buffer = None
+            return result_df
+        else:
+            # Slice off fetch_size rows
+            result_df = self.__df_buffer.iloc[: self.__fetch_size].reset_index(
+                drop=True
+            )
+            self.__df_buffer = self.__df_buffer.iloc[self.__fetch_size :].reset_index(
+                drop=True
+            )
+            return result_df
+
     def result_set_to_pandas(self):
         result = {}
         for i in range(len(self.__column_index_2_tsblock_column_index_list)):
             result[i] = []
         while self._has_next_result_set():
+            batch_result = self._process_buffer()
+            for k, v in batch_result.items():
+                result[k].extend(v)
+
+        return self._build_dataframe(result)
+
+    def _process_buffer(self):

Review Comment:
   The private method `_process_buffer` lacks a docstring. Consider adding 
documentation to explain what this method does: it processes all items in the 
current query result buffer and returns a dictionary mapping column indices to 
lists of data arrays.
   ```suggestion
       def _process_buffer(self):
           """
           Process all remaining serialized query results currently buffered in
           ``self.__query_result`` and organize them by logical column index.
   
           The method iterates over each remaining TSBlock in ``self.__query_result``,
           deserializes it into a timestamp array, value arrays, and null indicators,
           and appends these arrays into a result dictionary keyed by the logical
           column index used by this dataset.
   
           When timestamps are not ignored (``self.ignore_timestamp`` is falsy),
           the time column is placed at index ``0`` in the result dictionary. For
           each non-negative entry in ``self.__column_index_2_tsblock_column_index_list``,
           the corresponding value arrays are appended to the list under that
           column index.
   
           As a side effect, this method advances ``self.__query_result_index`` and
           clears consumed entries in ``self.__query_result``.
   
           :return: A dictionary mapping logical column indices (int) to lists of
                    NumPy arrays containing the batched column data.
           """
   ```



##########
iotdb-client/client-py/tests/integration/test_dataframe.py:
##########
@@ -42,6 +42,56 @@ def test_simple_query():
     assert_array_equal(df.values, [[123.0, 15.0]])
 
 
+def test_stream_query():
+    with IoTDBContainer("iotdb:dev") as db:
+        db: IoTDBContainer
+        session = Session(
+            db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=1
+        )
+        session.open(False)
+        session.execute_non_query_statement("CREATE DATABASE root.device0")
+
+        # Write data
+        session.insert_str_record("root.device0", 123, "pressure", "15.0")
+        session.insert_str_record("root.device0", 124, "pressure", "15.0")
+        session.insert_str_record("root.device0", 125, "pressure", "15.0")
+
+        # Read
+        session_data_set = session.execute_query_statement("SELECT * FROM root.device0")
+        index = 0
+        while session_data_set.has_next_df():
+            df = session_data_set.next_df()
+            assert list(df.columns) == ["Time", "root.device0.pressure"]
+            assert_array_equal(df.values, [[123.0 + index, 15.0]])
+            index += 1
+        session.close()
+        assert index == 3
+
+
+def test_stream_query_with_illegal_fetch_size():
+    with IoTDBContainer("iotdb:dev") as db:
+        db: IoTDBContainer
+        session = Session(
+            db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=-1
+        )
+        session.open(False)
+        session.execute_non_query_statement("CREATE DATABASE root.device0")
+
+        # Write data
+        session.insert_str_record("root.device0", 123, "pressure", "15.0")
+        session.insert_str_record("root.device0", 124, "pressure", "15.0")
+        session.insert_str_record("root.device0", 125, "pressure", "15.0")
+
+        # Read
+        session_data_set = session.execute_query_statement("SELECT * FROM root.device0")
+
+        while session_data_set.has_next_df():
+            df = session_data_set.next_df()
+            assert list(df.columns) == ["Time", "root.device0.pressure"]
+            assert_array_equal(df.values, [[123.0, 15.0], [124.0, 15.0], [125.0, 15.0]])
+        session.close()

Review Comment:
   The test expects the loop to execute exactly once (since all 3 rows should 
fit in a single DataFrame with default fetch_size of 5000), but there's no 
assertion to verify this behavior. Consider adding an assertion after the loop 
to ensure it executed exactly once, similar to how `test_stream_query` verifies 
that `index == 3`.



##########
iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py:
##########
@@ -243,11 +246,73 @@ def _has_next_result_set(self):
             return True
         return False
 
+    def _has_buffered_data(self) -> bool:
+        """
+        Check if there is buffered data for streaming DataFrame interface.
+        :return: True if there is buffered data, False otherwise
+        """
+        return self.__df_buffer is not None and len(self.__df_buffer) > 0
+
+    def next_dataframe(self) -> Optional[pd.DataFrame]:
+        """
+        Get the next DataFrame from the result set with exactly fetch_size rows.
+        The last DataFrame may have fewer rows.
+        :return: the next DataFrame with fetch_size rows, or None if no more data
+        """
+        # Accumulate data until we have at least fetch_size rows or no more data
+        while True:
+            buffer_len = 0 if self.__df_buffer is None else len(self.__df_buffer)
+            if buffer_len >= self.__fetch_size:
+                # We have enough rows, return a chunk
+                break
+            if not self._has_next_result_set():
+                # No more data to fetch
+                break
+            # Process and accumulate
+            result = self._process_buffer()
+            new_df = self._build_dataframe(result)
+            if self.__df_buffer is None:
+                self.__df_buffer = new_df
+            else:
+                self.__df_buffer = pd.concat(
+                    [self.__df_buffer, new_df], ignore_index=True
+                )
+
+        if self.__df_buffer is None or len(self.__df_buffer) == 0:
+            return None
+
+        if len(self.__df_buffer) <= self.__fetch_size:
+            # Return all remaining rows
+            result_df = self.__df_buffer
+            self.__df_buffer = None
+            return result_df
+        else:
+            # Slice off fetch_size rows
+            result_df = self.__df_buffer.iloc[: self.__fetch_size].reset_index(
+                drop=True
+            )
+            self.__df_buffer = self.__df_buffer.iloc[self.__fetch_size :].reset_index(
+                drop=True
+            )
+            return result_df

Review Comment:
   The `next_dataframe` method may accumulate more data than necessary in the 
buffer. When the buffer has fewer than `fetch_size` rows, the method fetches an 
entire batch (which could be `fetch_size` rows from the server), processes all 
of it, and concatenates it to the buffer. This could result in the buffer 
having significantly more than `fetch_size` rows after the first iteration. 
   
   For example, if the buffer has 1 row and `fetch_size` is 1000, after one 
iteration it could have 1001 rows, and then only 1000 would be returned, 
leaving 1 row in the buffer. This pattern repeats, causing unnecessary 
concatenation operations. Consider checking if the buffer has enough data 
before processing the entire batch, or process batches in smaller chunks.
   ```suggestion
           # Fast path: buffer already has enough rows to serve a full chunk
           if self.__df_buffer is not None and len(self.__df_buffer) >= self.__fetch_size:
               result_df = self.__df_buffer.iloc[: self.__fetch_size].reset_index(
                   drop=True
               )
               self.__df_buffer = self.__df_buffer.iloc[self.__fetch_size :].reset_index(
                   drop=True
               )
               return result_df
   
           # Start building the current chunk from any existing buffered rows
           current_chunk = None
           if self.__df_buffer is not None and len(self.__df_buffer) > 0:
               current_chunk = self.__df_buffer
           self.__df_buffer = None
   
           # Accumulate data until we reach fetch_size rows or run out of data
           while True:
               current_len = 0 if current_chunk is None else len(current_chunk)
               if current_len >= self.__fetch_size:
                   break
   
               if not self._has_next_result_set():
                   # No more data to fetch
                   break
   
               # Fetch and process next batch
               result = self._process_buffer()
               new_df = self._build_dataframe(result)
   
               if new_df is None or len(new_df) == 0:
                   # Nothing useful in this batch
                   continue
   
               rows_needed = self.__fetch_size - current_len
               if len(new_df) <= rows_needed:
                   # We can consume the entire batch into the current chunk
                   if current_chunk is None:
                       current_chunk = new_df
                   else:
                       current_chunk = pd.concat(
                           [current_chunk, new_df], ignore_index=True
                       )
               else:
                   # Only consume as many rows as needed; buffer the remainder
                   to_use = new_df.iloc[:rows_needed].reset_index(drop=True)
                   remainder = new_df.iloc[rows_needed:].reset_index(drop=True)
   
                   if current_chunk is None:
                       current_chunk = to_use
                   else:
                       current_chunk = pd.concat(
                           [current_chunk, to_use], ignore_index=True
                       )
   
                   # Save the unused rows for the next call
                   self.__df_buffer = remainder
                   break
   
           if current_chunk is None or len(current_chunk) == 0:
               # No data available
               return None
   
           # current_chunk will have at most fetch_size rows here.
           # If it's smaller, this is the last (partial) chunk.
           return current_chunk.reset_index(drop=True)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to