HeartSaVioR commented on code in PR #53085:
URL: https://github.com/apache/spark/pull/53085#discussion_r2575599231


##########
python/docs/source/tutorial/sql/python_data_source.rst:
##########
@@ -487,6 +487,101 @@ We can also use the same data source in streaming reader 
and writer
 
     query = 
spark.readStream.format("fake").load().writeStream.format("fake").start("/output_path")
 
+**Admission Control for Streaming Sources**
+
+Spark supports admission control for streaming sources to limit the amount of 
data processed in each micro-batch. This helps control resource usage and 
maintain consistent processing times. Python streaming data sources support 
three types of admission control options:
+
+- **maxRecordsPerBatch**: Limit the maximum number of rows per batch
+- **maxFilesPerBatch**: Limit the maximum number of files per batch
+- **maxBytesPerBatch**: Limit the maximum bytes per batch (in bytes)
+
+These options can be specified when reading from a streaming source:
+
+.. code-block:: python
+
+    # Limit to 1000 rows per batch
+    query = spark.readStream \
+        .format("fake") \
+        .option("maxRecordsPerBatch", "1000") \
+        .load() \
+        .writeStream \
+        .format("console") \
+        .start()
+
+    # Limit to 100 files per batch
+    query = spark.readStream \
+        .format("fake") \
+        .option("maxFilesPerBatch", "100") \
+        .load() \
+        .writeStream \
+        .format("console") \
+        .start()
+
+    # Limit to 10 MB per batch
+    query = spark.readStream \
+        .format("fake") \
+        .option("maxBytesPerBatch", str(10 * 1024 * 1024)) \
+        .load() \
+        .writeStream \
+        .format("console") \
+        .start()
+
+**Note**: 
+
+- Only one admission control option should be specified at a time
+- All admission control values must be positive integers. If an invalid value 
is provided (negative, zero, or non-numeric), an ``IllegalArgumentException`` 
will be thrown
+- These options apply to both ``streamReader()`` and ``simpleStreamReader()`` 
implementations
+- This behavior is consistent with Spark's built-in streaming sources (e.g., 
``maxFilesPerTrigger``, ``maxBytesPerTrigger`` for file sources)
+
+**Implementing Admission Control in Custom Data Sources (Advanced)**
+
+For users implementing the full ``DataSourceStreamReader`` API (not 
``SimpleDataSourceStreamReader``), admission control is enabled through 
optional parameters on the ``latestOffset()`` method. As of Spark 4.2, the 
``latestOffset()`` method signature has been enhanced to accept optional 
parameters:
+
+.. code-block:: python
+
+    from pyspark.sql.datasource import DataSourceStreamReader
+    from typing import Optional, Union, Tuple
+    
+    class MyStreamReader(DataSourceStreamReader):
+        def latestOffset(
+            self, 
+            start: Optional[dict] = None, 
+            limit: Optional[dict] = None
+        ) -> Union[dict, Tuple[dict, dict]]:
+            # For backward compatibility: old implementations without 
parameters still work
+            if start is None and limit is None:
+                # Old behavior: return latest offset without admission control
+                return {"offset": self.get_latest_offset()}
+            
+            # New behavior: with admission control support
+            true_latest = self.get_latest_offset()
+            
+            # Apply admission control if configured
+            if limit and limit.get("type") == "maxRows":
+                max_rows = limit["maxRows"]
+                capped_offset = self.calculate_capped_offset(start, max_rows)
+                # Return tuple: (capped_offset, true_latest_offset)

Review Comment:
   Is it a combination of `latestOffset(startOffset, limit)` + 
`reportLatestOffset()` in SupportsAdmissionControl? I'll have a further read to 
see where the true_latest would be used, but this is a divergence with DSv2 
Scala API, and also old interface (different return type).



##########
python/docs/source/tutorial/sql/python_data_source.rst:
##########
@@ -487,6 +487,101 @@ We can also use the same data source in streaming reader 
and writer
 
     query = 
spark.readStream.format("fake").load().writeStream.format("fake").start("/output_path")
 
+**Admission Control for Streaming Sources**
+
+Spark supports admission control for streaming sources to limit the amount of 
data processed in each micro-batch. This helps control resource usage and 
maintain consistent processing times. Python streaming data sources support 
three types of admission control options:
+
+- **maxRecordsPerBatch**: Limit the maximum number of rows per batch
+- **maxFilesPerBatch**: Limit the maximum number of files per batch
+- **maxBytesPerBatch**: Limit the maximum bytes per batch (in bytes)
+
+These options can be specified when reading from a streaming source:
+
+.. code-block:: python
+
+    # Limit to 1000 rows per batch
+    query = spark.readStream \
+        .format("fake") \
+        .option("maxRecordsPerBatch", "1000") \
+        .load() \
+        .writeStream \
+        .format("console") \
+        .start()
+
+    # Limit to 100 files per batch
+    query = spark.readStream \
+        .format("fake") \
+        .option("maxFilesPerBatch", "100") \
+        .load() \
+        .writeStream \
+        .format("console") \
+        .start()
+
+    # Limit to 10 MB per batch
+    query = spark.readStream \
+        .format("fake") \
+        .option("maxBytesPerBatch", str(10 * 1024 * 1024)) \
+        .load() \
+        .writeStream \
+        .format("console") \
+        .start()
+
+**Note**: 
+
+- Only one admission control option should be specified at a time
+- All admission control values must be positive integers. If an invalid value 
is provided (negative, zero, or non-numeric), an ``IllegalArgumentException`` 
will be thrown
+- These options apply to both ``streamReader()`` and ``simpleStreamReader()`` 
implementations
+- This behavior is consistent with Spark's built-in streaming sources (e.g., 
``maxFilesPerTrigger``, ``maxBytesPerTrigger`` for file sources)
+
+**Implementing Admission Control in Custom Data Sources (Advanced)**
+
+For users implementing the full ``DataSourceStreamReader`` API (not 
``SimpleDataSourceStreamReader``), admission control is enabled through 
optional parameters on the ``latestOffset()`` method. As of Spark 4.2, the 
``latestOffset()`` method signature has been enhanced to accept optional 
parameters:
+
+.. code-block:: python
+
+    from pyspark.sql.datasource import DataSourceStreamReader
+    from typing import Optional, Union, Tuple
+    
+    class MyStreamReader(DataSourceStreamReader):
+        def latestOffset(
+            self, 
+            start: Optional[dict] = None, 
+            limit: Optional[dict] = None
+        ) -> Union[dict, Tuple[dict, dict]]:
+            # For backward compatibility: old implementations without 
parameters still work

Review Comment:
   It's a bit confusing - this is an implementation of data source with recent 
interface. We don't deal with old implementation.
   
   Do you meant to use data source with newer interface with older Spark 
version (forward compatibility)?



##########
python/docs/source/tutorial/sql/python_data_source.rst:
##########
@@ -487,6 +487,101 @@ We can also use the same data source in streaming reader 
and writer
 
     query = 
spark.readStream.format("fake").load().writeStream.format("fake").start("/output_path")
 
+**Admission Control for Streaming Sources**
+
+Spark supports admission control for streaming sources to limit the amount of 
data processed in each micro-batch. This helps control resource usage and 
maintain consistent processing times. Python streaming data sources support 
three types of admission control options:
+
+- **maxRecordsPerBatch**: Limit the maximum number of rows per batch
+- **maxFilesPerBatch**: Limit the maximum number of files per batch
+- **maxBytesPerBatch**: Limit the maximum bytes per batch (in bytes)
+
+These options can be specified when reading from a streaming source:
+
+.. code-block:: python
+
+    # Limit to 1000 rows per batch
+    query = spark.readStream \
+        .format("fake") \
+        .option("maxRecordsPerBatch", "1000") \
+        .load() \
+        .writeStream \
+        .format("console") \
+        .start()
+
+    # Limit to 100 files per batch
+    query = spark.readStream \
+        .format("fake") \
+        .option("maxFilesPerBatch", "100") \
+        .load() \
+        .writeStream \
+        .format("console") \
+        .start()
+
+    # Limit to 10 MB per batch
+    query = spark.readStream \
+        .format("fake") \
+        .option("maxBytesPerBatch", str(10 * 1024 * 1024)) \
+        .load() \
+        .writeStream \
+        .format("console") \
+        .start()
+
+**Note**: 
+
+- Only one admission control option should be specified at a time
+- All admission control values must be positive integers. If an invalid value 
is provided (negative, zero, or non-numeric), an ``IllegalArgumentException`` 
will be thrown
+- These options apply to both ``streamReader()`` and ``simpleStreamReader()`` 
implementations
+- This behavior is consistent with Spark's built-in streaming sources (e.g., 
``maxFilesPerTrigger``, ``maxBytesPerTrigger`` for file sources)
+
+**Implementing Admission Control in Custom Data Sources (Advanced)**

Review Comment:
   Should we move this before "Implement a Streaming Writer", with having some 
notion of "optional"? Maybe you would want to move some note in the above into 
implementation, since users who will implement the data source need to 
understand that limitation.
   
   To remind, the doc is basically for users who are going to "implement" their 
own data source.



##########
python/docs/source/tutorial/sql/python_data_source.rst:
##########
@@ -487,6 +487,101 @@ We can also use the same data source in streaming reader 
and writer
 
     query = 
spark.readStream.format("fake").load().writeStream.format("fake").start("/output_path")
 
+**Admission Control for Streaming Sources**
+
+Spark supports admission control for streaming sources to limit the amount of 
data processed in each micro-batch. This helps control resource usage and 
maintain consistent processing times. Python streaming data sources support 
three types of admission control options:
+
+- **maxRecordsPerBatch**: Limit the maximum number of rows per batch
+- **maxFilesPerBatch**: Limit the maximum number of files per batch
+- **maxBytesPerBatch**: Limit the maximum bytes per batch (in bytes)
+
+These options can be specified when reading from a streaming source:
+
+.. code-block:: python
+
+    # Limit to 1000 rows per batch
+    query = spark.readStream \
+        .format("fake") \
+        .option("maxRecordsPerBatch", "1000") \
+        .load() \
+        .writeStream \
+        .format("console") \
+        .start()
+
+    # Limit to 100 files per batch
+    query = spark.readStream \
+        .format("fake") \
+        .option("maxFilesPerBatch", "100") \
+        .load() \
+        .writeStream \
+        .format("console") \
+        .start()
+
+    # Limit to 10 MB per batch
+    query = spark.readStream \
+        .format("fake") \
+        .option("maxBytesPerBatch", str(10 * 1024 * 1024)) \
+        .load() \
+        .writeStream \
+        .format("console") \
+        .start()
+
+**Note**: 
+
+- Only one admission control option should be specified at a time
+- All admission control values must be positive integers. If an invalid value 
is provided (negative, zero, or non-numeric), an ``IllegalArgumentException`` 
will be thrown
+- These options apply to both ``streamReader()`` and ``simpleStreamReader()`` 
implementations
+- This behavior is consistent with Spark's built-in streaming sources (e.g., 
``maxFilesPerTrigger``, ``maxBytesPerTrigger`` for file sources)
+
+**Implementing Admission Control in Custom Data Sources (Advanced)**
+
+For users implementing the full ``DataSourceStreamReader`` API (not 
``SimpleDataSourceStreamReader``), admission control is enabled through 
optional parameters on the ``latestOffset()`` method. As of Spark 4.2, the 
``latestOffset()`` method signature has been enhanced to accept optional 
parameters:
+
+.. code-block:: python
+
+    from pyspark.sql.datasource import DataSourceStreamReader
+    from typing import Optional, Union, Tuple
+    
+    class MyStreamReader(DataSourceStreamReader):
+        def latestOffset(
+            self, 
+            start: Optional[dict] = None, 
+            limit: Optional[dict] = None
+        ) -> Union[dict, Tuple[dict, dict]]:
+            # For backward compatibility: old implementations without 
parameters still work
+            if start is None and limit is None:
+                # Old behavior: return latest offset without admission control
+                return {"offset": self.get_latest_offset()}
+            
+            # New behavior: with admission control support
+            true_latest = self.get_latest_offset()
+            
+            # Apply admission control if configured
+            if limit and limit.get("type") == "maxRows":

Review Comment:
   Maybe good to hear some opinion on this.
   
   @allisonwang-db 
   We are porting some param class hierarchy to python data source.
   
   ReadLimit
   +- ReadMaxRows
   +- ReadMaxFiles
   +- ReadMaxBytes
   +- ReadAllAvailable
   
   The interface ReadLimit does not actually mean implementations share the 
method to read the value. That's mostly a marker and caller figures out the 
actual type and handles it accordingly.
   
   Here, @jiteshsoni simply picks up dict with type names and corresponding 
param name per type. Would it be python friendly port, or we have better way of 
porting leveraging typing in Python?
   
   Thanks in advance!



##########
python/pyspark/sql/datasource.py:
##########
@@ -714,20 +714,46 @@ def initialOffset(self) -> dict:
             messageParameters={"feature": "initialOffset"},
         )
 
-    def latestOffset(self) -> dict:
+    def latestOffset(

Review Comment:
   Maybe good to clarify the case first. Let's assume user decided not to 
implement the admission control in the data source implementation. What is the 
preferred way to do it? Throwing exception if the type of limit is not 
"allAvailable"?
   
   Also based on the explanation of parameter, it doesn't seem like we are 
providing the same with Scala API.
   
   ```
     /**
      * Returns the most recent offset available given a read limit. The start 
offset can be used
      * to figure out how much new data should be read given the limit. Users 
should implement this
      * method instead of latestOffset for a MicroBatchStream or getOffset for 
Source.
      * <p>
      * When this method is called on a `Source`, the source can return `null` 
if there is no
      * data to process. In addition, for the very first micro-batch, the 
`startOffset` will be
      * null as well.
      * <p>
      * When this method is called on a MicroBatchStream, the `startOffset` 
will be `initialOffset`
      * for the very first micro-batch. The source can return `null` if there 
is no data to process.
      */
     Offset latestOffset(Offset startOffset, ReadLimit limit);
   ```
   
   Python Data Source API is technically bound to DSv2 (MicroBatchStream), and 
we will need to provide some value to `start` all the time.



##########
python/docs/source/tutorial/sql/python_data_source.rst:
##########
@@ -487,6 +487,101 @@ We can also use the same data source in streaming reader 
and writer
 
     query = 
spark.readStream.format("fake").load().writeStream.format("fake").start("/output_path")
 
+**Admission Control for Streaming Sources**
+
+Spark supports admission control for streaming sources to limit the amount of 
data processed in each micro-batch. This helps control resource usage and 
maintain consistent processing times. Python streaming data sources support 
three types of admission control options:
+
+- **maxRecordsPerBatch**: Limit the maximum number of rows per batch
+- **maxFilesPerBatch**: Limit the maximum number of files per batch
+- **maxBytesPerBatch**: Limit the maximum bytes per batch (in bytes)
+
+These options can be specified when reading from a streaming source:
+
+.. code-block:: python
+
+    # Limit to 1000 rows per batch
+    query = spark.readStream \
+        .format("fake") \
+        .option("maxRecordsPerBatch", "1000") \
+        .load() \
+        .writeStream \
+        .format("console") \
+        .start()
+
+    # Limit to 100 files per batch
+    query = spark.readStream \
+        .format("fake") \
+        .option("maxFilesPerBatch", "100") \
+        .load() \
+        .writeStream \
+        .format("console") \
+        .start()
+
+    # Limit to 10 MB per batch
+    query = spark.readStream \
+        .format("fake") \
+        .option("maxBytesPerBatch", str(10 * 1024 * 1024)) \
+        .load() \
+        .writeStream \
+        .format("console") \
+        .start()
+
+**Note**: 
+
+- Only one admission control option should be specified at a time
+- All admission control values must be positive integers. If an invalid value 
is provided (negative, zero, or non-numeric), an ``IllegalArgumentException`` 
will be thrown
+- These options apply to both ``streamReader()`` and ``simpleStreamReader()`` 
implementations
+- This behavior is consistent with Spark's built-in streaming sources (e.g., 
``maxFilesPerTrigger``, ``maxBytesPerTrigger`` for file sources)
+
+**Implementing Admission Control in Custom Data Sources (Advanced)**
+
+For users implementing the full ``DataSourceStreamReader`` API (not 
``SimpleDataSourceStreamReader``), admission control is enabled through 
optional parameters on the ``latestOffset()`` method. As of Spark 4.2, the 
``latestOffset()`` method signature has been enhanced to accept optional 
parameters:
+
+.. code-block:: python
+
+    from pyspark.sql.datasource import DataSourceStreamReader
+    from typing import Optional, Union, Tuple
+    
+    class MyStreamReader(DataSourceStreamReader):
+        def latestOffset(
+            self, 
+            start: Optional[dict] = None, 
+            limit: Optional[dict] = None
+        ) -> Union[dict, Tuple[dict, dict]]:
+            # For backward compatibility: old implementations without 
parameters still work
+            if start is None and limit is None:
+                # Old behavior: return latest offset without admission control
+                return {"offset": self.get_latest_offset()}
+            
+            # New behavior: with admission control support
+            true_latest = self.get_latest_offset()
+            
+            # Apply admission control if configured
+            if limit and limit.get("type") == "maxRows":
+                max_rows = limit["maxRows"]
+                capped_offset = self.calculate_capped_offset(start, max_rows)
+                # Return tuple: (capped_offset, true_latest_offset)
+                return (capped_offset, true_latest)
+            elif limit and limit.get("type") == "maxFiles":
+                max_files = limit["maxFiles"]
+                capped_offset = self.calculate_capped_offset_by_files(start, 
max_files)
+                return (capped_offset, true_latest)
+            else:
+                # No limit or allAvailable
+                return (true_latest, true_latest)
+
+**Key Points:**
+
+- **Backward Compatibility**: Old implementations that don't accept parameters 
continue to work without modification
+- **Optional Parameters**: Both ``start`` and ``limit`` are optional; if not 
provided, implement old behavior
+- **Return Type**: Return a single ``dict`` for old behavior, or a 
``Tuple[dict, dict]`` for new behavior with admission control
+- **Limit Structure**: The ``limit`` parameter is a dictionary with:
+  - ``{"type": "maxRows", "maxRows": N}`` for row-based limits
+  - ``{"type": "maxFiles", "maxFiles": N}`` for file-based limits  
+  - ``{"type": "maxBytes", "maxBytes": N}`` for byte-based limits
+  - ``{"type": "allAvailable"}`` for no limit
+- **SimpleDataSourceStreamReader**: Users of the simple API don't need to 
implement this; the framework handles admission control automatically

Review Comment:
   I'm not following this sentence. Do you mean users can still provide the 
above option and the implementation of SimpleDataSourceStreamReader can handle 
it without requiring the data source itself to do it? How would it be possible? 
e.g. The data source reads from Kafka and the option is max files.



##########
python/pyspark/sql/datasource.py:
##########
@@ -714,20 +714,46 @@ def initialOffset(self) -> dict:
             messageParameters={"feature": "initialOffset"},
         )
 
-    def latestOffset(self) -> dict:
+    def latestOffset(
+        self, start: Optional[dict] = None, limit: Optional[dict] = None
+    ) -> Union[dict, Tuple[dict, dict]]:
         """
         Returns the most recent offset available.
 
+        Parameters (optional - added in Spark 4.2 for admission control)
+        -------------------------------------------------------------------
+        start : dict, optional
+            The starting offset. Enables admission control when provided.
+        limit : dict, optional
+            Admission control limit with structure:
+            - {"type": "maxRows", "maxRows": N}
+            - {"type": "maxFiles", "maxFiles": N}
+            - {"type": "maxBytes", "maxBytes": N}
+            - {"type": "allAvailable"}
+
         Returns
         -------
-        dict
-            A dict or recursive dict whose key and value are primitive types, 
which includes
-            Integer, String and Boolean.
+        dict or Tuple[dict, dict]
+            - Old behavior (no params): returns single offset dict
+            - New behavior (with params): returns (capped_offset, 
true_latest_offset)
 
         Examples
         --------
+        Old implementation (backward compatible):
+
         >>> def latestOffset(self):
-        ...     return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+        ...     return {"partition-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+
+        New implementation (with admission control):
+
+        >>> def latestOffset(self, start=None, limit=None):

Review Comment:
   Now I wonder whether we should just rename the function a bit to give the 
clarity of which function to be implemented to the dev of data source. 
   e.g. latestOffsetWithAdmissionControl (It's quite long and I love it if we 
could reduce it without hurting the semantic, but it seems to be clear on its 
intention.)
   
   With the separated function, it's very clear for dev to implement "one of 
the two" functions. If they use the implementation from old Spark version and 
they don't change the implementation, they just go with latestOffset without 
parameter. Otherwise they go with latestOffsetWithAdmissionControl (name TBD) 
with "non-optional" start and limit parameters.
   
   We can even go with mix-in but interface with single function would be an 
overkill and it'd probably be better to keep both functions in the same 
interface. If we want to couple reportLatestOffset() with it, probably good to 
extract it out to have another interface, like SupportsAdmissionControl.
   
   I really wonder the implementation of python data source has to work 
properly with older Spark version, when it uses the new interface which only 
exists in new Spark version. It sounds to me an unnecessary complication.
   
   cc. @allisonwang-db Love to hear your voice here.



##########
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##########


Review Comment:
   I see you've changed unrelated lines. Did linter complain? If then I wonder 
how it went through the codebase. If it's not caught by linter and your own 
reasoning of refactor, let's revert the change unless you have a strong voice 
demanding the change.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala:
##########
@@ -49,37 +73,120 @@ class PythonMicroBatchStream(
   // from python to JVM.
   private var cachedInputPartition: Option[(String, String, 
PythonStreamingInputPartition)] = None
 
+  // Store the latest available offset for reporting
+  private var latestAvailableOffset: Option[PythonStreamingSourceOffset] = None
+
   private val runner: PythonStreamingSourceRunner =
     new PythonStreamingSourceRunner(createDataSourceFunc, outputSchema)
   runner.init()
 
   override def initialOffset(): Offset = 
PythonStreamingSourceOffset(runner.initialOffset())
 
-  override def latestOffset(): Offset = 
PythonStreamingSourceOffset(runner.latestOffset())
+  /**
+   * Returns the default read limit based on configured options. Supports: 
maxRecordsPerBatch,
+   * maxFilesPerBatch, maxBytesPerBatch. Falls back to allAvailable if no 
valid limit is
+   * configured.
+   *
+   * @since 4.2.0
+   */
+  override def getDefaultReadLimit: ReadLimit = {
+    import scala.util.Try
+
+    def parseLong(key: String): Long = {
+      Try(options.get(key).toLong).toOption.filter(_ > 0).getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value '${options.get(key)}' for option '$key', must be a 
positive integer")
+      }
+    }
+
+    def parseInt(key: String): Int = {
+      Try(options.get(key).toInt).toOption.filter(_ > 0).getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value '${options.get(key)}' for option '$key', must be a 
positive integer")
+      }
+    }
+
+    if (options.containsKey("maxRecordsPerBatch")) {
+      val records = parseLong("maxRecordsPerBatch")
+      logInfo(s"Admission control: maxRecordsPerBatch = $records")
+      ReadLimit.maxRows(records)
+    } else if (options.containsKey("maxFilesPerBatch")) {
+      val files = parseInt("maxFilesPerBatch")
+      logInfo(s"Admission control: maxFilesPerBatch = $files")
+      ReadLimit.maxFiles(files)
+    } else if (options.containsKey("maxBytesPerBatch")) {
+      val bytes = parseLong("maxBytesPerBatch")
+      logInfo(s"Admission control: maxBytesPerBatch = $bytes")
+      ReadLimit.maxBytes(bytes)
+    } else {
+      logDebug("No admission control limit configured, using allAvailable")
+      ReadLimit.allAvailable()
+    }
+  }
+
+  override def latestOffset(): Offset = {

Review Comment:
   If the data source (DSv2 Scala/Java) implements SupportsAdmissionControl, 
this method is expected to be not called. The engine checks the interface and 
calls the method accordingly. The below code actually does nothing and throwing 
an exception would be the same.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala:
##########


Review Comment:
   Same here, I see some changes are unrelated and purely some refactor/style. 
We do not encourage such a change unless it's outstandingly better than 
existing one. Many cases we split out such a change to a separate PR e.g. MINOR 
PR.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala:
##########
@@ -49,37 +73,120 @@ class PythonMicroBatchStream(
   // from python to JVM.
   private var cachedInputPartition: Option[(String, String, 
PythonStreamingInputPartition)] = None
 
+  // Store the latest available offset for reporting
+  private var latestAvailableOffset: Option[PythonStreamingSourceOffset] = None
+
   private val runner: PythonStreamingSourceRunner =
     new PythonStreamingSourceRunner(createDataSourceFunc, outputSchema)
   runner.init()
 
   override def initialOffset(): Offset = 
PythonStreamingSourceOffset(runner.initialOffset())
 
-  override def latestOffset(): Offset = 
PythonStreamingSourceOffset(runner.latestOffset())
+  /**
+   * Returns the default read limit based on configured options. Supports: 
maxRecordsPerBatch,
+   * maxFilesPerBatch, maxBytesPerBatch. Falls back to allAvailable if no 
valid limit is
+   * configured.
+   *
+   * @since 4.2.0
+   */
+  override def getDefaultReadLimit: ReadLimit = {
+    import scala.util.Try
+
+    def parseLong(key: String): Long = {
+      Try(options.get(key).toLong).toOption.filter(_ > 0).getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value '${options.get(key)}' for option '$key', must be a 
positive integer")
+      }
+    }
+
+    def parseInt(key: String): Int = {
+      Try(options.get(key).toInt).toOption.filter(_ > 0).getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value '${options.get(key)}' for option '$key', must be a 
positive integer")
+      }
+    }
+
+    if (options.containsKey("maxRecordsPerBatch")) {
+      val records = parseLong("maxRecordsPerBatch")
+      logInfo(s"Admission control: maxRecordsPerBatch = $records")
+      ReadLimit.maxRows(records)
+    } else if (options.containsKey("maxFilesPerBatch")) {
+      val files = parseInt("maxFilesPerBatch")
+      logInfo(s"Admission control: maxFilesPerBatch = $files")
+      ReadLimit.maxFiles(files)
+    } else if (options.containsKey("maxBytesPerBatch")) {
+      val bytes = parseLong("maxBytesPerBatch")
+      logInfo(s"Admission control: maxBytesPerBatch = $bytes")
+      ReadLimit.maxBytes(bytes)
+    } else {
+      logDebug("No admission control limit configured, using allAvailable")
+      ReadLimit.allAvailable()
+    }
+  }
+
+  override def latestOffset(): Offset = {
+    // Bridge to new signature with default read limit for backward 
compatibility
+    // Pass null as start offset to maintain backward compatibility with old 
behavior
+    latestOffset(null, getDefaultReadLimit)
+  }
+
+  /**
+   * Returns the latest offset with admission control limit applied. Also 
updates the true latest
+   * offset for reporting purposes.
+   *
+   * @param startOffset
+   *   the starting offset, may be null
+   * @param limit
+   *   the read limit to apply
+   * @return
+   *   the capped offset respecting the limit
+   * @since 4.2.0
+   */
+  override def latestOffset(startOffset: Offset, limit: ReadLimit): Offset = {
+    val startJson = Option(startOffset).map(_.json()).orNull
+    val (cappedOffsetJson, trueLatestJson) = 
runner.latestOffsetWithReport(startJson, limit)
+    val cappedOffset = PythonStreamingSourceOffset(cappedOffsetJson)
+    latestAvailableOffset = Some(PythonStreamingSourceOffset(trueLatestJson))
+    cappedOffset
+  }
+
+  /**

Review Comment:
   nit: Let's not add documentation unless there is noticeable difference from 
the method doc of base method.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala:
##########
@@ -28,15 +29,38 @@ import org.apache.spark.storage.{PythonStreamBlockId, 
StorageLevel}
 
 case class PythonStreamingSourceOffset(json: String) extends Offset
 
+/**
+ * Micro-batch stream implementation for Python data sources with admission 
control support.
+ *
+ * This class bridges JVM Spark streaming with Python-based data sources, 
supporting:
+ *   - Admission control via ReadLimit (maxRecordsPerBatch, maxFilesPerBatch, 
maxBytesPerBatch)
+ *   - Offset tracking and management
+ *   - Latest seen offset for prefetching optimization
+ *
+ * Admission control options:
+ *   - `maxRecordsPerBatch`: Maximum number of rows per batch (Long, must be > 
0)
+ *   - `maxFilesPerBatch`: Maximum number of files per batch (Int, must be > 0)
+ *   - `maxBytesPerBatch`: Maximum bytes per batch (Long, must be > 0)
+ *
+ * @param ds

Review Comment:
   nit: keep them in single line unless it exceeds 100 chars



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -99,8 +103,8 @@ class PythonStreamingSourceRunner(
     pythonWorker = Some(worker)
     pythonWorkerFactory = Some(workerFactory)
 
-    val stream = new BufferedOutputStream(

Review Comment:
   nit: Same here.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala:
##########
@@ -49,37 +73,120 @@ class PythonMicroBatchStream(
   // from python to JVM.
   private var cachedInputPartition: Option[(String, String, 
PythonStreamingInputPartition)] = None
 
+  // Store the latest available offset for reporting
+  private var latestAvailableOffset: Option[PythonStreamingSourceOffset] = None
+
   private val runner: PythonStreamingSourceRunner =
     new PythonStreamingSourceRunner(createDataSourceFunc, outputSchema)
   runner.init()
 
   override def initialOffset(): Offset = 
PythonStreamingSourceOffset(runner.initialOffset())
 
-  override def latestOffset(): Offset = 
PythonStreamingSourceOffset(runner.latestOffset())
+  /**
+   * Returns the default read limit based on configured options. Supports: 
maxRecordsPerBatch,
+   * maxFilesPerBatch, maxBytesPerBatch. Falls back to allAvailable if no 
valid limit is
+   * configured.
+   *
+   * @since 4.2.0
+   */
+  override def getDefaultReadLimit: ReadLimit = {
+    import scala.util.Try
+
+    def parseLong(key: String): Long = {
+      Try(options.get(key).toLong).toOption.filter(_ > 0).getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value '${options.get(key)}' for option '$key', must be a 
positive integer")
+      }
+    }
+
+    def parseInt(key: String): Int = {
+      Try(options.get(key).toInt).toOption.filter(_ > 0).getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value '${options.get(key)}' for option '$key', must be a 
positive integer")
+      }
+    }
+
+    if (options.containsKey("maxRecordsPerBatch")) {
+      val records = parseLong("maxRecordsPerBatch")
+      logInfo(s"Admission control: maxRecordsPerBatch = $records")
+      ReadLimit.maxRows(records)
+    } else if (options.containsKey("maxFilesPerBatch")) {
+      val files = parseInt("maxFilesPerBatch")
+      logInfo(s"Admission control: maxFilesPerBatch = $files")
+      ReadLimit.maxFiles(files)
+    } else if (options.containsKey("maxBytesPerBatch")) {
+      val bytes = parseLong("maxBytesPerBatch")
+      logInfo(s"Admission control: maxBytesPerBatch = $bytes")
+      ReadLimit.maxBytes(bytes)
+    } else {
+      logDebug("No admission control limit configured, using allAvailable")
+      ReadLimit.allAvailable()
+    }
+  }
+
+  override def latestOffset(): Offset = {
+    // Bridge to new signature with default read limit for backward 
compatibility
+    // Pass null as start offset to maintain backward compatibility with old 
behavior
+    latestOffset(null, getDefaultReadLimit)
+  }
+
+  /**
+   * Returns the latest offset with admission control limit applied. Also 
updates the true latest
+   * offset for reporting purposes.
+   *
+   * @param startOffset
+   *   the starting offset, may be null

Review Comment:
   nit: let's put in the same line if it doesn't exceed 100 chars



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -154,16 +145,18 @@ class PythonStreamingSourceRunner(
     if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
       val msg = PythonWorkerUtils.readUTF(dataIn)
       throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-        action = "initialOffset", msg)

Review Comment:
   same



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -188,14 +182,130 @@ class PythonStreamingSourceRunner(
       case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
         val msg = PythonWorkerUtils.readUTF(dataIn)
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", msg)

Review Comment:
   same



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -188,14 +182,130 @@ class PythonStreamingSourceRunner(
       case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
         val msg = PythonWorkerUtils.readUTF(dataIn)
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", msg)
+          action = "planPartitions",
+          msg)
       case _ =>
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", s"unknown status code 
$prefetchedRecordsStatus")

Review Comment:
   same



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -207,16 +317,18 @@ class PythonStreamingSourceRunner(
     if (status == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
       val msg = PythonWorkerUtils.readUTF(dataIn)
       throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-        action = "commitSource", msg)
+        action = "commitSource",
+        msg)
     }
   }
 
   /**
    * Stop the python worker process and invoke stop() on stream reader.
    */
   def stop(): Unit = {
-    logInfo(log"Stopping streaming runner for module: " +
-      log"${MDC(LogKeys.MODULE_NAME, workerModule)}.")
+    logInfo(

Review Comment:
   same



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -207,16 +317,18 @@ class PythonStreamingSourceRunner(
     if (status == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
       val msg = PythonWorkerUtils.readUTF(dataIn)
       throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-        action = "commitSource", msg)
+        action = "commitSource",

Review Comment:
   same



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala:
##########
@@ -28,15 +29,38 @@ import org.apache.spark.storage.{PythonStreamBlockId, 
StorageLevel}
 
 case class PythonStreamingSourceOffset(json: String) extends Offset
 
+/**
+ * Micro-batch stream implementation for Python data sources with admission 
control support.
+ *
+ * This class bridges JVM Spark streaming with Python-based data sources, 
supporting:
+ *   - Admission control via ReadLimit (maxRecordsPerBatch, maxFilesPerBatch, 
maxBytesPerBatch)
+ *   - Offset tracking and management
+ *   - Latest seen offset for prefetching optimization
+ *
+ * Admission control options:
+ *   - `maxRecordsPerBatch`: Maximum number of rows per batch (Long, must be > 
0)
+ *   - `maxFilesPerBatch`: Maximum number of files per batch (Int, must be > 0)
+ *   - `maxBytesPerBatch`: Maximum bytes per batch (Long, must be > 0)
+ *
+ * @param ds
+ *   the Python data source V2 instance
+ * @param shortName
+ *   short name of the data source
+ * @param outputSchema
+ *   the output schema
+ * @param options
+ *   configuration options including admission control settings
+ * @since 4.2.0
+ */
 class PythonMicroBatchStream(
     ds: PythonDataSourceV2,
     shortName: String,
     outputSchema: StructType,
-    options: CaseInsensitiveStringMap
-  )
-  extends MicroBatchStream
-  with Logging
-  with AcceptsLatestSeenOffset {
+    options: CaseInsensitiveStringMap)
+    extends MicroBatchStream

Review Comment:
   nit: `extends` and `with` are indented with 2 spaces (different indentation 
from parameter). 
   
   Please refer to the below for more details:
   https://github.com/databricks/scala-style-guide



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -230,30 +342,36 @@ class PythonStreamingSourceRunner(
     }
   }
 
-  private val allocator = ArrowUtils.rootAllocator.newChildAllocator(
-    s"stream reader for $pythonExec", 0, Long.MaxValue)
+  private val allocator =

Review Comment:
   same



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -230,30 +342,36 @@ class PythonStreamingSourceRunner(
     }
   }
 
-  private val allocator = ArrowUtils.rootAllocator.newChildAllocator(
-    s"stream reader for $pythonExec", 0, Long.MaxValue)
+  private val allocator =
+    ArrowUtils.rootAllocator.newChildAllocator(s"stream reader for 
$pythonExec", 0, Long.MaxValue)
 
   def readArrowRecordBatches(): Iterator[InternalRow] = {
     val status = dataIn.readInt()
     status match {
       case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
         val msg = PythonWorkerUtils.readUTF(dataIn)
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-        action = "prefetchArrowBatches", msg)
+          action = "prefetchArrowBatches",

Review Comment:
   same



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -230,30 +342,36 @@ class PythonStreamingSourceRunner(
     }
   }
 
-  private val allocator = ArrowUtils.rootAllocator.newChildAllocator(
-    s"stream reader for $pythonExec", 0, Long.MaxValue)
+  private val allocator =
+    ArrowUtils.rootAllocator.newChildAllocator(s"stream reader for 
$pythonExec", 0, Long.MaxValue)
 
   def readArrowRecordBatches(): Iterator[InternalRow] = {
     val status = dataIn.readInt()
     status match {
       case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
         val msg = PythonWorkerUtils.readUTF(dataIn)
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-        action = "prefetchArrowBatches", msg)
+          action = "prefetchArrowBatches",
+          msg)
       case SpecialLengths.START_ARROW_STREAM =>
       case _ =>
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "prefetchArrowBatches", s"unknown status code $status")
+          action = "prefetchArrowBatches",

Review Comment:
   same



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -188,14 +182,130 @@ class PythonStreamingSourceRunner(
       case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
         val msg = PythonWorkerUtils.readUTF(dataIn)
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", msg)
+          action = "planPartitions",
+          msg)
       case _ =>
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", s"unknown status code 
$prefetchedRecordsStatus")
+          action = "planPartitions",
+          s"unknown status code $prefetchedRecordsStatus")
     }
     (pickledPartitions.toArray, iter)
   }
 
+  /**
+   * Invokes latestOffset(startOffset, limit) function with admission control 
parameters.
+   */
+  def latestOffset(startOffset: String, limit: ReadLimit): String = {
+    dataOut.writeInt(LATEST_OFFSET_WITH_LIMIT_FUNC_ID)
+    PythonWorkerUtils.writeUTF(startOffset, dataOut)
+    PythonWorkerUtils.writeUTF(serializeReadLimit(limit), dataOut)
+    dataOut.flush()
+    val len = dataIn.readInt()
+    if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",
+        msg)
+    }
+    PythonWorkerUtils.readUTF(len, dataIn)
+  }
+
+  /**
+   * Invokes reportLatestOffset() function to get the true latest available 
offset.
+   */
+  def reportLatestOffset(): String = {
+    dataOut.writeInt(REPORT_LATEST_OFFSET_FUNC_ID)
+    dataOut.flush()
+    val len = dataIn.readInt()
+    if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "reportLatestOffset",
+        msg)
+    }
+    PythonWorkerUtils.readUTF(len, dataIn)
+  }
+
+  /**
+   * Invokes latestOffset with admission control and also fetches the true 
latest offset. This
+   * avoids race conditions by getting both offsets in a single RPC call.
+   *
+   * @param startOffset
+   *   the starting offset (may be null)
+   * @param limit
+   *   the read limit to apply
+   * @return
+   *   tuple of (capped offset with limit applied, true latest offset)
+   * @since 4.2.0
+   */
+  def latestOffsetWithReport(startOffset: String, limit: ReadLimit): (String, 
String) = {
+    dataOut.writeInt(LATEST_OFFSET_WITH_REPORT_FUNC_ID)
+    // Handle null startOffset by writing empty string
+    PythonWorkerUtils.writeUTF(Option(startOffset).getOrElse(""), dataOut)
+    PythonWorkerUtils.writeUTF(serializeReadLimit(limit), dataOut)
+    dataOut.flush()
+
+    // Read capped offset
+    val cappedLen = dataIn.readInt()
+    if (cappedLen == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",

Review Comment:
   nit: maybe good to follow the existing style



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -188,14 +182,130 @@ class PythonStreamingSourceRunner(
       case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
         val msg = PythonWorkerUtils.readUTF(dataIn)
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", msg)
+          action = "planPartitions",
+          msg)
       case _ =>
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", s"unknown status code 
$prefetchedRecordsStatus")
+          action = "planPartitions",
+          s"unknown status code $prefetchedRecordsStatus")
     }
     (pickledPartitions.toArray, iter)
   }
 
+  /**
+   * Invokes latestOffset(startOffset, limit) function with admission control 
parameters.
+   */
+  def latestOffset(startOffset: String, limit: ReadLimit): String = {
+    dataOut.writeInt(LATEST_OFFSET_WITH_LIMIT_FUNC_ID)
+    PythonWorkerUtils.writeUTF(startOffset, dataOut)
+    PythonWorkerUtils.writeUTF(serializeReadLimit(limit), dataOut)
+    dataOut.flush()
+    val len = dataIn.readInt()
+    if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",

Review Comment:
   nit: maybe good to follow the existing style



##########
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##########
@@ -88,25 +94,86 @@ def partitions_func(
         if it is None:
             write_int(PREFETCHED_RECORDS_NOT_FOUND, outfile)
         else:
-            send_batch_func(it, outfile, schema, max_arrow_batch_size, 
data_source)
+            send_batch_func(it, outfile, schema, max_arrow_batch_size, 
data_source)  # noqa: E501
     else:
         write_int(PREFETCHED_RECORDS_NOT_FOUND, outfile)
 
 
-def commit_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) -> 
None:
+def commit_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) -> 
None:  # noqa: E501
     end_offset = json.loads(utf8_deserializer.loads(infile))
     reader.commit(end_offset)
     write_int(0, outfile)
 
 
+def latest_offset_with_report_func(reader: DataSourceStreamReader, infile: IO, 
outfile: IO) -> None:
+    """
+    Handler for function ID 890: latestOffset with admission control
+    parameters.
+
+    This function supports both old and new reader implementations:
+    - Old readers: latestOffset() with no parameters -> no admission
+      control
+    - New readers: latestOffset(start, limit) with parameters ->
+      admission control enabled
+    """
+    start_offset_str = utf8_deserializer.loads(infile)
+    # Handle empty string as None for backward compatibility
+    start_offset = json.loads(start_offset_str) if start_offset_str else None
+    limit = json.loads(utf8_deserializer.loads(infile))
+
+    # Type declarations for mypy
+    capped_offset: dict
+    true_latest_offset: dict
+
+    try:
+        # Try calling with optional parameters (new signature)

Review Comment:
   I guess we may revisit as a whole depending on whether we isolate the new 
function or not.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -188,14 +182,130 @@ class PythonStreamingSourceRunner(
       case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
         val msg = PythonWorkerUtils.readUTF(dataIn)
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", msg)
+          action = "planPartitions",
+          msg)
       case _ =>
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", s"unknown status code 
$prefetchedRecordsStatus")
+          action = "planPartitions",
+          s"unknown status code $prefetchedRecordsStatus")
     }
     (pickledPartitions.toArray, iter)
   }
 
+  /**
+   * Invokes latestOffset(startOffset, limit) function with admission control 
parameters.
+   */
+  def latestOffset(startOffset: String, limit: ReadLimit): String = {
+    dataOut.writeInt(LATEST_OFFSET_WITH_LIMIT_FUNC_ID)
+    PythonWorkerUtils.writeUTF(startOffset, dataOut)
+    PythonWorkerUtils.writeUTF(serializeReadLimit(limit), dataOut)
+    dataOut.flush()
+    val len = dataIn.readInt()
+    if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",
+        msg)
+    }
+    PythonWorkerUtils.readUTF(len, dataIn)
+  }
+
+  /**
+   * Invokes reportLatestOffset() function to get the true latest available 
offset.
+   */
+  def reportLatestOffset(): String = {
+    dataOut.writeInt(REPORT_LATEST_OFFSET_FUNC_ID)
+    dataOut.flush()
+    val len = dataIn.readInt()
+    if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "reportLatestOffset",
+        msg)
+    }
+    PythonWorkerUtils.readUTF(len, dataIn)
+  }
+
+  /**
+   * Invokes latestOffset with admission control and also fetches the true 
latest offset. This
+   * avoids race conditions by getting both offsets in a single RPC call.
+   *
+   * @param startOffset
+   *   the starting offset (may be null)
+   * @param limit
+   *   the read limit to apply
+   * @return
+   *   tuple of (capped offset with limit applied, true latest offset)
+   * @since 4.2.0
+   */
+  def latestOffsetWithReport(startOffset: String, limit: ReadLimit): (String, 
String) = {
+    dataOut.writeInt(LATEST_OFFSET_WITH_REPORT_FUNC_ID)
+    // Handle null startOffset by writing empty string
+    PythonWorkerUtils.writeUTF(Option(startOffset).getOrElse(""), dataOut)
+    PythonWorkerUtils.writeUTF(serializeReadLimit(limit), dataOut)
+    dataOut.flush()
+
+    // Read capped offset
+    val cappedLen = dataIn.readInt()
+    if (cappedLen == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",
+        msg)
+    }
+    val cappedOffset = PythonWorkerUtils.readUTF(cappedLen, dataIn)
+
+    // Read true latest offset
+    val trueLen = dataIn.readInt()
+    if (trueLen == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",
+        msg)
+    }
+    val trueLatest = PythonWorkerUtils.readUTF(trueLen, dataIn)
+
+    (cappedOffset, trueLatest)
+  }
+
+  /**
+   * Serializes a ReadLimit to JSON format for Python. Uses json4s for safe 
JSON construction.
+   *
+   * @param limit

Review Comment:
   nit: one liner, please



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -230,30 +342,36 @@ class PythonStreamingSourceRunner(
     }
   }
 
-  private val allocator = ArrowUtils.rootAllocator.newChildAllocator(
-    s"stream reader for $pythonExec", 0, Long.MaxValue)
+  private val allocator =
+    ArrowUtils.rootAllocator.newChildAllocator(s"stream reader for 
$pythonExec", 0, Long.MaxValue)
 
   def readArrowRecordBatches(): Iterator[InternalRow] = {
     val status = dataIn.readInt()
     status match {
       case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
         val msg = PythonWorkerUtils.readUTF(dataIn)
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-        action = "prefetchArrowBatches", msg)
+          action = "prefetchArrowBatches",
+          msg)
       case SpecialLengths.START_ARROW_STREAM =>
       case _ =>
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "prefetchArrowBatches", s"unknown status code $status")
+          action = "prefetchArrowBatches",
+          s"unknown status code $status")
     }
     val reader = new ArrowStreamReader(dataIn, allocator)
     val root = reader.getVectorSchemaRoot()
     // When input is empty schema can't be read.
     val schema = ArrowUtils.fromArrowSchema(root.getSchema())
     assert(schema == outputSchema)
 
-    val vectors = root.getFieldVectors().asScala.map { vector =>
-      new ArrowColumnVector(vector)
-    }.toArray[ColumnVector]
+    val vectors = root

Review Comment:
   same



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -188,14 +182,130 @@ class PythonStreamingSourceRunner(
       case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
         val msg = PythonWorkerUtils.readUTF(dataIn)
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", msg)
+          action = "planPartitions",
+          msg)
       case _ =>
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", s"unknown status code 
$prefetchedRecordsStatus")
+          action = "planPartitions",
+          s"unknown status code $prefetchedRecordsStatus")
     }
     (pickledPartitions.toArray, iter)
   }
 
+  /**
+   * Invokes latestOffset(startOffset, limit) function with admission control 
parameters.
+   */
+  def latestOffset(startOffset: String, limit: ReadLimit): String = {
+    dataOut.writeInt(LATEST_OFFSET_WITH_LIMIT_FUNC_ID)
+    PythonWorkerUtils.writeUTF(startOffset, dataOut)
+    PythonWorkerUtils.writeUTF(serializeReadLimit(limit), dataOut)
+    dataOut.flush()
+    val len = dataIn.readInt()
+    if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",
+        msg)
+    }
+    PythonWorkerUtils.readUTF(len, dataIn)
+  }
+
+  /**
+   * Invokes reportLatestOffset() function to get the true latest available 
offset.
+   */
+  def reportLatestOffset(): String = {
+    dataOut.writeInt(REPORT_LATEST_OFFSET_FUNC_ID)
+    dataOut.flush()
+    val len = dataIn.readInt()
+    if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "reportLatestOffset",
+        msg)
+    }
+    PythonWorkerUtils.readUTF(len, dataIn)
+  }
+
+  /**
+   * Invokes latestOffset with admission control and also fetches the true 
latest offset. This
+   * avoids race conditions by getting both offsets in a single RPC call.
+   *
+   * @param startOffset
+   *   the starting offset (may be null)
+   * @param limit
+   *   the read limit to apply
+   * @return
+   *   tuple of (capped offset with limit applied, true latest offset)
+   * @since 4.2.0
+   */
+  def latestOffsetWithReport(startOffset: String, limit: ReadLimit): (String, 
String) = {
+    dataOut.writeInt(LATEST_OFFSET_WITH_REPORT_FUNC_ID)
+    // Handle null startOffset by writing empty string
+    PythonWorkerUtils.writeUTF(Option(startOffset).getOrElse(""), dataOut)
+    PythonWorkerUtils.writeUTF(serializeReadLimit(limit), dataOut)
+    dataOut.flush()
+
+    // Read capped offset
+    val cappedLen = dataIn.readInt()
+    if (cappedLen == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",
+        msg)
+    }
+    val cappedOffset = PythonWorkerUtils.readUTF(cappedLen, dataIn)
+
+    // Read true latest offset
+    val trueLen = dataIn.readInt()
+    if (trueLen == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",
+        msg)
+    }
+    val trueLatest = PythonWorkerUtils.readUTF(trueLen, dataIn)
+
+    (cappedOffset, trueLatest)
+  }
+
+  /**
+   * Serializes a ReadLimit to JSON format for Python. Uses json4s for safe 
JSON construction.
+   *
+   * @param limit
+   *   the ReadLimit to serialize
+   * @return
+   *   JSON string representation
+   */
+  private def serializeReadLimit(limit: ReadLimit): String = {
+    import org.json4s.JsonDSL._
+    import org.json4s.jackson.JsonMethods._
+    import org.json4s.JValue
+
+    val json: JValue = limit match {

Review Comment:
   This supports more than what we document as a supported list in Python data 
source; what happens if we handle ReadMinRows here?



##########
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##########
@@ -88,25 +94,86 @@ def partitions_func(
         if it is None:
             write_int(PREFETCHED_RECORDS_NOT_FOUND, outfile)
         else:
-            send_batch_func(it, outfile, schema, max_arrow_batch_size, 
data_source)
+            send_batch_func(it, outfile, schema, max_arrow_batch_size, 
data_source)  # noqa: E501
     else:
         write_int(PREFETCHED_RECORDS_NOT_FOUND, outfile)
 
 
-def commit_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) -> 
None:
+def commit_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) -> 
None:  # noqa: E501
     end_offset = json.loads(utf8_deserializer.loads(infile))
     reader.commit(end_offset)
     write_int(0, outfile)
 
 
+def latest_offset_with_report_func(reader: DataSourceStreamReader, infile: IO, 
outfile: IO) -> None:
+    """
+    Handler for function ID 890: latestOffset with admission control
+    parameters.
+
+    This function supports both old and new reader implementations:
+    - Old readers: latestOffset() with no parameters -> no admission
+      control
+    - New readers: latestOffset(start, limit) with parameters ->
+      admission control enabled
+    """
+    start_offset_str = utf8_deserializer.loads(infile)
+    # Handle empty string as None for backward compatibility
+    start_offset = json.loads(start_offset_str) if start_offset_str else None
+    limit = json.loads(utf8_deserializer.loads(infile))
+
+    # Type declarations for mypy
+    capped_offset: dict
+    true_latest_offset: dict
+
+    try:
+        # Try calling with optional parameters (new signature)
+        result = reader.latestOffset(start_offset, limit)
+
+        # Check return type to determine behavior
+        if isinstance(result, tuple):
+            # New behavior: returns (capped_offset, true_latest_offset)
+            capped_offset, true_latest_offset = result
+        else:
+            # Old behavior or no admission control: single offset
+            capped_offset = true_latest_offset = result
+
+    except TypeError:
+        # Old signature that doesn't accept parameters - fallback
+        try:
+            fallback_result = reader.latestOffset()
+            # Handle both return types for backward compatibility
+            if isinstance(fallback_result, tuple):
+                capped_offset, true_latest_offset = fallback_result
+            else:
+                capped_offset = true_latest_offset = fallback_result
+        except Exception as fallback_error:
+            raise IllegalArgumentException(

Review Comment:
   I'm not very sure this is a representative exception. Shall we not wrap the 
exception and throw it as it is? If you want to give the info of operation into 
exception, it's OK to wrap but IllegalArgumentException is too narrow one to 
cover "Exception".
   



##########
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##########
@@ -88,25 +94,86 @@ def partitions_func(
         if it is None:
             write_int(PREFETCHED_RECORDS_NOT_FOUND, outfile)
         else:
-            send_batch_func(it, outfile, schema, max_arrow_batch_size, 
data_source)
+            send_batch_func(it, outfile, schema, max_arrow_batch_size, 
data_source)  # noqa: E501
     else:
         write_int(PREFETCHED_RECORDS_NOT_FOUND, outfile)
 
 
-def commit_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) -> 
None:
+def commit_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) -> 
None:  # noqa: E501
     end_offset = json.loads(utf8_deserializer.loads(infile))
     reader.commit(end_offset)
     write_int(0, outfile)
 
 
+def latest_offset_with_report_func(reader: DataSourceStreamReader, infile: IO, 
outfile: IO) -> None:
+    """
+    Handler for function ID 890: latestOffset with admission control
+    parameters.
+
+    This function supports both old and new reader implementations:
+    - Old readers: latestOffset() with no parameters -> no admission
+      control
+    - New readers: latestOffset(start, limit) with parameters ->
+      admission control enabled
+    """
+    start_offset_str = utf8_deserializer.loads(infile)
+    # Handle empty string as None for backward compatibility
+    start_offset = json.loads(start_offset_str) if start_offset_str else None
+    limit = json.loads(utf8_deserializer.loads(infile))
+
+    # Type declarations for mypy
+    capped_offset: dict
+    true_latest_offset: dict
+
+    try:
+        # Try calling with optional parameters (new signature)
+        result = reader.latestOffset(start_offset, limit)
+
+        # Check return type to determine behavior
+        if isinstance(result, tuple):
+            # New behavior: returns (capped_offset, true_latest_offset)
+            capped_offset, true_latest_offset = result
+        else:
+            # Old behavior or no admission control: single offset
+            capped_offset = true_latest_offset = result
+
+    except TypeError:
+        # Old signature that doesn't accept parameters - fallback
+        try:
+            fallback_result = reader.latestOffset()
+            # Handle both return types for backward compatibility

Review Comment:
   This clearly shows a whole confusion with the function. Why old signature 
can return a tuple of "new signature"? This just makes our function signature 
to behavior 4 different sets of parameter & return type, which is far beyond 
the compatibility and arguably not needed.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -188,14 +182,130 @@ class PythonStreamingSourceRunner(
       case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
         val msg = PythonWorkerUtils.readUTF(dataIn)
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", msg)
+          action = "planPartitions",
+          msg)
       case _ =>
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", s"unknown status code 
$prefetchedRecordsStatus")
+          action = "planPartitions",
+          s"unknown status code $prefetchedRecordsStatus")
     }
     (pickledPartitions.toArray, iter)
   }
 
+  /**
+   * Invokes latestOffset(startOffset, limit) function with admission control 
parameters.
+   */
+  def latestOffset(startOffset: String, limit: ReadLimit): String = {
+    dataOut.writeInt(LATEST_OFFSET_WITH_LIMIT_FUNC_ID)
+    PythonWorkerUtils.writeUTF(startOffset, dataOut)
+    PythonWorkerUtils.writeUTF(serializeReadLimit(limit), dataOut)
+    dataOut.flush()
+    val len = dataIn.readInt()
+    if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",
+        msg)
+    }
+    PythonWorkerUtils.readUTF(len, dataIn)
+  }
+
+  /**
+   * Invokes reportLatestOffset() function to get the true latest available 
offset.
+   */
+  def reportLatestOffset(): String = {
+    dataOut.writeInt(REPORT_LATEST_OFFSET_FUNC_ID)
+    dataOut.flush()
+    val len = dataIn.readInt()
+    if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "reportLatestOffset",
+        msg)
+    }
+    PythonWorkerUtils.readUTF(len, dataIn)
+  }
+
+  /**
+   * Invokes latestOffset with admission control and also fetches the true 
latest offset. This
+   * avoids race conditions by getting both offsets in a single RPC call.
+   *
+   * @param startOffset
+   *   the starting offset (may be null)
+   * @param limit
+   *   the read limit to apply
+   * @return
+   *   tuple of (capped offset with limit applied, true latest offset)
+   * @since 4.2.0
+   */
+  def latestOffsetWithReport(startOffset: String, limit: ReadLimit): (String, 
String) = {
+    dataOut.writeInt(LATEST_OFFSET_WITH_REPORT_FUNC_ID)
+    // Handle null startOffset by writing empty string
+    PythonWorkerUtils.writeUTF(Option(startOffset).getOrElse(""), dataOut)
+    PythonWorkerUtils.writeUTF(serializeReadLimit(limit), dataOut)
+    dataOut.flush()
+
+    // Read capped offset
+    val cappedLen = dataIn.readInt()
+    if (cappedLen == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",
+        msg)
+    }
+    val cappedOffset = PythonWorkerUtils.readUTF(cappedLen, dataIn)
+
+    // Read true latest offset
+    val trueLen = dataIn.readInt()
+    if (trueLen == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",

Review Comment:
   nit: maybe good to follow the existing style



##########
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##########
@@ -88,25 +94,86 @@ def partitions_func(
         if it is None:
             write_int(PREFETCHED_RECORDS_NOT_FOUND, outfile)
         else:
-            send_batch_func(it, outfile, schema, max_arrow_batch_size, 
data_source)
+            send_batch_func(it, outfile, schema, max_arrow_batch_size, 
data_source)  # noqa: E501
     else:
         write_int(PREFETCHED_RECORDS_NOT_FOUND, outfile)
 
 
-def commit_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) -> 
None:
+def commit_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) -> 
None:  # noqa: E501
     end_offset = json.loads(utf8_deserializer.loads(infile))
     reader.commit(end_offset)
     write_int(0, outfile)
 
 
+def latest_offset_with_report_func(reader: DataSourceStreamReader, infile: IO, 
outfile: IO) -> None:
+    """
+    Handler for function ID 890: latestOffset with admission control
+    parameters.
+
+    This function supports both old and new reader implementations:
+    - Old readers: latestOffset() with no parameters -> no admission
+      control
+    - New readers: latestOffset(start, limit) with parameters ->
+      admission control enabled
+    """
+    start_offset_str = utf8_deserializer.loads(infile)
+    # Handle empty string as None for backward compatibility

Review Comment:
   AFAIK we won't get an empty string and there is nothing we handle in terms 
of backward compatibility. I guess we have a huge confusion in terms of 
compatibility.



##########
python/pyspark/sql/datasource_internal.py:
##########


Review Comment:
   After SupportsAdmissionControl, this doesn't need to track the current 
offset by itself. start() will be always available.



##########
python/pyspark/sql/datasource_internal.py:
##########
@@ -88,14 +88,65 @@ def initialOffset(self) -> dict:
             self.initial_offset = self.simple_reader.initialOffset()
         return self.initial_offset
 
-    def latestOffset(self) -> dict:
+    def latestOffset(
+        self, start: Optional[dict] = None, limit: Optional[dict] = None
+    ) -> Union[dict, Tuple[dict, dict]]:
         # when query start for the first time, use initial offset as the start 
offset.
         if self.current_offset is None:
             self.current_offset = self.initialOffset()
-        (iter, end) = self.simple_reader.read(self.current_offset)
-        self.cache.append(PrefetchedCacheEntry(self.current_offset, end, iter))
-        self.current_offset = end
-        return end
+
+        # For backward compatibility: if called without parameters, use old 
behavior
+        if start is None and limit is None:
+            # Old behavior - no admission control
+            (full_iter, true_end) = 
self.simple_reader.read(self.current_offset)
+            self.cache.append(PrefetchedCacheEntry(self.current_offset, 
true_end, full_iter))
+            self.current_offset = true_end
+            return true_end
+
+        # New behavior with admission control support
+        # If start is not provided, use current offset

Review Comment:
   AFAIK, this does not happen with DSv2 data source + SupportsAdmissionControl.



##########
python/pyspark/sql/datasource_internal.py:
##########
@@ -88,14 +88,65 @@ def initialOffset(self) -> dict:
             self.initial_offset = self.simple_reader.initialOffset()
         return self.initial_offset
 
-    def latestOffset(self) -> dict:
+    def latestOffset(
+        self, start: Optional[dict] = None, limit: Optional[dict] = None
+    ) -> Union[dict, Tuple[dict, dict]]:
         # when query start for the first time, use initial offset as the start 
offset.
         if self.current_offset is None:
             self.current_offset = self.initialOffset()
-        (iter, end) = self.simple_reader.read(self.current_offset)
-        self.cache.append(PrefetchedCacheEntry(self.current_offset, end, iter))
-        self.current_offset = end
-        return end
+
+        # For backward compatibility: if called without parameters, use old 
behavior
+        if start is None and limit is None:
+            # Old behavior - no admission control
+            (full_iter, true_end) = 
self.simple_reader.read(self.current_offset)
+            self.cache.append(PrefetchedCacheEntry(self.current_offset, 
true_end, full_iter))
+            self.current_offset = true_end
+            return true_end
+
+        # New behavior with admission control support
+        # If start is not provided, use current offset
+        if start is None:
+            start = self.current_offset
+
+        # Call simple reader's read() to get all available data
+        (full_iter, true_end) = self.simple_reader.read(start)
+
+        # Check if admission control is enabled
+        if limit is not None and limit.get("type") == "maxRows":
+            max_rows = limit["maxRows"]
+            # Convert iterator to list to allow length calculation and slicing
+            data_list = list(full_iter)
+
+            if len(data_list) <= max_rows:
+                # All data fits within limit
+                capped_iter = iter(data_list)
+                capped_end = true_end
+            else:
+                # Cap the data to max_rows
+                capped_data = data_list[:max_rows]
+                capped_iter = iter(capped_data)
+                # Calculate capped offset based on how many rows we're 
actually taking

Review Comment:
   IMHO this is rather a hack. I'd rather not assume the offset to be 
sequential numeric, regardless of which field name it uses. If we want to 
support admission control for simple data stream reader, we should let the data 
source implementation to do that.
   
   That said, my question follows - do we really need to support admission 
control for simple data stream reader?



##########
python/pyspark/sql/datasource_internal.py:
##########
@@ -88,14 +88,65 @@ def initialOffset(self) -> dict:
             self.initial_offset = self.simple_reader.initialOffset()
         return self.initial_offset
 
-    def latestOffset(self) -> dict:
+    def latestOffset(
+        self, start: Optional[dict] = None, limit: Optional[dict] = None
+    ) -> Union[dict, Tuple[dict, dict]]:
         # when query start for the first time, use initial offset as the start 
offset.
         if self.current_offset is None:
             self.current_offset = self.initialOffset()
-        (iter, end) = self.simple_reader.read(self.current_offset)
-        self.cache.append(PrefetchedCacheEntry(self.current_offset, end, iter))
-        self.current_offset = end
-        return end
+
+        # For backward compatibility: if called without parameters, use old 
behavior

Review Comment:
   Just to remind, I don't think the function will be called without parameters.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala:
##########


Review Comment:
   nit: Shall we revert the change if it was just style and unrelated to your 
actual change?



##########
python/pyspark/sql/datasource.py:
##########
@@ -714,20 +714,46 @@ def initialOffset(self) -> dict:
             messageParameters={"feature": "initialOffset"},
         )
 
-    def latestOffset(self) -> dict:
+    def latestOffset(
+        self, start: Optional[dict] = None, limit: Optional[dict] = None
+    ) -> Union[dict, Tuple[dict, dict]]:
         """
         Returns the most recent offset available.
 
+        Parameters (optional - added in Spark 4.2 for admission control)
+        -------------------------------------------------------------------
+        start : dict, optional
+            The starting offset. Enables admission control when provided.

Review Comment:
   If we include allAvailable as a part of admission control, admission control 
is always enabled. It's actually whether the data source implementation "can" 
support admission control or not, and the engine will check it and behave 
accordingly.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala:
##########
@@ -28,15 +29,38 @@ import org.apache.spark.storage.{PythonStreamBlockId, 
StorageLevel}
 
 case class PythonStreamingSourceOffset(json: String) extends Offset
 
+/**
+ * Micro-batch stream implementation for Python data sources with admission 
control support.
+ *
+ * This class bridges JVM Spark streaming with Python-based data sources, 
supporting:
+ *   - Admission control via ReadLimit (maxRecordsPerBatch, maxFilesPerBatch, 
maxBytesPerBatch)
+ *   - Offset tracking and management
+ *   - Latest seen offset for prefetching optimization
+ *
+ * Admission control options:
+ *   - `maxRecordsPerBatch`: Maximum number of rows per batch (Long, must be > 
0)
+ *   - `maxFilesPerBatch`: Maximum number of files per batch (Int, must be > 0)
+ *   - `maxBytesPerBatch`: Maximum bytes per batch (Long, must be > 0)
+ *
+ * @param ds
+ *   the Python data source V2 instance
+ * @param shortName
+ *   short name of the data source
+ * @param outputSchema
+ *   the output schema
+ * @param options
+ *   configuration options including admission control settings
+ * @since 4.2.0

Review Comment:
   This class isn't since 4.2.0, right? It had been there from 4.0.0.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -125,25 +129,12 @@ class PythonStreamingSourceRunner(
     if (initStatus == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
       val msg = PythonWorkerUtils.readUTF(dataIn)
       throw QueryCompilationErrors.pythonDataSourceError(
-        action = "plan", tpe = "initialize source", msg = msg)

Review Comment:
   same



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -154,16 +145,18 @@ class PythonStreamingSourceRunner(
     if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
       val msg = PythonWorkerUtils.readUTF(dataIn)
       throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-        action = "initialOffset", msg)
+        action = "initialOffset",
+        msg)
     }
     PythonWorkerUtils.readUTF(len, dataIn)
   }
 
   /**
    * Invokes partitions(start, end) function of the stream reader and receive 
the return value.
    */
-  def partitions(start: String, end: String): (Array[Array[Byte]], 
Option[Iterator[InternalRow]]) =

Review Comment:
   same



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -54,15 +59,14 @@ object PythonStreamingSourceRunner {
 }
 
 /**
- * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.

Review Comment:
   nit: class doc and class param - they don't need to be changed. Shall we 
revert the change?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -174,7 +167,8 @@ class PythonStreamingSourceRunner(
     if (numPartitions == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
       val msg = PythonWorkerUtils.readUTF(dataIn)
       throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-        action = "planPartitions", msg)

Review Comment:
   same



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -188,14 +182,130 @@ class PythonStreamingSourceRunner(
       case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
         val msg = PythonWorkerUtils.readUTF(dataIn)
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", msg)
+          action = "planPartitions",
+          msg)
       case _ =>
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", s"unknown status code 
$prefetchedRecordsStatus")
+          action = "planPartitions",
+          s"unknown status code $prefetchedRecordsStatus")
     }
     (pickledPartitions.toArray, iter)
   }
 
+  /**
+   * Invokes latestOffset(startOffset, limit) function with admission control 
parameters.
+   */
+  def latestOffset(startOffset: String, limit: ReadLimit): String = {
+    dataOut.writeInt(LATEST_OFFSET_WITH_LIMIT_FUNC_ID)
+    PythonWorkerUtils.writeUTF(startOffset, dataOut)
+    PythonWorkerUtils.writeUTF(serializeReadLimit(limit), dataOut)
+    dataOut.flush()
+    val len = dataIn.readInt()
+    if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",
+        msg)
+    }
+    PythonWorkerUtils.readUTF(len, dataIn)
+  }
+
+  /**
+   * Invokes reportLatestOffset() function to get the true latest available 
offset.
+   */
+  def reportLatestOffset(): String = {
+    dataOut.writeInt(REPORT_LATEST_OFFSET_FUNC_ID)
+    dataOut.flush()
+    val len = dataIn.readInt()
+    if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "reportLatestOffset",

Review Comment:
   nit: maybe good to follow the existing style



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala:
##########
@@ -49,37 +73,120 @@ class PythonMicroBatchStream(
   // from python to JVM.
   private var cachedInputPartition: Option[(String, String, 
PythonStreamingInputPartition)] = None
 
+  // Store the latest available offset for reporting
+  private var latestAvailableOffset: Option[PythonStreamingSourceOffset] = None
+
   private val runner: PythonStreamingSourceRunner =
     new PythonStreamingSourceRunner(createDataSourceFunc, outputSchema)
   runner.init()
 
   override def initialOffset(): Offset = 
PythonStreamingSourceOffset(runner.initialOffset())
 
-  override def latestOffset(): Offset = 
PythonStreamingSourceOffset(runner.latestOffset())
+  /**
+   * Returns the default read limit based on configured options. Supports: 
maxRecordsPerBatch,
+   * maxFilesPerBatch, maxBytesPerBatch. Falls back to allAvailable if no 
valid limit is
+   * configured.
+   *
+   * @since 4.2.0
+   */
+  override def getDefaultReadLimit: ReadLimit = {
+    import scala.util.Try
+
+    def parseLong(key: String): Long = {
+      Try(options.get(key).toLong).toOption.filter(_ > 0).getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value '${options.get(key)}' for option '$key', must be a 
positive integer")
+      }
+    }
+
+    def parseInt(key: String): Int = {
+      Try(options.get(key).toInt).toOption.filter(_ > 0).getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value '${options.get(key)}' for option '$key', must be a 
positive integer")
+      }
+    }
+
+    if (options.containsKey("maxRecordsPerBatch")) {

Review Comment:
   It'd be preferred to use constant fields for constant strings - let's put 
constants in companion object.



##########
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##########
@@ -88,25 +94,86 @@ def partitions_func(
         if it is None:
             write_int(PREFETCHED_RECORDS_NOT_FOUND, outfile)
         else:
-            send_batch_func(it, outfile, schema, max_arrow_batch_size, 
data_source)
+            send_batch_func(it, outfile, schema, max_arrow_batch_size, 
data_source)  # noqa: E501
     else:
         write_int(PREFETCHED_RECORDS_NOT_FOUND, outfile)
 
 
-def commit_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) -> 
None:
+def commit_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) -> 
None:  # noqa: E501
     end_offset = json.loads(utf8_deserializer.loads(infile))
     reader.commit(end_offset)
     write_int(0, outfile)
 
 
+def latest_offset_with_report_func(reader: DataSourceStreamReader, infile: IO, 
outfile: IO) -> None:
+    """
+    Handler for function ID 890: latestOffset with admission control
+    parameters.
+
+    This function supports both old and new reader implementations:
+    - Old readers: latestOffset() with no parameters -> no admission
+      control
+    - New readers: latestOffset(start, limit) with parameters ->
+      admission control enabled
+    """
+    start_offset_str = utf8_deserializer.loads(infile)
+    # Handle empty string as None for backward compatibility
+    start_offset = json.loads(start_offset_str) if start_offset_str else None
+    limit = json.loads(utf8_deserializer.loads(infile))
+
+    # Type declarations for mypy
+    capped_offset: dict
+    true_latest_offset: dict
+
+    try:
+        # Try calling with optional parameters (new signature)
+        result = reader.latestOffset(start_offset, limit)
+
+        # Check return type to determine behavior
+        if isinstance(result, tuple):
+            # New behavior: returns (capped_offset, true_latest_offset)
+            capped_offset, true_latest_offset = result
+        else:
+            # Old behavior or no admission control: single offset
+            capped_offset = true_latest_offset = result
+
+    except TypeError:
+        # Old signature that doesn't accept parameters - fallback
+        try:
+            fallback_result = reader.latestOffset()
+            # Handle both return types for backward compatibility
+            if isinstance(fallback_result, tuple):
+                capped_offset, true_latest_offset = fallback_result
+            else:
+                capped_offset = true_latest_offset = fallback_result
+        except Exception as fallback_error:
+            raise IllegalArgumentException(
+                errorClass="UNSUPPORTED_OPERATION",
+                messageParameters={
+                    "operation": f"latestOffset call failed: 
{str(fallback_error)}"  # noqa: E501
+                },
+            )
+    except Exception as e:
+        raise IllegalArgumentException(

Review Comment:
   I'm not very sure this is a representative exception. Shall we not wrap the 
exception and throw it as it is? If you want to give the info of operation into 
exception, it's OK to wrap but IllegalArgumentException is too narrow one to 
cover "Exception".



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala:
##########
@@ -188,14 +182,130 @@ class PythonStreamingSourceRunner(
       case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
         val msg = PythonWorkerUtils.readUTF(dataIn)
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", msg)
+          action = "planPartitions",
+          msg)
       case _ =>
         throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
-          action = "planPartitions", s"unknown status code 
$prefetchedRecordsStatus")
+          action = "planPartitions",
+          s"unknown status code $prefetchedRecordsStatus")
     }
     (pickledPartitions.toArray, iter)
   }
 
+  /**
+   * Invokes latestOffset(startOffset, limit) function with admission control 
parameters.
+   */
+  def latestOffset(startOffset: String, limit: ReadLimit): String = {
+    dataOut.writeInt(LATEST_OFFSET_WITH_LIMIT_FUNC_ID)
+    PythonWorkerUtils.writeUTF(startOffset, dataOut)
+    PythonWorkerUtils.writeUTF(serializeReadLimit(limit), dataOut)
+    dataOut.flush()
+    val len = dataIn.readInt()
+    if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",
+        msg)
+    }
+    PythonWorkerUtils.readUTF(len, dataIn)
+  }
+
+  /**
+   * Invokes reportLatestOffset() function to get the true latest available 
offset.
+   */
+  def reportLatestOffset(): String = {
+    dataOut.writeInt(REPORT_LATEST_OFFSET_FUNC_ID)
+    dataOut.flush()
+    val len = dataIn.readInt()
+    if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "reportLatestOffset",
+        msg)
+    }
+    PythonWorkerUtils.readUTF(len, dataIn)
+  }
+
+  /**
+   * Invokes latestOffset with admission control and also fetches the true 
latest offset. This
+   * avoids race conditions by getting both offsets in a single RPC call.
+   *
+   * @param startOffset
+   *   the starting offset (may be null)
+   * @param limit
+   *   the read limit to apply
+   * @return
+   *   tuple of (capped offset with limit applied, true latest offset)
+   * @since 4.2.0
+   */
+  def latestOffsetWithReport(startOffset: String, limit: ReadLimit): (String, 
String) = {
+    dataOut.writeInt(LATEST_OFFSET_WITH_REPORT_FUNC_ID)
+    // Handle null startOffset by writing empty string
+    PythonWorkerUtils.writeUTF(Option(startOffset).getOrElse(""), dataOut)
+    PythonWorkerUtils.writeUTF(serializeReadLimit(limit), dataOut)
+    dataOut.flush()
+
+    // Read capped offset
+    val cappedLen = dataIn.readInt()
+    if (cappedLen == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",
+        msg)
+    }
+    val cappedOffset = PythonWorkerUtils.readUTF(cappedLen, dataIn)
+
+    // Read true latest offset
+    val trueLen = dataIn.readInt()
+    if (trueLen == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
+        action = "latestOffset",
+        msg)
+    }
+    val trueLatest = PythonWorkerUtils.readUTF(trueLen, dataIn)
+
+    (cappedOffset, trueLatest)
+  }
+
+  /**
+   * Serializes a ReadLimit to JSON format for Python. Uses json4s for safe 
JSON construction.
+   *
+   * @param limit
+   *   the ReadLimit to serialize
+   * @return
+   *   JSON string representation
+   */
+  private def serializeReadLimit(limit: ReadLimit): String = {
+    import org.json4s.JsonDSL._
+    import org.json4s.jackson.JsonMethods._
+    import org.json4s.JValue
+
+    val json: JValue = limit match {

Review Comment:
   Maybe good to mention the limit parameter will be return value of 
getDefaultReadLimit() in PythonMicroBatchStream, so that we don't need to worry 
about all available types.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala:
##########
@@ -49,37 +73,120 @@ class PythonMicroBatchStream(
   // from python to JVM.
   private var cachedInputPartition: Option[(String, String, 
PythonStreamingInputPartition)] = None
 
+  // Store the latest available offset for reporting
+  private var latestAvailableOffset: Option[PythonStreamingSourceOffset] = None
+
   private val runner: PythonStreamingSourceRunner =
     new PythonStreamingSourceRunner(createDataSourceFunc, outputSchema)
   runner.init()
 
   override def initialOffset(): Offset = 
PythonStreamingSourceOffset(runner.initialOffset())
 
-  override def latestOffset(): Offset = 
PythonStreamingSourceOffset(runner.latestOffset())
+  /**
+   * Returns the default read limit based on configured options. Supports: 
maxRecordsPerBatch,
+   * maxFilesPerBatch, maxBytesPerBatch. Falls back to allAvailable if no 
valid limit is
+   * configured.
+   *
+   * @since 4.2.0
+   */
+  override def getDefaultReadLimit: ReadLimit = {
+    import scala.util.Try
+
+    def parseLong(key: String): Long = {
+      Try(options.get(key).toLong).toOption.filter(_ > 0).getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value '${options.get(key)}' for option '$key', must be a 
positive integer")
+      }
+    }
+
+    def parseInt(key: String): Int = {
+      Try(options.get(key).toInt).toOption.filter(_ > 0).getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value '${options.get(key)}' for option '$key', must be a 
positive integer")
+      }
+    }
+
+    if (options.containsKey("maxRecordsPerBatch")) {
+      val records = parseLong("maxRecordsPerBatch")
+      logInfo(s"Admission control: maxRecordsPerBatch = $records")
+      ReadLimit.maxRows(records)
+    } else if (options.containsKey("maxFilesPerBatch")) {
+      val files = parseInt("maxFilesPerBatch")
+      logInfo(s"Admission control: maxFilesPerBatch = $files")
+      ReadLimit.maxFiles(files)
+    } else if (options.containsKey("maxBytesPerBatch")) {
+      val bytes = parseLong("maxBytesPerBatch")
+      logInfo(s"Admission control: maxBytesPerBatch = $bytes")
+      ReadLimit.maxBytes(bytes)
+    } else {
+      logDebug("No admission control limit configured, using allAvailable")
+      ReadLimit.allAvailable()
+    }
+  }
+
+  override def latestOffset(): Offset = {
+    // Bridge to new signature with default read limit for backward 
compatibility
+    // Pass null as start offset to maintain backward compatibility with old 
behavior
+    latestOffset(null, getDefaultReadLimit)
+  }
+
+  /**
+   * Returns the latest offset with admission control limit applied. Also 
updates the true latest
+   * offset for reporting purposes.
+   *
+   * @param startOffset
+   *   the starting offset, may be null

Review Comment:
   nit: Btw, let's not add documentation unless there is noticeable difference 
from the method doc of base method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to