Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-04-09 Thread via GitHub


leekeiabstraction merged PR #438:
URL: https://github.com/apache/fluss-rust/pull/438


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-04-09 Thread via GitHub


qzyu999 commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r3060850347


##
bindings/python/test/test_log_table.py:
##
@@ -729,6 +729,382 @@ async def 
test_scan_records_indexing_and_slicing(connection, admin):
 await admin.drop_table(table_path, ignore_if_not_exists=False)
 
 
+async def test_async_iterator(connection, admin):
+"""Test the Python asynchronous iterator loop (`async for`) on 
LogScanner."""
+table_path = fluss.TablePath("fluss", "py_test_async_iterator")
+await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+schema = fluss.Schema(
+pa.schema([pa.field("id", pa.int32()), pa.field("val", pa.string())])
+)
+await admin.create_table(table_path, fluss.TableDescriptor(schema))
+
+table = await connection.get_table(table_path)
+writer = table.new_append().create_writer()
+
+# Write 5 records
+writer.write_arrow_batch(
+pa.RecordBatch.from_arrays(
+[pa.array(list(range(1, 6)), type=pa.int32()),
+ pa.array([f"async{i}" for i in range(1, 6)])],
+schema=pa.schema([pa.field("id", pa.int32()), pa.field("val", 
pa.string())]),
+)
+)
+await writer.flush()
+
+scanner = await table.new_scan().create_log_scanner()
+num_buckets = (await admin.get_table_info(table_path)).num_buckets
+scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in 
range(num_buckets)})
+
+collected = []
+
+# Here is the magical Issue #424 async iterator logic at work:
+async def consume_scanner():
+async for record in scanner:
+collected.append(record)
+if len(collected) == 5:
+break
+
+# We must race the consumption against a timeout so the test doesn't hang 
if the iterator is broken
+await asyncio.wait_for(consume_scanner(), timeout=10.0)
+
+assert len(collected) == 5, f"Expected 5 records, got {len(collected)}"
+
+collected.sort(key=lambda r: r.row["id"])
+for i, record in enumerate(collected):
+assert record.row["id"] == i + 1
+assert record.row["val"] == f"async{i + 1}"
+
+await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_async_iterator_break_no_leak(connection, admin):
+"""Verify that breaking out of `async for` does not leak resources.
+
+After breaking, the scanner must still be usable for synchronous
+`poll()` calls.  If the old implementation's tokio::spawn'd task
+were still alive, it would hold the Mutex and cause `poll()` to
+deadlock or error.
+"""
+table_path = fluss.TablePath("fluss", "py_test_async_break_leak")
+await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+schema = fluss.Schema(
+pa.schema([pa.field("id", pa.int32()), pa.field("val", pa.string())])
+)
+await admin.create_table(table_path, fluss.TableDescriptor(schema))
+
+table = await connection.get_table(table_path)
+writer = table.new_append().create_writer()
+writer.write_arrow_batch(
+pa.RecordBatch.from_arrays(
+[
+pa.array(list(range(1, 11)), type=pa.int32()),
+pa.array([f"v{i}" for i in range(1, 11)]),
+],
+schema=pa.schema(
+[pa.field("id", pa.int32()), pa.field("val", pa.string())]
+),
+)
+)
+await writer.flush()
+
+scanner = await table.new_scan().create_log_scanner()
+num_buckets = (await admin.get_table_info(table_path)).num_buckets
+scanner.subscribe_buckets(
+{i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}
+)
+
+# Phase 1: async for with early break (collect only 3 of 10)
+collected_async = []
+
+async def consume_and_break():
+async for record in scanner:
+collected_async.append(record)
+if len(collected_async) >= 3:
+break
+
+await asyncio.wait_for(consume_and_break(), timeout=10.0)
+assert len(collected_async) == 3, (
+f"Expected 3 records from async for, got {len(collected_async)}"
+)
+
+# Phase 2: sync poll() must still work — proves no leaked task / lock.
+# With small data and few buckets, _async_poll may have fetched all
+# records in one batch. After break, the un-yielded records from that
+# batch are lost. So sync poll may return 0 records — the key assertion
+# is that poll() completes without deadlock (returns within timeout).
+remaining = scanner.poll(2000)
+assert remaining is not None, "poll() should return (not deadlock)"
+
+# If we got records, verify no duplicates
+async_ids = {r.row["id"] for r in collected_async}
+sync_ids = {r.row["id"] for r in remaining}
+assert async_ids.isdisjoint(sync_ids), (
+f"Duplicate IDs between async and sync: {async_ids & sync_ids}"
+)
+
+# All IDs must be from the original 1-10

Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-04-09 Thread via GitHub


leekeiabstraction commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r3060786254


##
bindings/python/test/test_log_table.py:
##
@@ -729,6 +729,382 @@ async def 
test_scan_records_indexing_and_slicing(connection, admin):
 await admin.drop_table(table_path, ignore_if_not_exists=False)
 
 
+async def test_async_iterator(connection, admin):
+"""Test the Python asynchronous iterator loop (`async for`) on 
LogScanner."""
+table_path = fluss.TablePath("fluss", "py_test_async_iterator")
+await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+schema = fluss.Schema(
+pa.schema([pa.field("id", pa.int32()), pa.field("val", pa.string())])
+)
+await admin.create_table(table_path, fluss.TableDescriptor(schema))
+
+table = await connection.get_table(table_path)
+writer = table.new_append().create_writer()
+
+# Write 5 records
+writer.write_arrow_batch(
+pa.RecordBatch.from_arrays(
+[pa.array(list(range(1, 6)), type=pa.int32()),
+ pa.array([f"async{i}" for i in range(1, 6)])],
+schema=pa.schema([pa.field("id", pa.int32()), pa.field("val", 
pa.string())]),
+)
+)
+await writer.flush()
+
+scanner = await table.new_scan().create_log_scanner()
+num_buckets = (await admin.get_table_info(table_path)).num_buckets
+scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in 
range(num_buckets)})
+
+collected = []
+
+# Here is the magical Issue #424 async iterator logic at work:
+async def consume_scanner():
+async for record in scanner:
+collected.append(record)
+if len(collected) == 5:
+break
+
+# We must race the consumption against a timeout so the test doesn't hang 
if the iterator is broken
+await asyncio.wait_for(consume_scanner(), timeout=10.0)
+
+assert len(collected) == 5, f"Expected 5 records, got {len(collected)}"
+
+collected.sort(key=lambda r: r.row["id"])
+for i, record in enumerate(collected):
+assert record.row["id"] == i + 1
+assert record.row["val"] == f"async{i + 1}"
+
+await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_async_iterator_break_no_leak(connection, admin):
+"""Verify that breaking out of `async for` does not leak resources.
+
+After breaking, the scanner must still be usable for synchronous
+`poll()` calls.  If the old implementation's tokio::spawn'd task
+were still alive, it would hold the Mutex and cause `poll()` to
+deadlock or error.
+"""
+table_path = fluss.TablePath("fluss", "py_test_async_break_leak")
+await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+schema = fluss.Schema(
+pa.schema([pa.field("id", pa.int32()), pa.field("val", pa.string())])
+)
+await admin.create_table(table_path, fluss.TableDescriptor(schema))
+
+table = await connection.get_table(table_path)
+writer = table.new_append().create_writer()
+writer.write_arrow_batch(
+pa.RecordBatch.from_arrays(
+[
+pa.array(list(range(1, 11)), type=pa.int32()),
+pa.array([f"v{i}" for i in range(1, 11)]),
+],
+schema=pa.schema(
+[pa.field("id", pa.int32()), pa.field("val", pa.string())]
+),
+)
+)
+await writer.flush()
+
+scanner = await table.new_scan().create_log_scanner()
+num_buckets = (await admin.get_table_info(table_path)).num_buckets
+scanner.subscribe_buckets(
+{i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}
+)
+
+# Phase 1: async for with early break (collect only 3 of 10)
+collected_async = []
+
+async def consume_and_break():
+async for record in scanner:
+collected_async.append(record)
+if len(collected_async) >= 3:
+break
+
+await asyncio.wait_for(consume_and_break(), timeout=10.0)
+assert len(collected_async) == 3, (
+f"Expected 3 records from async for, got {len(collected_async)}"
+)
+
+# Phase 2: sync poll() must still work — proves no leaked task / lock.
+# With small data and few buckets, _async_poll may have fetched all
+# records in one batch. After break, the un-yielded records from that
+# batch are lost. So sync poll may return 0 records — the key assertion
+# is that poll() completes without deadlock (returns within timeout).
+remaining = scanner.poll(2000)
+assert remaining is not None, "poll() should return (not deadlock)"
+
+# If we got records, verify no duplicates
+async_ids = {r.row["id"] for r in collected_async}
+sync_ids = {r.row["id"] for r in remaining}
+assert async_ids.isdisjoint(sync_ids), (
+f"Duplicate IDs between async and sync: {async_ids & sync_ids}"
+)
+
+# All IDs must be from the ori

Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-04-05 Thread via GitHub


qzyu999 commented on PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#issuecomment-4189764422

   > TY for the PR. Left a few minor comments, PTAL.
   
   Hi @leekeiabstraction, I updated the code to define a global 
`DEFAULT_POLL_INTERVAL_MS=1000` at the top of the file and reference it from 
within the function, rather than hard-coding `timeout_ms=1000` as a magic 
number inside the function. I agree that the magic number isn't ideal, and I 
believe this is a suitable fix.
   
   In an unrelated change, I noticed that the code no longer compiled when 
running `uv run maturin develop`. It seems that the following code:
   ```rust
   #[pyclass]
   pub struct LogScanner {
 ...
 admin: fcore::client::FlussAdmin,
   ```
   needs to be updated to
   ```rust
   #[pyclass]
   pub struct LogScanner {
 ...
 admin: Arc<fcore::client::FlussAdmin>,
   ```
   in order to compile. I believe some changes on the `main` branch required 
this. Looking at the git history, the breaking change appears to be 
3bd6b1b392b767c2760e95db671ef6aa90e3bafa (there were later changes to this as 
well), which changed the signature in `connection.rs` from:
   ```rust
   pub async fn get_admin(&self) -> Result<FlussAdmin> {
   ```
   to
   ```rust
   pub async fn get_admin(&self) -> Result<Arc<FlussAdmin>> {
   ```
   
   PTAL and let me know what you think about these solutions, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-04-05 Thread via GitHub


qzyu999 commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r3037506184


##
bindings/python/src/table.rs:
##
@@ -2198,6 +2197,156 @@ impl LogScanner {
 Ok(df)
 }
 
+fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult> {
+let py = slf.py();
+
+// Single lock for the generic async generator
+static ASYNC_GEN_FN: PyOnceLock> = PyOnceLock::new();
+
+let gen_fn = ASYNC_GEN_FN.get_or_init(py, || {
+let code = pyo3::ffi::c_str!(
+r#"
+async def _async_scan_generic(scanner, method_name, timeout_ms=1000):
+# Dynamically resolve the polling method (e.g., _async_poll or 
_async_poll_batches)
+poll_method = getattr(scanner, method_name)
+while True:
+items = await poll_method(timeout_ms)
+if items:
+for item in items:
+yield item
+"#
+);
+let globals = pyo3::types::PyDict::new(py);
+py.run(code, Some(&globals), None).unwrap();
+globals.get_item("_async_scan_generic").unwrap().unwrap().unbind()
+});
+
+// Determine which internal method to call based on the scanner kind
+let method_name = match slf.kind.as_ref() {
+ScannerKind::Record(_) => "_async_poll",
+ScannerKind::Batch(_) => "_async_poll_batches",
+};
+
+// Instantiate the generator with the scanner instance and the target 
method name
+gen_fn.bind(py).call1((slf.into_bound_py_any(py)?, method_name))
+}
+
+/// Perform a single bounded poll and return a list of ScanRecord objects.
+///
+/// This is the async building block used by `__aiter__` to implement
+/// `async for`. Each call does exactly one network poll (bounded by
+/// `timeout_ms`), converts any results to Python objects, and returns
+/// them as a list. An empty list signals a timeout (no data yet), not
+/// end-of-stream.
+///
+/// Args:
+/// timeout_ms: Timeout in milliseconds for the network poll (default: 
1000)
+///
+/// Returns:
+/// Awaitable that resolves to a list of ScanRecord objects
+fn _async_poll<'py>(
+&self,
+py: Python<'py>,
+timeout_ms: Option,

Review Comment:
   Hi @leekeiabstraction, in d52113357d272192d8262f1de2b25006d50fb1ac I moved 
this into a global, `DEFAULT_POLL_INTERVAL_MS`, which is initialized to 1000 at 
the top of the file. It is then referenced within the `_async_poll` and 
`_async_poll_batches` functions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-04-05 Thread via GitHub


qzyu999 commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r3037506500


##
bindings/python/src/table.rs:
##
@@ -2198,6 +2197,156 @@ impl LogScanner {
 Ok(df)
 }
 
+fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult> {
+let py = slf.py();
+
+// Single lock for the generic async generator
+static ASYNC_GEN_FN: PyOnceLock> = PyOnceLock::new();
+
+let gen_fn = ASYNC_GEN_FN.get_or_init(py, || {
+let code = pyo3::ffi::c_str!(
+r#"
+async def _async_scan_generic(scanner, method_name, timeout_ms=1000):
+# Dynamically resolve the polling method (e.g., _async_poll or 
_async_poll_batches)
+poll_method = getattr(scanner, method_name)
+while True:
+items = await poll_method(timeout_ms)
+if items:
+for item in items:
+yield item
+"#
+);
+let globals = pyo3::types::PyDict::new(py);
+py.run(code, Some(&globals), None).unwrap();
+globals.get_item("_async_scan_generic").unwrap().unwrap().unbind()
+});
+
+// Determine which internal method to call based on the scanner kind
+let method_name = match slf.kind.as_ref() {
+ScannerKind::Record(_) => "_async_poll",
+ScannerKind::Batch(_) => "_async_poll_batches",
+};
+
+// Instantiate the generator with the scanner instance and the target 
method name
+gen_fn.bind(py).call1((slf.into_bound_py_any(py)?, method_name))
+}
+
+/// Perform a single bounded poll and return a list of ScanRecord objects.
+///
+/// This is the async building block used by `__aiter__` to implement
+/// `async for`. Each call does exactly one network poll (bounded by
+/// `timeout_ms`), converts any results to Python objects, and returns
+/// them as a list. An empty list signals a timeout (no data yet), not
+/// end-of-stream.
+///
+/// Args:
+/// timeout_ms: Timeout in milliseconds for the network poll (default: 
1000)
+///
+/// Returns:
+/// Awaitable that resolves to a list of ScanRecord objects
+fn _async_poll<'py>(
+&self,
+py: Python<'py>,
+timeout_ms: Option,
+) -> PyResult> {
+let timeout_ms = timeout_ms.unwrap_or(1000);

Review Comment:
   Hi @leekeiabstraction, this is mentioned above.



##
bindings/python/src/table.rs:
##
@@ -2198,6 +2197,156 @@ impl LogScanner {
 Ok(df)
 }
 
+fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult> {
+let py = slf.py();
+
+// Single lock for the generic async generator
+static ASYNC_GEN_FN: PyOnceLock> = PyOnceLock::new();
+
+let gen_fn = ASYNC_GEN_FN.get_or_init(py, || {
+let code = pyo3::ffi::c_str!(
+r#"
+async def _async_scan_generic(scanner, method_name, timeout_ms=1000):
+# Dynamically resolve the polling method (e.g., _async_poll or 
_async_poll_batches)
+poll_method = getattr(scanner, method_name)
+while True:
+items = await poll_method(timeout_ms)
+if items:
+for item in items:
+yield item
+"#
+);
+let globals = pyo3::types::PyDict::new(py);
+py.run(code, Some(&globals), None).unwrap();
+globals.get_item("_async_scan_generic").unwrap().unwrap().unbind()
+});
+
+// Determine which internal method to call based on the scanner kind
+let method_name = match slf.kind.as_ref() {
+ScannerKind::Record(_) => "_async_poll",
+ScannerKind::Batch(_) => "_async_poll_batches",
+};
+
+// Instantiate the generator with the scanner instance and the target 
method name
+gen_fn.bind(py).call1((slf.into_bound_py_any(py)?, method_name))
+}
+
+/// Perform a single bounded poll and return a list of ScanRecord objects.
+///
+/// This is the async building block used by `__aiter__` to implement
+/// `async for`. Each call does exactly one network poll (bounded by
+/// `timeout_ms`), converts any results to Python objects, and returns
+/// them as a list. An empty list signals a timeout (no data yet), not
+/// end-of-stream.
+///
+/// Args:
+/// timeout_ms: Timeout in milliseconds for the network poll (default: 
1000)
+///
+/// Returns:
+/// Awaitable that resolves to a list of ScanRecord objects
+fn _async_poll<'py>(
+&self,
+py: Python<'py>,
+timeout_ms: Option,
+) -> PyResult> {
+let timeout_ms = timeout_ms.unwrap_or(1000);
+if timeout_ms < 0 {
+return Err(FlussError::new_err(format!(
+"timeout_ms must be non-negative, got: {timeout_ms}"
+)));
+}
+
+let scanner = Arc::clone(&self.kind);
+let projected_row_type = self.projected_row_typ

Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-04-05 Thread via GitHub


leekeiabstraction commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r3037226100


##
bindings/python/src/table.rs:
##
@@ -2198,6 +2197,156 @@ impl LogScanner {
 Ok(df)
 }
 
+fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult> {
+let py = slf.py();
+
+// Single lock for the generic async generator
+static ASYNC_GEN_FN: PyOnceLock> = PyOnceLock::new();
+
+let gen_fn = ASYNC_GEN_FN.get_or_init(py, || {
+let code = pyo3::ffi::c_str!(
+r#"
+async def _async_scan_generic(scanner, method_name, timeout_ms=1000):
+# Dynamically resolve the polling method (e.g., _async_poll or 
_async_poll_batches)
+poll_method = getattr(scanner, method_name)
+while True:
+items = await poll_method(timeout_ms)
+if items:
+for item in items:
+yield item
+"#
+);
+let globals = pyo3::types::PyDict::new(py);
+py.run(code, Some(&globals), None).unwrap();
+globals.get_item("_async_scan_generic").unwrap().unwrap().unbind()
+});
+
+// Determine which internal method to call based on the scanner kind
+let method_name = match slf.kind.as_ref() {
+ScannerKind::Record(_) => "_async_poll",
+ScannerKind::Batch(_) => "_async_poll_batches",
+};
+
+// Instantiate the generator with the scanner instance and the target 
method name
+gen_fn.bind(py).call1((slf.into_bound_py_any(py)?, method_name))
+}
+
+/// Perform a single bounded poll and return a list of ScanRecord objects.
+///
+/// This is the async building block used by `__aiter__` to implement
+/// `async for`. Each call does exactly one network poll (bounded by
+/// `timeout_ms`), converts any results to Python objects, and returns
+/// them as a list. An empty list signals a timeout (no data yet), not
+/// end-of-stream.
+///
+/// Args:
+/// timeout_ms: Timeout in milliseconds for the network poll (default: 
1000)
+///
+/// Returns:
+/// Awaitable that resolves to a list of ScanRecord objects
+fn _async_poll<'py>(
+&self,
+py: Python<'py>,
+timeout_ms: Option,

Review Comment:
   It seems like we hardcode this anyway in generated code, do we need to 
define an arg for this? 
   
   Also nit: poll_interval_ms is more accurate



##
bindings/python/src/table.rs:
##
@@ -2198,6 +2197,156 @@ impl LogScanner {
 Ok(df)
 }
 
+fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult> {
+let py = slf.py();
+
+// Single lock for the generic async generator
+static ASYNC_GEN_FN: PyOnceLock> = PyOnceLock::new();
+
+let gen_fn = ASYNC_GEN_FN.get_or_init(py, || {
+let code = pyo3::ffi::c_str!(
+r#"
+async def _async_scan_generic(scanner, method_name, timeout_ms=1000):
+# Dynamically resolve the polling method (e.g., _async_poll or 
_async_poll_batches)
+poll_method = getattr(scanner, method_name)
+while True:
+items = await poll_method(timeout_ms)
+if items:
+for item in items:
+yield item
+"#
+);
+let globals = pyo3::types::PyDict::new(py);
+py.run(code, Some(&globals), None).unwrap();
+globals.get_item("_async_scan_generic").unwrap().unwrap().unbind()
+});
+
+// Determine which internal method to call based on the scanner kind
+let method_name = match slf.kind.as_ref() {
+ScannerKind::Record(_) => "_async_poll",
+ScannerKind::Batch(_) => "_async_poll_batches",
+};
+
+// Instantiate the generator with the scanner instance and the target 
method name
+gen_fn.bind(py).call1((slf.into_bound_py_any(py)?, method_name))
+}
+
+/// Perform a single bounded poll and return a list of ScanRecord objects.
+///
+/// This is the async building block used by `__aiter__` to implement
+/// `async for`. Each call does exactly one network poll (bounded by
+/// `timeout_ms`), converts any results to Python objects, and returns
+/// them as a list. An empty list signals a timeout (no data yet), not
+/// end-of-stream.
+///
+/// Args:
+/// timeout_ms: Timeout in milliseconds for the network poll (default: 
1000)
+///
+/// Returns:
+/// Awaitable that resolves to a list of ScanRecord objects
+fn _async_poll<'py>(
+&self,
+py: Python<'py>,
+timeout_ms: Option,
+) -> PyResult> {
+let timeout_ms = timeout_ms.unwrap_or(1000);

Review Comment:
   nit: magic number



##
bindings/python/src/table.rs:
##
@@ -2198,6 +2197,156 @@ impl LogScanner {
 Ok(df)
 }
 
+fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult> {
+let py = slf.py();
+
+

Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-03-16 Thread via GitHub


qzyu999 commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r2944095501


##
bindings/python/src/table.rs:
##
@@ -2199,6 +2200,171 @@ impl LogScanner {
 Ok(df)
 }
 
+fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult> {
+let py = slf.py();
+
+match slf.kind.as_ref() {
+ScannerKind::Record(_) => {
+static RECORD_ASYNC_GEN_FN: PyOnceLock> = 
PyOnceLock::new();
+let gen_fn = RECORD_ASYNC_GEN_FN.get_or_init(py, || {
+let code = pyo3::ffi::c_str!(
+r#"
+async def _async_scan(scanner, timeout_ms=1000):
+while True:
+batch = await scanner._async_poll(timeout_ms)
+if batch:
+for record in batch:
+yield record
+"#
+);
+let globals = pyo3::types::PyDict::new(py);
+py.run(code, Some(&globals), None).unwrap();
+globals.get_item("_async_scan").unwrap().unwrap().unbind()
+});
+gen_fn.bind(py).call1((slf.into_bound_py_any(py)?,))
+}
+ScannerKind::Batch(_) => {
+static BATCH_ASYNC_GEN_FN: PyOnceLock> = 
PyOnceLock::new();
+let gen_fn = BATCH_ASYNC_GEN_FN.get_or_init(py, || {
+let code = pyo3::ffi::c_str!(
+r#"
+async def _async_batch_scan(scanner, timeout_ms=1000):

Review Comment:
   Hi @fresh-borzoni, made the changes in 
3981fff33714680629fed5503a165d9589c007fe, tested locally and passing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-03-16 Thread via GitHub


qzyu999 commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r2944002353


##
bindings/python/src/table.rs:
##
@@ -2167,13 +2165,16 @@ impl LogScanner {
 /// Returns:
 /// PyArrow Table containing all data from subscribed buckets
 fn to_arrow(&self, py: Python) -> PyResult> {
-let scanner = self.scanner.as_batch()?;
-let subscribed = scanner.get_subscribed_buckets();
-if subscribed.is_empty() {
-return Err(FlussError::new_err(
-"No buckets subscribed. Call subscribe(), subscribe_buckets(), 
subscribe_partition(), or subscribe_partition_buckets() first.",
-));
-}
+let subscribed = {
+let scanner = self.kind.as_batch()?;
+let subs = scanner.get_subscribed_buckets();
+if subs.is_empty() {
+return Err(FlussError::new_err(
+"No buckets subscribed. Call subscribe(), 
subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() 
first.",
+));
+}
+subs.clone()

Review Comment:
   Hi @fresh-borzoni, made the changes in 
6ad8cab9171da38e07df03bf0cd7197d387bb93b, tested locally and passing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-03-16 Thread via GitHub


qzyu999 commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r2943672192


##
bindings/python/src/table.rs:
##
@@ -2199,6 +2200,171 @@ impl LogScanner {
 Ok(df)
 }
 
+fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult> {

Review Comment:
   Hi @fresh-borzoni, thanks for the clarification, removed those two entries 
in the `.pyi` file in db23dd6833eef59e21e258bd80575c6d1c419a0a.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-03-16 Thread via GitHub


fresh-borzoni commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r2943512062


##
bindings/python/src/table.rs:
##
@@ -2167,13 +2165,16 @@ impl LogScanner {
 /// Returns:
 /// PyArrow Table containing all data from subscribed buckets
 fn to_arrow(&self, py: Python) -> PyResult> {
-let scanner = self.scanner.as_batch()?;
-let subscribed = scanner.get_subscribed_buckets();
-if subscribed.is_empty() {
-return Err(FlussError::new_err(
-"No buckets subscribed. Call subscribe(), subscribe_buckets(), 
subscribe_partition(), or subscribe_partition_buckets() first.",
-));
-}
+let subscribed = {
+let scanner = self.kind.as_batch()?;
+let subs = scanner.get_subscribed_buckets();
+if subs.is_empty() {
+return Err(FlussError::new_err(
+"No buckets subscribed. Call subscribe(), 
subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() 
first.",
+));
+}
+subs.clone()

Review Comment:
   nit: scoping block + subs.clone() was needed with the Mutex, not needed with 
Arc - all borrows are shared now



##
bindings/python/src/table.rs:
##
@@ -2199,6 +2200,171 @@ impl LogScanner {
 Ok(df)
 }
 
+fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult> {

Review Comment:
   I think it's better to leave `_async_poll` and `_async_poll_batches` out of 
`.pyi`, because these methods should ideally be private implementation details.
   Exposing `__aiter__` makes sense, since it signals to IDEs that we support 
`async for`. However, we don't want to encourage users to call the other 
underscore methods added here directly.



##
bindings/python/src/table.rs:
##
@@ -2199,6 +2200,171 @@ impl LogScanner {
 Ok(df)
 }
 
+fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult> {
+let py = slf.py();
+
+match slf.kind.as_ref() {
+ScannerKind::Record(_) => {
+static RECORD_ASYNC_GEN_FN: PyOnceLock> = 
PyOnceLock::new();
+let gen_fn = RECORD_ASYNC_GEN_FN.get_or_init(py, || {
+let code = pyo3::ffi::c_str!(
+r#"
+async def _async_scan(scanner, timeout_ms=1000):
+while True:
+batch = await scanner._async_poll(timeout_ms)
+if batch:
+for record in batch:
+yield record
+"#
+);
+let globals = pyo3::types::PyDict::new(py);
+py.run(code, Some(&globals), None).unwrap();
+globals.get_item("_async_scan").unwrap().unwrap().unbind()
+});
+gen_fn.bind(py).call1((slf.into_bound_py_any(py)?,))
+}
+ScannerKind::Batch(_) => {
+static BATCH_ASYNC_GEN_FN: PyOnceLock> = 
PyOnceLock::new();
+let gen_fn = BATCH_ASYNC_GEN_FN.get_or_init(py, || {
+let code = pyo3::ffi::c_str!(
+r#"
+async def _async_batch_scan(scanner, timeout_ms=1000):

Review Comment:
   The two `__aiter__` branches are identical except for the poll method name. 
You can collapse to a single `PyOnceLock` + generator that takes a callable, 
and dispatch by passing `_async_poll` or `_async_poll_batches`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-03-14 Thread via GitHub


qzyu999 commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r2936003486


##
bindings/python/test/test_log_table.py:
##
@@ -729,6 +729,923 @@ async def 
test_scan_records_indexing_and_slicing(connection, admin):
 await admin.drop_table(table_path, ignore_if_not_exists=False)
 
 
+async def test_async_iterator(connection, admin):

Review Comment:
   Hi @fresh-borzoni, I've made the changes here: 
efbcb8cd3e390558fc6242f7a7aed184b5c2257b. Please let me know if I correctly 
removed all of the requested tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-03-14 Thread via GitHub


qzyu999 commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r2935983113


##
bindings/python/src/table.rs:
##
@@ -2199,6 +2200,171 @@ impl LogScanner {
 Ok(df)
 }
 
+fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult> {

Review Comment:
   Hi @fresh-borzoni, I just added `__aiter__` to `__init__.pyi` in 
134e56b543395d7f110ffa7b36edfe6fdab4e729, along with `_async_poll` and 
`_async_poll_batches`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-03-14 Thread via GitHub


qzyu999 commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r2935947256


##
bindings/python/src/table.rs:
##
@@ -30,6 +30,7 @@ use pyo3::types::{
 PyDeltaAccess, PyDict, PyList, PySequence, PySlice, PyTime, PyTimeAccess, 
PyTuple, PyType,
 PyTzInfo,
 };
+use pyo3::{Bound, IntoPyObjectExt, Py, PyAny, PyRef, PyRefMut, PyResult, 
Python};

Review Comment:
   Hi @fresh-borzoni, you're correct: the others are already imported 
via `use crate::*;` from `lib.rs`. I removed the extra imports in 
d619b13d12d51b1f36bbe895c8ca2422683ca43e and kept only `IntoPyObjectExt`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-03-14 Thread via GitHub


fresh-borzoni commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r2935486556


##
bindings/python/test/test_log_table.py:
##
@@ -729,6 +729,923 @@ async def 
test_scan_records_indexing_and_slicing(connection, admin):
 await admin.drop_table(table_path, ignore_if_not_exists=False)
 
 
+async def test_async_iterator(connection, admin):

Review Comment:
   Can we reduce the number of tests?
   
 - Keep tests that cover the async for happy path, break safety, and 
multiple poll cycles

   
 - Drop tests for _async_poll / _async_poll_batches directly - they're
internal methods, tested implicitly through async for
 - Drop tests that re-verify existing features (projection, pandas, 
metadata) through the async path - those features are already covered by sync 
tests   



##
bindings/python/src/table.rs:
##
@@ -30,6 +30,7 @@ use pyo3::types::{
 PyDeltaAccess, PyDict, PyList, PySequence, PySlice, PyTime, PyTimeAccess, 
PyTuple, PyType,
 PyTzInfo,
 };
+use pyo3::{Bound, IntoPyObjectExt, Py, PyAny, PyRef, PyRefMut, PyResult, 
Python};

Review Comment:
   Do we really need these? 
   Seems that it's a leftover and we only need 1-2 objects, maybe only 
`IntoPyObjectExt`



##
bindings/python/src/table.rs:
##
@@ -2199,6 +2200,171 @@ impl LogScanner {
 Ok(df)
 }
 
+fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult> {

Review Comment:
   we need to add this method to .pyi stubs



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-03-12 Thread via GitHub


qzyu999 commented on PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#issuecomment-4052901617

   Hi @fresh-borzoni, I just realized that the original issue #424 also asks 
for `create_record_batch_log_scanner` to support this functionality. I've 
added code for it and was able to compile and test it locally. 
PTAL when available, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-03-11 Thread via GitHub


qzyu999 commented on PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#issuecomment-4042943163

   Hi @fresh-borzoni, thanks for the recommendations. I've taken a look and 
came up with the following changes:
   
   1. Replace `Arc>` with `Arc`
   2. Remove `ScannerState` struct and `VecDeque` buffer
   3. Remove all 6 `unsafe` pointer casts
   4. Replace `__anext__` with `_async_poll(timeout_ms)` (single bounded poll)
   5. Replace `__aiter__` with `PyOnceLock`-cached Python async generator
   6. Change batch scanner error from `StopAsyncIteration` to `TypeError`
   7. Update `with_scanner!` macro or inline to use `&self.kind` directly
   8. Add break-safety and batch-scanner-error tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-03-11 Thread via GitHub


fresh-borzoni commented on PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#issuecomment-4039011485

   @qzyu999 Ty, I took a look at the approach and have some ideas. PTAL
   
   The scanner is already thread-safe internally (&self on all methods), so the 
Mutex isn't needed, it just adds locking to every call and forces 5 unsafe 
pointer casts to work around borrow issues it created. The __anext__ loop is 
also problematic: it runs inside tokio::spawn, so breaking out of async for 
leaves it polling forever in the background.




   Simpler idea: store the scanner in an Arc, keep existing methods as-is. Add 
_async_poll(timeout_ms) that does one bounded poll and returns a list. 
__aiter__ returns a small Python async generator that calls _async_poll and 
yields records. Break stops the generator naturally, so no leaks, no unsafe, no 
mutex. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-03-10 Thread via GitHub


Copilot commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r2915483929


##
bindings/python/src/table.rs:
##
@@ -2030,7 +2041,10 @@ impl LogScanner {
 /// - Returns an empty ScanRecords if no records are available
 /// - When timeout expires, returns an empty ScanRecords (NOT an error)
 fn poll(&self, py: Python, timeout_ms: i64) -> PyResult {
-let scanner = self.scanner.as_record()?;
+let scanner_ref =
+unsafe { &*(&self.state as *const 
std::sync::Arc>) };
+let lock = TOKIO_RUNTIME.block_on(async { scanner_ref.lock().await });

Review Comment:
   Avoid the `unsafe` pointer cast when accessing `self.state`. You can lock 
the mutex directly via `self.state.lock()` (or clone the `Arc` first) without 
`unsafe`; the current cast is unnecessary and introduces unsoundness risk if 
the field type ever changes.
   ```suggestion
   let lock = TOKIO_RUNTIME.block_on(async { self.state.lock().await });
   ```



##
bindings/python/src/table.rs:
##
@@ -2199,6 +2226,90 @@ impl LogScanner {
 Ok(df)
 }
 
+fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult> {
+let py = slf.py();
+let code = pyo3::ffi::c_str!(
+r#"
+async def _adapter(obj):
+while True:
+try:
+yield await obj.__anext__()
+except StopAsyncIteration:
+break
+"#
+);
+let globals = pyo3::types::PyDict::new(py);
+py.run(code, Some(&globals), None)?;
+let adapter = globals.get_item("_adapter")?.unwrap();
+// Return adapt(self)
+adapter.call1((slf.into_bound_py_any(py)?,))
+}
+
+fn __anext__<'py>(slf: PyRefMut<'py, Self>) -> PyResult>> {
+let state_arc = slf.state.clone();
+let projected_row_type = slf.projected_row_type.clone();
+let py = slf.py();
+
+let future = future_into_py(py, async move {
+let mut state = state_arc.lock().await;
+
+// 1. If we already have buffered records, pop and return 
immediately
+if let Some(record) = state.pending_records.pop_front() {
+return Ok(record.into_any());
+}
+
+// 2. Buffer is empty, we must poll the network for the next batch
+// The underlying kind must be a Record-based scanner.
+let scanner = match state.kind.as_record() {
+Ok(s) => s,
+Err(_) => {
+return Err(pyo3::exceptions::PyStopAsyncIteration::new_err(
+"Stream Ended",

Review Comment:
   `__anext__` treats the batch-based scanner variant as end-of-stream 
(`StopAsyncIteration`). That will silently terminate `async for` on scanners 
created via `create_record_batch_log_scanner()`, and it also masks the helpful 
error message from `as_record()`. Either implement async iteration for the 
batch variant (yielding `RecordBatch`/Arrow), or raise a `TypeError` explaining 
that async iteration is only supported for record scanners.
   ```suggestion
   return Err(PyTypeError::new_err(
   "Async iteration is only supported for record 
scanners; \
use create_record_log_scanner() instead.",
   ```



##
bindings/python/test/test_log_table.py:
##
@@ -729,6 +729,55 @@ async def 
test_scan_records_indexing_and_slicing(connection, admin):
 await admin.drop_table(table_path, ignore_if_not_exists=False)
 
 
+async def test_async_iterator(connection, admin):
+"""Test the Python asynchronous iterator loop (`async for`) on 
LogScanner."""
+table_path = fluss.TablePath("fluss", "py_test_async_iterator")
+await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+schema = fluss.Schema(
+pa.schema([pa.field("id", pa.int32()), pa.field("val", pa.string())])
+)
+await admin.create_table(table_path, fluss.TableDescriptor(schema))
+
+table = await connection.get_table(table_path)
+writer = table.new_append().create_writer()
+
+# Write 5 records
+writer.write_arrow_batch(
+pa.RecordBatch.from_arrays(
+[pa.array(list(range(1, 6)), type=pa.int32()),
+ pa.array([f"async{i}" for i in range(1, 6)])],
+schema=pa.schema([pa.field("id", pa.int32()), pa.field("val", 
pa.string())]),
+)
+)
+await writer.flush()
+
+scanner = await table.new_scan().create_log_scanner()
+num_buckets = (await admin.get_table_info(table_path)).num_buckets
+scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in 
range(num_buckets)})
+
+collected = []
+
+# Here is the magical Issue #424 async iterator logic at work:
+async def consume_scanner():
+async for record in scanner:
+collected.append(record)
+if len(collected) == 5:
+break
+
+ 

Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-03-09 Thread via GitHub


qzyu999 commented on PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#issuecomment-4028213851

   > @qzyu999 Ty for the PR, but I checked this branch out and integration 
tests for python still hang even when I run them locally. PTAL
   
   Hi @fresh-borzoni, applied the fix. Ran `cargo fmt --all` locally and it 
passed. Please run the CI again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: add async 'for' loop support to LogScanner (#424) [fluss-rust]

2026-03-09 Thread via GitHub


fresh-borzoni commented on PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#issuecomment-4027986902

   @qzyu999 Ty for the PR, but I checked this branch out and integration tests 
for python still hang. PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]