jiteshsoni commented on code in PR #53085:
URL: https://github.com/apache/spark/pull/53085#discussion_r2627298676


##########
python/pyspark/sql/datasource_internal.py:
##########
@@ -88,14 +88,65 @@ def initialOffset(self) -> dict:
             self.initial_offset = self.simple_reader.initialOffset()
         return self.initial_offset
 
-    def latestOffset(self) -> dict:
+    def latestOffset(
+        self, start: Optional[dict] = None, limit: Optional[dict] = None
+    ) -> Union[dict, Tuple[dict, dict]]:
         # when query start for the first time, use initial offset as the start offset.
         if self.current_offset is None:
             self.current_offset = self.initialOffset()
-        (iter, end) = self.simple_reader.read(self.current_offset)
-        self.cache.append(PrefetchedCacheEntry(self.current_offset, end, iter))
-        self.current_offset = end
-        return end
+
+        # For backward compatibility: if called without parameters, use old behavior
+        if start is None and limit is None:
+            # Old behavior - no admission control
+            (full_iter, true_end) = self.simple_reader.read(self.current_offset)
+            self.cache.append(PrefetchedCacheEntry(self.current_offset, true_end, full_iter))
+            self.current_offset = true_end
+            return true_end
+
+        # New behavior with admission control support
+        # If start is not provided, use current offset
+        if start is None:
+            start = self.current_offset
+
+        # Call simple reader's read() to get all available data
+        (full_iter, true_end) = self.simple_reader.read(start)
+
+        # Check if admission control is enabled
+        if limit is not None and limit.get("type") == "maxRows":
+            max_rows = limit["maxRows"]
+            # Convert iterator to list to allow length calculation and slicing
+            data_list = list(full_iter)
+
+            if len(data_list) <= max_rows:
+                # All data fits within limit
+                capped_iter = iter(data_list)
+                capped_end = true_end
+            else:
+                # Cap the data to max_rows
+                capped_data = data_list[:max_rows]
+                capped_iter = iter(capped_data)
+                # Calculate capped offset based on how many rows we're actually taking

Review Comment:
   Accepted your suggestion; I am removing admission control from
   `_SimpleStreamReaderWrapper`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to