Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-05-08 Thread via GitHub


chaoqin-li1123 commented on PR #45977:
URL: https://github.com/apache/spark/pull/45977#issuecomment-2101253872

   This is the fix https://github.com/apache/spark/pull/46481 @dongjoon-hyun 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-05-08 Thread via GitHub


dongjoon-hyun commented on PR #45977:
URL: https://github.com/apache/spark/pull/45977#issuecomment-2101232136

   Thank you so much, @chaoqin-li1123 .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-05-08 Thread via GitHub


chaoqin-li1123 commented on PR #45977:
URL: https://github.com/apache/spark/pull/45977#issuecomment-2101230022

   Yes, I notice that, will send out a fix PR today.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-30 Thread via GitHub


HeartSaVioR closed pull request #45977: [SPARK-47793][SS][PYTHON] Implement 
SimpleDataSourceStreamReader for python streaming data source
URL: https://github.com/apache/spark/pull/45977


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-30 Thread via GitHub


HeartSaVioR commented on PR #45977:
URL: https://github.com/apache/spark/pull/45977#issuecomment-2085292815

   Thanks! Merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-30 Thread via GitHub


HeartSaVioR commented on PR #45977:
URL: https://github.com/apache/spark/pull/45977#issuecomment-2085292610

   The GA only failed with docker integration test which isn't related.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-29 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1584075441


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry:
+def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through 
readBetweenOffsets(start, end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset: Optional[dict] = None
+self.current_offset: Optional[dict] = None
+self.cache: List[PrefetchedCacheEntry] = []
+
+def initialOffset(self) -> dict:
+if self.initial_offset is None:
+self.initial_offset = self.simple_reader.initialOffset()
+return self.initial_offset
+
+def latestOffset(self) -> dict:
+# when query start for the first time, use initial offset as the start 
offset.
+if self.current_offset is None:
+self.current_offset = self.initialOffset()
+(iter, end) = self.simple_reader.read(self.current_offset)
+self.cache.append(PrefetchedCacheEntry(self.current_offset, end, iter))
+self.current_offset = end
+return end
+
+def commit(self, end: dict) -> None:
+if self.current_offset is None:
+self.current_offset = end
+
+end_idx = -1
+for idx, entry in enumerate(self.cache):
+if json.dumps(entry.end) == json.dumps(end):
+end_idx = idx
+break
+if end_idx > 0:

Review Comment:
   OK, would be nice to add code comment (probably one-liner) to explicitly 
provide the intention.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For 

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-29 Thread via GitHub


HyukjinKwon commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1584067812


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,188 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers. Compared to 
DataSourceStreamReader,
+SimpleDataSourceStreamReader doesn't require planning data partitioning. 
Also, the read api of
+SimpleDataSourceStreamReader allows reading data and planning the latest 
offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> (Iterator[Tuple], dict):
+"""
+Read all available data from specified start offset and return the 
offset that next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A tuple of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end of this read attempt and the start of next 
read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def read2(self, start: dict, end: dict) -> Iterator[Tuple]:

Review Comment:
   Yeah that would have to be documented. BTW, in Python you can't enforce 
anything in any event.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-29 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1583666355


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -199,4 +223,30 @@ class PythonStreamingSourceRunner(
 logError("Exception when trying to kill worker", e)
 }
   }
+
+  private val allocator = ArrowUtils.rootAllocator.newChildAllocator(
+s"stream reader for $pythonExec", 0, Long.MaxValue)
+
+  def readArrowRecordBatches(): Iterator[InternalRow] = {
+assert(dataIn.readInt() == SpecialLengths.START_ARROW_STREAM)
+val reader = new ArrowStreamReader(dataIn, allocator)
+val root = reader.getVectorSchemaRoot()
+// When input is empty schema can't be read.
+val schema = ArrowUtils.fromArrowSchema(root.getSchema())
+assert(schema == outputSchema)
+
+val vectors = root.getFieldVectors().asScala.map { vector =>
+  new ArrowColumnVector(vector)
+}.toArray[ColumnVector]
+val rows = ArrayBuffer[InternalRow]()

Review Comment:
   If we call putIterator here we should be able to avoid materializing all 
rows at once in scala side, but it doesn't matter that much as we already 
materialize all rows in python side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-29 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1583663736


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala:
##
@@ -35,6 +38,14 @@ class PythonMicroBatchStream(
 ds.source.createPythonFunction(
   ds.getOrCreateDataSourceInPython(shortName, options, 
Some(outputSchema)).dataSource)
 
+  private val streamId = nextStreamId
+  private var nextBlockId = 0L
+
+  // planInputPartitions() maybe be called multiple times for the current 
microbatch.
+  // Cache the result of planInputPartitions() because it may involve sending 
data
+  // from python to JVM.
+  private var cachedInputPartition: Option[(String, String, 
PythonStreamingInputPartition)] = None

Review Comment:
   I added the overwrite that columnar read is not supported.



##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala:
##
@@ -44,9 +55,29 @@ class PythonMicroBatchStream(
   override def latestOffset(): Offset = 
PythonStreamingSourceOffset(runner.latestOffset())
 
   override def planInputPartitions(start: Offset, end: Offset): 
Array[InputPartition] = {
-runner.partitions(start.asInstanceOf[PythonStreamingSourceOffset].json,
-  end.asInstanceOf[PythonStreamingSourceOffset].json)
-  .zipWithIndex.map(p => PythonInputPartition(p._2, p._1))
+val start_offset_json = 
start.asInstanceOf[PythonStreamingSourceOffset].json

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-29 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1583661083


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry:
+def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through 
readBetweenOffsets(start, end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset: Optional[dict] = None
+self.current_offset: Optional[dict] = None
+self.cache: List[PrefetchedCacheEntry] = []
+
+def initialOffset(self) -> dict:
+if self.initial_offset is None:
+self.initial_offset = self.simple_reader.initialOffset()
+return self.initial_offset
+
+def latestOffset(self) -> dict:
+# when query start for the first time, use initial offset as the start 
offset.
+if self.current_offset is None:
+self.current_offset = self.initialOffset()
+(iter, end) = self.simple_reader.read(self.current_offset)
+self.cache.append(PrefetchedCacheEntry(self.current_offset, end, iter))
+self.current_offset = end
+return end
+
+def commit(self, end: dict) -> None:
+if self.current_offset is None:
+self.current_offset = end
+
+end_idx = -1
+for idx, entry in enumerate(self.cache):
+if json.dumps(entry.end) == json.dumps(end):
+end_idx = idx
+break
+if end_idx > 0:
+# Drop prefetched data for batch that has been committed.
+self.cache = self.cache[end_idx:]
+self.simple_reader.commit(end)
+
+def partitions(self, start: dict, end: dict) -> Sequence["InputPartition"]:
+# when query restart from checkpoint, use the last committed offset as 
the start offset.
+# This 

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-29 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1583658095


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry:
+def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 
end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset: Optional[dict] = None
+self.current_offset: Optional[dict] = None
+self.cache: List[PrefetchedCacheEntry] = []
+
+def initialOffset(self) -> dict:
+if self.initial_offset is None:
+self.initial_offset = self.simple_reader.initialOffset()
+return self.initial_offset
+
+def latestOffset(self) -> dict:
+# when query start for the first time, use initial offset as the start 
offset.

Review Comment:
   Thanks for the suggestion, added a logic to delete last committed entry in 
the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-29 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1583652470


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingPartitionReaderFactory.scala:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.datasources.v2.python
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.metric.CustomTaskMetric
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.PythonStreamBlockId
+
+
+case class PythonStreamingInputPartition(
+index: Int,
+pickedPartition: Array[Byte],
+blockId: Option[PythonStreamBlockId]) extends InputPartition {
+  def dropCache(): Unit = {
+blockId.foreach(SparkEnv.get.blockManager.master.removeBlock(_))
+  }
+}
+
+class PythonStreamingPartitionReaderFactory(
+source: UserDefinedPythonDataSource,
+pickledReadFunc: Array[Byte],
+outputSchema: StructType,
+jobArtifactUUID: Option[String])
+  extends PartitionReaderFactory with Logging {
+
+  override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = {
+val part = partition.asInstanceOf[PythonStreamingInputPartition]
+
+// Maybe read from cached block prefetched by SimpleStreamReader
+lazy val cachedBlock = if (part.blockId.isDefined) {
+  val block = SparkEnv.get.blockManager.get[InternalRow](part.blockId.get)
+.map(_.data.asInstanceOf[Iterator[InternalRow]])
+  if (block.isEmpty) {
+logWarning(s"Prefetched block ${part.blockId} for Python data source 
not found.")
+  }
+  block
+} else None
+
+new PartitionReader[InternalRow] {
+
+  private[this] val metrics: Map[String, SQLMetric] = 
PythonCustomMetric.pythonMetrics
+
+  private val outputIter = if (cachedBlock.isEmpty) {
+// Evaluate the python read UDF if the partition is not cached as 
block.
+val evaluatorFactory = source.createMapInBatchEvaluatorFactory(
+  pickledReadFunc,

Review Comment:
   This will be triggered if the during replay of the last batch when query 
restart, I added the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-29 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1583649763


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,188 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers. Compared to 
DataSourceStreamReader,
+SimpleDataSourceStreamReader doesn't require planning data partitioning. 
Also, the read api of
+SimpleDataSourceStreamReader allows reading data and planning the latest 
offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> (Iterator[Tuple], dict):
+"""
+Read all available data from specified start offset and return the 
offset that next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A tuple of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end of this read attempt and the start of next 
read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def read2(self, start: dict, end: dict) -> Iterator[Tuple]:

Review Comment:
   Then we won't be able to enforce that read with end offset is implemented.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-29 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1583576307


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry:
+def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through 
readBetweenOffsets(start, end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset: Optional[dict] = None
+self.current_offset: Optional[dict] = None
+self.cache: List[PrefetchedCacheEntry] = []
+
+def initialOffset(self) -> dict:
+if self.initial_offset is None:
+self.initial_offset = self.simple_reader.initialOffset()
+return self.initial_offset
+
+def latestOffset(self) -> dict:
+# when query start for the first time, use initial offset as the start 
offset.
+if self.current_offset is None:
+self.current_offset = self.initialOffset()
+(iter, end) = self.simple_reader.read(self.current_offset)
+self.cache.append(PrefetchedCacheEntry(self.current_offset, end, iter))
+self.current_offset = end
+return end
+
+def commit(self, end: dict) -> None:
+if self.current_offset is None:
+self.current_offset = end
+
+end_idx = -1
+for idx, entry in enumerate(self.cache):
+if json.dumps(entry.end) == json.dumps(end):
+end_idx = idx
+break
+if end_idx > 0:

Review Comment:
   I am trying to be conservative here when evicting cache by keeping one extra 
entry. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about 

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-29 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1583569700


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry:
+def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through 
readBetweenOffsets(start, end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset: Optional[dict] = None
+self.current_offset: Optional[dict] = None
+self.cache: List[PrefetchedCacheEntry] = []
+
+def initialOffset(self) -> dict:
+if self.initial_offset is None:
+self.initial_offset = self.simple_reader.initialOffset()
+return self.initial_offset
+
+def latestOffset(self) -> dict:
+# when query start for the first time, use initial offset as the start 
offset.
+if self.current_offset is None:
+self.current_offset = self.initialOffset()
+(iter, end) = self.simple_reader.read(self.current_offset)
+self.cache.append(PrefetchedCacheEntry(self.current_offset, end, iter))
+self.current_offset = end
+return end
+
+def commit(self, end: dict) -> None:
+if self.current_offset is None:
+self.current_offset = end
+
+end_idx = -1
+for idx, entry in enumerate(self.cache):
+if json.dumps(entry.end) == json.dumps(end):
+end_idx = idx
+break
+if end_idx > 0:
+# Drop prefetched data for batch that has been committed.
+self.cache = self.cache[end_idx:]
+self.simple_reader.commit(end)
+
+def partitions(self, start: dict, end: dict) -> Sequence["InputPartition"]:
+# when query restart from checkpoint, use the last committed offset as 
the start offset.
+# This 

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-29 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1583126915


##
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##
@@ -60,14 +68,29 @@ def latest_offset_func(reader: DataSourceStreamReader, 
outfile: IO) -> None:
 write_with_length(json.dumps(offset).encode("utf-8"), outfile)
 
 
-def partitions_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) 
-> None:
+def partitions_func(
+reader: DataSourceStreamReader,
+data_source: DataSource,
+schema: StructType,
+max_arrow_batch_size: int,
+infile: IO,
+outfile: IO,
+) -> None:
 start_offset = json.loads(utf8_deserializer.loads(infile))
 end_offset = json.loads(utf8_deserializer.loads(infile))
 partitions = reader.partitions(start_offset, end_offset)
 # Return the serialized partition values.
 write_int(len(partitions), outfile)
 for partition in partitions:
 pickleSer._write_with_length(partition, outfile)
+if isinstance(reader, _SimpleStreamReaderWrapper):
+it = reader.getCache(start_offset, end_offset)
+if it is None:

Review Comment:
   Never mind, got the rationale.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-29 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1583126078


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry:
+def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through 
readBetweenOffsets(start, end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset: Optional[dict] = None
+self.current_offset: Optional[dict] = None
+self.cache: List[PrefetchedCacheEntry] = []
+
+def initialOffset(self) -> dict:
+if self.initial_offset is None:
+self.initial_offset = self.simple_reader.initialOffset()
+return self.initial_offset
+
+def latestOffset(self) -> dict:
+# when query start for the first time, use initial offset as the start 
offset.
+if self.current_offset is None:
+self.current_offset = self.initialOffset()
+(iter, end) = self.simple_reader.read(self.current_offset)
+self.cache.append(PrefetchedCacheEntry(self.current_offset, end, iter))
+self.current_offset = end
+return end
+
+def commit(self, end: dict) -> None:
+if self.current_offset is None:
+self.current_offset = end
+
+end_idx = -1
+for idx, entry in enumerate(self.cache):
+if json.dumps(entry.end) == json.dumps(end):
+end_idx = idx
+break
+if end_idx > 0:
+# Drop prefetched data for batch that has been committed.
+self.cache = self.cache[end_idx:]
+self.simple_reader.commit(end)
+
+def partitions(self, start: dict, end: dict) -> Sequence["InputPartition"]:
+# when query restart from checkpoint, use the last committed offset as 
the start offset.
+# This 

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-29 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1582734755


##
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##
@@ -60,14 +68,29 @@ def latest_offset_func(reader: DataSourceStreamReader, 
outfile: IO) -> None:
 write_with_length(json.dumps(offset).encode("utf-8"), outfile)
 
 
-def partitions_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) 
-> None:
+def partitions_func(
+reader: DataSourceStreamReader,
+data_source: DataSource,
+schema: StructType,
+max_arrow_batch_size: int,
+infile: IO,
+outfile: IO,
+) -> None:
 start_offset = json.loads(utf8_deserializer.loads(infile))
 end_offset = json.loads(utf8_deserializer.loads(infile))
 partitions = reader.partitions(start_offset, end_offset)
 # Return the serialized partition values.
 write_int(len(partitions), outfile)
 for partition in partitions:
 pickleSer._write_with_length(partition, outfile)
+if isinstance(reader, _SimpleStreamReaderWrapper):
+it = reader.getCache(start_offset, end_offset)
+if it is None:

Review Comment:
   Likewise I mentioned above, we could always send the batch here and 
eliminate necessity of serializing SimpleStreamReader and also wrapper.



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##
@@ -491,9 +492,15 @@ class MicroBatchExecution(
 case (source: Source, end: Offset) =>
   val start = 
execCtx.startOffsets.get(source).map(_.asInstanceOf[Offset])
   source.getBatch(start, end)
-case nonV1Tuple =>
-  // The V2 API does not have the same edge case requiring 
getBatch to be called
-  // here, so we do nothing here.
+case (source: PythonMicroBatchStream, end: Offset) =>
+  // PythonMicrobatchStream need to initialize the start 
offset of prefetching
+  // by calling planInputPartitions of the last completed 
batch during restart.
+  // We don't need to do that if there is incomplete batch in 
the offset log
+  // because planInputPartitions during batch replay 
initializes the start offset.
+  val start = 
execCtx.startOffsets.get(source).map(_.asInstanceOf[Offset])
+  
source.planInputPartitions(source.deserializeOffset(start.get.json()),

Review Comment:
   Is it safe to assume that `start` is always Some(v) rather than None?



##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala:
##
@@ -35,6 +38,14 @@ class PythonMicroBatchStream(
 ds.source.createPythonFunction(
   ds.getOrCreateDataSourceInPython(shortName, options, 
Some(outputSchema)).dataSource)
 
+  private val streamId = nextStreamId
+  private var nextBlockId = 0L
+
+  // planInputPartitions() maybe be called multiple times for the current 
microbatch.
+  // Cache the result of planInputPartitions() because it may involve sending 
data
+  // from python to JVM.
+  private var cachedInputPartition: Option[(String, String, 
PythonStreamingInputPartition)] = None

Review Comment:
   While it could be reduced in above change, it's still safer to leave the 
cache as it is. It's not a strong guarantee that planInputPartitions() is only 
called once (otherwise the above should be a bugfix about contract violation 
not an optimization).



##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala:
##
@@ -66,9 +97,18 @@ class PythonMicroBatchStream(
   }
 
   override def stop(): Unit = {
+cachedInputPartition.foreach(_._3.dropCache())
 runner.stop()
   }
 
   override def deserializeOffset(json: String): Offset = 
PythonStreamingSourceOffset(json)
 }
 
+object PythonMicroBatchStream {
+  var currentId = 0

Review Comment:
   I don't think this is thread-safe, unless this variable is only accessible 
with nextStreamId. If you are not intentional to expose this to public, please 
explicitly block it. (private)



##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-29 Thread via GitHub


HyukjinKwon commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1582652464


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):

Review Comment:
   Ah then all good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-29 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1582649481


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):

Review Comment:
   The thing is, both SimpleInputPartition and PrefetchedCacheEntry aren't 
developer APIs. Data source developers aren't expected to know about this, 
unless they go deep dive on debugging when Spark has a bug. These are rather 
internal classes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-28 Thread via GitHub


HyukjinKwon commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1582469363


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):

Review Comment:
   here 
https://github.com/apache/spark/tree/master/python/docs/source/user_guide/sql. 
feel free to do it separately, I don't mind.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-28 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1582466426


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry:
+def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 
end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset: Optional[dict] = None
+self.current_offset: Optional[dict] = None
+self.cache: List[PrefetchedCacheEntry] = []
+
+def initialOffset(self) -> dict:
+if self.initial_offset is None:
+self.initial_offset = self.simple_reader.initialOffset()
+return self.initial_offset
+
+def latestOffset(self) -> dict:
+# when query start for the first time, use initial offset as the start 
offset.

Review Comment:
   Let's avoid randomness - you can manipulate both tests on restarting 1) 
restarting from the query which does not have leftover batch 2) restarting from 
the query which does have leftover batch (planned-but-yet-to-be-committed). We 
have several tests which adjusts offset log and commit log to test the behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-28 Thread via GitHub


HyukjinKwon commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1582440585


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,188 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers. Compared to 
DataSourceStreamReader,
+SimpleDataSourceStreamReader doesn't require planning data partitioning. 
Also, the read api of
+SimpleDataSourceStreamReader allows reading data and planning the latest 
offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> (Iterator[Tuple], dict):
+"""
+Read all available data from specified start offset and return the 
offset that next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A tuple of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end of this read attempt and the start of next 
read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def read2(self, start: dict, end: dict) -> Iterator[Tuple]:

Review Comment:
   It can't have overloaded ones but it can dispatch by embedding if-else, and 
leveraging optional argument. e.g.,
   
   ```python
   def read(self, start: dict, end: dict = None) -> 
Union[Tuple[Iterator[Tuple], dict], Iterator[Tuple]:
   
   if end is None:
   return # logic for read(start)
   else:
   return # logic for read(start, end)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1581319890


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):

Review Comment:
   How do we document it, do we add link to the python data source user guide 
in python user guide?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1581314644


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry:
+def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 
end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset: Optional[dict] = None
+self.current_offset: Optional[dict] = None
+self.cache: List[PrefetchedCacheEntry] = []
+
+def initialOffset(self) -> dict:
+if self.initial_offset is None:
+self.initial_offset = self.simple_reader.initialOffset()
+return self.initial_offset
+
+def latestOffset(self) -> dict:
+# when query start for the first time, use initial offset as the start 
offset.

Review Comment:
   I realized the trick only works for V1 source and added the individual 
handling, let me also update the comment here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1581304652


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry:
+def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 
end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset: Optional[dict] = None
+self.current_offset: Optional[dict] = None
+self.cache: List[PrefetchedCacheEntry] = []
+
+def initialOffset(self) -> dict:
+if self.initial_offset is None:
+self.initial_offset = self.simple_reader.initialOffset()
+return self.initial_offset
+
+def latestOffset(self) -> dict:
+# when query start for the first time, use initial offset as the start 
offset.

Review Comment:
   Yes, we have tests where query get restarted multiple times and verify that 
replay microbatch succeeds.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1581301878


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,188 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers. Compared to 
DataSourceStreamReader,
+SimpleDataSourceStreamReader doesn't require planning data partitioning. 
Also, the read api of
+SimpleDataSourceStreamReader allows reading data and planning the latest 
offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> (Iterator[Tuple], dict):
+"""
+Read all available data from specified start offset and return the 
offset that next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A tuple of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end of this read attempt and the start of next 
read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def read2(self, start: dict, end: dict) -> Iterator[Tuple]:

Review Comment:
   There can't be 2 methods named read() for the same class, python doesn't 
have method overloading IIRC.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HyukjinKwon commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580671684


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):

Review Comment:
   Maybe we should at least put it in user-guide 
(https://spark.apache.org/docs/latest/api/python/user_guide/index.html). 
Usually we don't document developer API but I think this is important.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580659454


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry:
+def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 
end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset: Optional[dict] = None
+self.current_offset: Optional[dict] = None
+self.cache: List[PrefetchedCacheEntry] = []
+
+def initialOffset(self) -> dict:
+if self.initial_offset is None:
+self.initial_offset = self.simple_reader.initialOffset()
+return self.initial_offset
+
+def latestOffset(self) -> dict:
+# when query start for the first time, use initial offset as the start 
offset.

Review Comment:
   OK, never mind. You are dealing with all the thing individually (not just 
leveraging DSv1 trick). Your comment seems a bit confusing - mentioning 
getBatch was the starting point I got confused.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580659454


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry:
+def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 
end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset: Optional[dict] = None
+self.current_offset: Optional[dict] = None
+self.cache: List[PrefetchedCacheEntry] = []
+
+def initialOffset(self) -> dict:
+if self.initial_offset is None:
+self.initial_offset = self.simple_reader.initialOffset()
+return self.initial_offset
+
+def latestOffset(self) -> dict:
+# when query start for the first time, use initial offset as the start 
offset.

Review Comment:
   OK, never mind. You are dealing with all the thing individually (not just 
leveraging DSv1 trick). Your comment seems a bit confusing - mentioning 
getBatch was the starting point I got confused.
   
   Still better to have fault-tolerance test(s) if we don't have it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580640494


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -164,7 +178,20 @@ class PythonStreamingSourceRunner(
   val pickledPartition: Array[Byte] = PythonWorkerUtils.readBytes(dataIn)
   pickledPartitions.append(pickledPartition)
 }
-pickledPartitions.toArray
+val prefetchedRecordsStatus = dataIn.readInt()
+val iter: Option[Iterator[InternalRow]] = prefetchedRecordsStatus match {
+  case NON_EMPTY_PYARROW_RECORD_BATCHES => Some(readArrowRecordBatches())
+  case PREFETCHED_RECORDS_NOT_FOUND => None

Review Comment:
   What do we do if we hit this line?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580604894


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry:
+def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 
end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset: Optional[dict] = None
+self.current_offset: Optional[dict] = None
+self.cache: List[PrefetchedCacheEntry] = []
+
+def initialOffset(self) -> dict:
+if self.initial_offset is None:
+self.initial_offset = self.simple_reader.initialOffset()
+return self.initial_offset
+
+def latestOffset(self) -> dict:
+# when query start for the first time, use initial offset as the start 
offset.

Review Comment:
   Actually this is the hard part of implementing prefetcher for SS data 
source. When the query restarts, we assume that prefetcher would be able to 
start from known committed offset. Unfortunately that is not true. You've 
mentioned that this relies on getBatch trick but that's only applicable with 
DSv1 and it's clearly a hack to address some specific data source. That is not 
a contract streaming engine guarantees.
   
   We have an interface `AcceptsLatestSeenOffset` for this case (you need to 
adopt this on determining the start offset for prefetching), but this still 
does not give you the last committed offset but the latest seen offset, so 
Spark could still request the offset range before this offset. Though it would 
work if the simple data source reader can work with all 
planned-but-not-yet-committed offset range without relying on prefetcher. 
prefetcher can start prefetching with latest seen offset and previous offset 
range should be covered with planned 

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580636552


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,188 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers. Compared to 
DataSourceStreamReader,
+SimpleDataSourceStreamReader doesn't require planning data partitioning. 
Also, the read api of
+SimpleDataSourceStreamReader allows reading data and planning the latest 
offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> (Iterator[Tuple], dict):
+"""
+Read all available data from specified start offset and return the 
offset that next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A tuple of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end of this read attempt and the start of next 
read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def read2(self, start: dict, end: dict) -> Iterator[Tuple]:

Review Comment:
   The name itself might be OK. Maybe we have an option to make the both of 
method names be self-descriptive (not just read), but if we prefer shorter 
name, maybe OK to have either to be "read".
   
   I see a bigger issue on implementation. Let's address that first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580604894


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry:
+def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 
end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset: Optional[dict] = None
+self.current_offset: Optional[dict] = None
+self.cache: List[PrefetchedCacheEntry] = []
+
+def initialOffset(self) -> dict:
+if self.initial_offset is None:
+self.initial_offset = self.simple_reader.initialOffset()
+return self.initial_offset
+
+def latestOffset(self) -> dict:
+# when query start for the first time, use initial offset as the start 
offset.

Review Comment:
   Actually this is the hard part of implementing prefetcher for SS data 
source. When the query restarts, we assume that prefetcher would be able to 
start from known committed offset. Unfortunately that is not true. You've 
mentioned that this relies on getBatch trick but that's only applicable with 
DSv1 and it's clearly a hack to address some specific data source.
   
   We have an interface `AcceptsLatestSeenOffset` for this case (you need to 
adopt this on determining the start offset for prefetching), but this still 
does not give you the last committed offset but the latest seen offset, so 
Spark could still request the offset range before this offset. Though it would 
work if the simple data source reader can work with all 
planned-but-not-yet-committed offset range without relying on prefetcher. 
prefetcher can start prefetching with latest seen offset and previous offset 
range should be covered with planned batch(es).



-- 
This is an automated message from the 

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580561802


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +494,103 @@ def stop(self) -> None:
 ...
 
 
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+Because  :class:`SimpleDataSourceStreamReader` read records in Spark 
driver node to determine
+end offset of each batch without partitioning, it is only supposed to be 
used in
+lightweight use cases where input rate and batch size is small.
+Use :class:`DataSourceStreamReader` when read throughput is high and can't 
be handled
+by a single process.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.

Review Comment:
   For stateful workload, this is not only about exactly once but also about 
correctness. Maybe we could just stop from "deterministic batch execution" 
rather than further elaboration.



##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +494,103 @@ def stop(self) -> None:
 ...
 
 
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+Because  :class:`SimpleDataSourceStreamReader` read records in Spark 
driver node to determine
+end offset of each batch without partitioning, it is only supposed to be 
used in
+lightweight use cases where input rate and batch size is small.
+Use :class:`DataSourceStreamReader` when read throughput is high and can't 
be handled
+by a single process.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) 

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-25 Thread via GitHub


HyukjinKwon commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1579266289


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):

Review Comment:
   I think we should document them ... 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-25 Thread via GitHub


HeartSaVioR commented on PR #45977:
URL: https://github.com/apache/spark/pull/45977#issuecomment-2076604783

   I'll take a look tomorrow. Sorry for the delay.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-24 Thread via GitHub


allisonwang-db commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1578365745


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+DataSource,
+DataSourceStreamReader,
+InputPartition,
+SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+"""
+Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+invoking datasource.streamReader() directly.
+"""
+try:
+return datasource.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):

Review Comment:
   cc @HyukjinKwon do you know how we can hide this API or document this as 
developer only?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-24 Thread via GitHub


HyukjinKwon commented on PR #45977:
URL: https://github.com/apache/spark/pull/45977#issuecomment-2074486990

   The documentation build failure will go away if you sync/rebase your branch 
to `master` branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-22 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1575445077


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):

Review Comment:
   I see, moved it to another file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-22 Thread via GitHub


allisonwang-db commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1575101729


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):

Review Comment:
   Yea let's not make it a public API



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-22 Thread via GitHub


sahnib commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1574996954


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):
+def __init__(self, start: dict, end: dict, it: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.it = it
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+end : dict
+The offset where the reading stop.
+
+Returns
+---
+iterator of :class:`Tuple`\\s
+All the records between start offset and end offset.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read2"},
+)
+
+def commit(self, end: dict) -> None:
+"""
+Informs the source that Spark has completed processing all data for 
offsets less than or
+equal to `end` and will only request offsets greater than `end` in the 
future.
+
+Parameters
+--
+end : dict
+The latest offset that the streaming query has processed for this 
source.
+"""
+...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 
end).
+

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1573100752


##
python/pyspark/sql/worker/plan_data_source_read.py:
##
@@ -51,6 +52,71 @@
 )
 
 
+def records_to_arrow_batches(
+output_iter: Iterator[Tuple],
+max_arrow_batch_size: int,
+return_type: StructType,
+data_source: DataSource,
+) -> Iterable[pa.RecordBatch]:

Review Comment:
   docstring added.



##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):
+def __init__(self, start: dict, end: dict, it: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.it = it
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+end : dict
+The offset where the reading stop.
+
+Returns
+---
+iterator of :class:`Tuple`\\s
+All the records between start offset and end offset.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read2"},

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1573100690


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):
+def __init__(self, start: dict, end: dict, it: Iterator[Tuple]):

Review Comment:
   Fixed.



##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):
+def __init__(self, start: dict, end: dict, it: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.it = it
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+end : dict
+The offset where the reading stop.
+
+Returns
+---
+iterator of :class:`Tuple`\\s
+All the records between start offset and end offset.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read2"},
+)
+
+def commit(self, end: dict) -> None:
+"""
+Informs the source that Spark has completed processing all data for 
offsets less than or
+equal to `end` and will only request offsets greater than `end` in the 
future.
+
+Parameters
+--
+end : dict
+The latest offset that the streaming query has processed for this 
source.
+"""
+...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1573092072


##
python/pyspark/sql/datasource.py:
##
@@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: 
bool) -> "DataSourceStream
 message_parameters={"feature": "streamWriter"},
 )
 
+def _streamReader(self, schema: StructType) -> "DataSourceStreamReader":

Review Comment:
   This is a private method to fall back to simple reader when streaming reader 
is not implemented. I would be ok to change this if there is an alternative.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1573085074


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):

Review Comment:
   Where should we move these functions to if we don't want them to be public 
API?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1573085009


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):

Review Comment:
   Simple reader doesn't expose partitioning to user, so this is only used for 
the wrapper to make simple reader integrate with streaming engine. Where should 
we move these internal code if we don't want it to be public API?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1572915019


##
python/pyspark/sql/datasource.py:
##
@@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: 
bool) -> "DataSourceStream
 message_parameters={"feature": "streamWriter"},
 )
 
+def _streamReader(self, schema: StructType) -> "DataSourceStreamReader":
+try:
+return self.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(self.simpleStreamReader(schema=schema))

Review Comment:
   Make sense!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1572914583


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):
+def __init__(self, start: dict, end: dict, it: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.it = it
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+end : dict
+The offset where the reading stop.
+
+Returns
+---
+iterator of :class:`Tuple`\\s
+All the records between start offset and end offset.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read2"},
+)
+
+def commit(self, end: dict) -> None:
+"""
+Informs the source that Spark has completed processing all data for 
offsets less than or
+equal to `end` and will only request offsets greater than `end` in the 
future.
+
+Parameters
+--
+end : dict
+The latest offset that the streaming query has processed for this 
source.
+"""
+...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


allisonwang-db commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1572908487


##
python/pyspark/sql/datasource.py:
##
@@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: 
bool) -> "DataSourceStream
 message_parameters={"feature": "streamWriter"},
 )
 
+def _streamReader(self, schema: StructType) -> "DataSourceStreamReader":
+try:
+return self.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(self.simpleStreamReader(schema=schema))
+
+def simpleStreamReader(self, schema: StructType) -> 
"SimpleDataSourceStreamReader":
+"""
+Returns a :class:`SimpleDataSourceStreamReader` instance for reading 
data.
+
+One of simpleStreamReader() and streamReader() must be implemented for 
readable streaming
+data source.

Review Comment:
   Can we be more explicit about when users should choose streamReader versus 
simpleStreamReader here? This information will be included in the API 
documentation for this class.



##
python/pyspark/sql/worker/plan_data_source_read.py:
##
@@ -51,6 +52,71 @@
 )
 
 
+def records_to_arrow_batches(
+output_iter: Iterator[Tuple],
+max_arrow_batch_size: int,
+return_type: StructType,
+data_source: DataSource,
+) -> Iterable[pa.RecordBatch]:

Review Comment:
   Let's add some docstring for this function.



##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):

Review Comment:
   Why do we need this in the public API? Why can't user define their own input 
partition class?



##
python/pyspark/sql/datasource.py:
##
@@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: 
bool) -> "DataSourceStream
 message_parameters={"feature": "streamWriter"},
 )
 
+def _streamReader(self, schema: StructType) -> "DataSourceStreamReader":

Review Comment:
   Why do we need this `_streamReader` in datasource API?



##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):
+def __init__(self, start: dict, end: dict, it: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.it = it
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


sahnib commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1572618909


##
python/pyspark/sql/datasource.py:
##
@@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: 
bool) -> "DataSourceStream
 message_parameters={"feature": "streamWriter"},
 )
 
+def _streamReader(self, schema: StructType) -> "DataSourceStreamReader":
+try:
+return self.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(self.simpleStreamReader(schema=schema))

Review Comment:
   As we prefer `streamReader` over `simpleStreamReader`, can we call out in 
the docs that `streamReader` will be picked if the user has implemented both 
functions. 



##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):
+def __init__(self, start: dict, end: dict, it: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.it = it
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+end : dict
+The offset where the reading stop.
+
+Returns
+---
+iterator of :class:`Tuple`\\s
+All the records between start offset and end offset.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read2"},
+)
+
+def commit(self, end: dict) -> None:
+"""
+Informs the source that Spark has completed processing all data for 
offsets less than or
+equal to `end` and will only request offsets greater than `end` in the 
future.
+
+Parameters
+--
+end : dict
+The latest offset that the streaming query has processed for this 
source.
+"""
+...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query 

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-12 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1563370002


##
python/pyspark/sql/worker/plan_data_source_read.py:
##
@@ -51,6 +52,71 @@
 )
 
 
+def records_to_arrow_batches(
+output_iter: Iterator[Tuple],
+max_arrow_batch_size: int,
+return_type: StructType,
+data_source: DataSource,
+) -> Iterable[pa.RecordBatch]:
+def batched(iterator: Iterator, n: int) -> Iterator:
+return iter(functools.partial(lambda it: list(islice(it, n)), 
iterator), [])

Review Comment:
   This part of code was refactored out of the old plan_data_source_read file, 
I am not sure whether this is lazy or not. But for streaming we assume that 
simple python data source is light weight without partitioning and can 
materialize all records of a microbatch at once, so it doesn't matter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-12 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1563133283


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,192 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+end : dict
+The offset where the reading stop.
+
+Returns
+---
+iterator of :class:`Tuple`\\s
+All the records between start offset and end offset.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read2"},
+)
+
+def commit(self, end: dict) -> None:
+"""
+Informs the source that Spark has completed processing all data for 
offsets less than or
+equal to `end` and will only request offsets greater than `end` in the 
future.
+
+Parameters
+--
+end : dict
+The latest offset that the streaming query has processed for this 
source.
+"""
+...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset track the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine call latestOffset(), the wrapper calls read() that 
start from
+current_offset, prefetch and cache the data, then update the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 
end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset = None
+self.current_offset = None

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-12 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1563131511


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,192 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+end : dict
+The offset where the reading stop.
+
+Returns
+---
+iterator of :class:`Tuple`\\s
+All the records between start offset and end offset.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read2"},
+)
+
+def commit(self, end: dict) -> None:
+"""
+Informs the source that Spark has completed processing all data for 
offsets less than or
+equal to `end` and will only request offsets greater than `end` in the 
future.
+
+Parameters
+--
+end : dict
+The latest offset that the streaming query has processed for this 
source.
+"""
+...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset track the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine call latestOffset(), the wrapper calls read() that 
start from
+current_offset, prefetch and cache the data, then update the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 
end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset = None
+self.current_offset = None

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-12 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1563131182


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -164,7 +175,20 @@ class PythonStreamingSourceRunner(
   val pickledPartition: Array[Byte] = PythonWorkerUtils.readBytes(dataIn)
   pickledPartitions.append(pickledPartition)
 }
-pickledPartitions.toArray
+val prefetchedRecordsStatus = dataIn.readInt()
+val iter: Option[Iterator[InternalRow]] =
+  if (prefetchedRecordsStatus == NON_EMPTY_PYARROW_RECORD_BATCHES) {

Review Comment:
   Nice catch! Changed to pattern match.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-12 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1563130456


##
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##
@@ -76,6 +97,19 @@ def commit_func(reader: DataSourceStreamReader, infile: IO, 
outfile: IO) -> None
 write_int(0, outfile)
 
 
+def send_batch_func(
+rows: Iterator[Tuple], outfile: IO, schema: StructType, data_source: 
DataSource
+) -> None:
+batches = list(records_to_arrow_batches(rows, 1000, schema, data_source))

Review Comment:
   Makes sense. Propagate the config from JVM now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-12 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1563129813


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,192 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+end : dict
+The offset where the reading stop.
+
+Returns
+---
+iterator of :class:`Tuple`\\s
+All the records between start offset and end offset.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read2"},
+)
+
+def commit(self, end: dict) -> None:
+"""
+Informs the source that Spark has completed processing all data for 
offsets less than or
+equal to `end` and will only request offsets greater than `end` in the 
future.
+
+Parameters
+--
+end : dict
+The latest offset that the streaming query has processed for this 
source.
+"""
+...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset track the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine call latestOffset(), the wrapper calls read() that 
start from
+current_offset, prefetch and cache the data, then update the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 
end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset = None
+self.current_offset = None

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-12 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1563028452


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,192 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+end : dict
+The offset where the reading stop.
+
+Returns
+---
+iterator of :class:`Tuple`\\s
+All the records between start offset and end offset.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read2"},
+)
+
+def commit(self, end: dict) -> None:
+"""
+Informs the source that Spark has completed processing all data for 
offsets less than or
+equal to `end` and will only request offsets greater than `end` in the 
future.
+
+Parameters
+--
+end : dict
+The latest offset that the streaming query has processed for this 
source.
+"""
+...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset track the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine call latestOffset(), the wrapper calls read() that 
start from
+current_offset, prefetch and cache the data, then update the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 
end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset = None
+self.current_offset = None

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-12 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1563028108


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -199,4 +223,30 @@ class PythonStreamingSourceRunner(
 logError("Exception when trying to kill worker", e)
 }
   }
+
+  private val allocator = ArrowUtils.rootAllocator.newChildAllocator(
+s"stream reader for $pythonExec", 0, Long.MaxValue)
+
+  def readArrowRecordBatches(): Iterator[InternalRow] = {
+assert(dataIn.readInt() == SpecialLengths.START_ARROW_STREAM)
+val reader = new ArrowStreamReader(dataIn, allocator)
+val root = reader.getVectorSchemaRoot()
+// When input is empty schema can't be read.
+val schema = ArrowUtils.fromArrowSchema(root.getSchema())
+assert(schema == outputSchema)
+
+val vectors = root.getFieldVectors().asScala.map { vector =>
+  new ArrowColumnVector(vector)
+}.toArray[ColumnVector]
+val rows = ArrayBuffer[InternalRow]()

Review Comment:
   We can't do lazy initialization here because we need to send the data from 
python process to JVM, the communication is synchronous.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-12 Thread via GitHub


sahnib commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1562866326


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,192 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+end : dict
+The offset where the reading stop.
+
+Returns
+---
+iterator of :class:`Tuple`\\s
+All the records between start offset and end offset.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read2"},
+)
+
+def commit(self, end: dict) -> None:
+"""
+Informs the source that Spark has completed processing all data for 
offsets less than or
+equal to `end` and will only request offsets greater than `end` in the 
future.
+
+Parameters
+--
+end : dict
+The latest offset that the streaming query has processed for this 
source.
+"""
+...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset track the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine call latestOffset(), the wrapper calls read() that 
start from
+current_offset, prefetch and cache the data, then update the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 
end).
+"""
+
+def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+self.simple_reader = simple_reader
+self.initial_offset = None
+self.current_offset = None
+

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-11 Thread via GitHub


HyukjinKwon commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1561875139


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,188 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers. Compared to 
DataSourceStreamReader,
+SimpleDataSourceStreamReader doesn't require planning data partitioning. 
Also, the read api of
+SimpleDataSourceStreamReader allows reading data and planning the latest 
offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> (Iterator[Tuple], dict):
+"""
+Read all available data from specified start offset and return the 
offset that next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A tuple of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end of this read attempt and the start of next 
read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def read2(self, start: dict, end: dict) -> Iterator[Tuple]:

Review Comment:
   I have less context so let's make sure this is signed off by @HeartSaVioR 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-11 Thread via GitHub


HyukjinKwon commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1560686504


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,188 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers. Compared to 
DataSourceStreamReader,
+SimpleDataSourceStreamReader doesn't require planning data partitioning. 
Also, the read api of
+SimpleDataSourceStreamReader allows reading data and planning the latest 
offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> (Iterator[Tuple], dict):
+"""
+Read all available data from specified start offset and return the 
offset that next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A tuple of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end of this read attempt and the start of next 
read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def read2(self, start: dict, end: dict) -> Iterator[Tuple]:

Review Comment:
   cc @HeartSaVioR 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-10 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1560425964


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,188 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers. Compared to 
DataSourceStreamReader,
+SimpleDataSourceStreamReader doesn't require planning data partitioning. 
Also, the read api of
+SimpleDataSourceStreamReader allows reading data and planning the latest 
offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> (Iterator[Tuple], dict):
+"""
+Read all available data from specified start offset and return the 
offset that next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A tuple of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end of this read attempt and the start of next 
read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def read2(self, start: dict, end: dict) -> Iterator[Tuple]:

Review Comment:
   Changed to readBetweenOffsets()



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-10 Thread via GitHub


HyukjinKwon commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1560294470


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,188 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers. Compared to 
DataSourceStreamReader,
+SimpleDataSourceStreamReader doesn't require planning data partitioning. 
Also, the read api of
+SimpleDataSourceStreamReader allows reading data and planning the latest 
offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> (Iterator[Tuple], dict):
+"""
+Read all available data from specified start offset and return the 
offset that next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A tuple of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end of this read attempt and the start of next 
read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def read2(self, start: dict, end: dict) -> Iterator[Tuple]:

Review Comment:
   can we have a different name in that case instead of `read2`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-10 Thread via GitHub


HyukjinKwon commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1560293229


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,188 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers. Compared to 
DataSourceStreamReader,
+:class:`SimpleDataSourceStreamReader` doesn't require planning data 
partitioning. Also, the read api of

Review Comment:
   Let's do this for all class references



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-10 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1560262338


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,188 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers. Compared to 
DataSourceStreamReader,
+SimpleDataSourceStreamReader doesn't require planning data partitioning. 
Also, the read api of
+SimpleDataSourceStreamReader allows reading data and planning the latest 
offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> (Iterator[Tuple], dict):
+"""
+Read all available data from specified start offset and return the 
offset that next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A tuple of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end of this read attempt and the start of next 
read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def read2(self, start: dict, end: dict) -> Iterator[Tuple]:

Review Comment:
   They are fundamentally different, the former read() is to read data and plan 
end offset, the latter is to read data between already planned start and end 
offset. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-10 Thread via GitHub


HyukjinKwon commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1560219006


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,188 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers. Compared to 
DataSourceStreamReader,
+SimpleDataSourceStreamReader doesn't require planning data partitioning. 
Also, the read api of
+SimpleDataSourceStreamReader allows reading data and planning the latest 
offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> (Iterator[Tuple], dict):
+"""
+Read all available data from specified start offset and return the 
offset that next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A tuple of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end of this read attempt and the start of next 
read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def read2(self, start: dict, end: dict) -> Iterator[Tuple]:

Review Comment:
   can we have one method? you can make `end` argument optional



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-10 Thread via GitHub


HyukjinKwon commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1560215436


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,188 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers. Compared to 
DataSourceStreamReader,
+SimpleDataSourceStreamReader doesn't require planning data partitioning. 
Also, the read api of

Review Comment:
   I would use directives to reference classes here
   
   ```suggestion
   :class:`SimpleDataSourceStreamReader` doesn't require planning data 
partitioning. Also, the read api of
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-10 Thread via GitHub


chaoqin-li1123 commented on PR #45977:
URL: https://github.com/apache/spark/pull/45977#issuecomment-2048549899

   @allisonwang-db @HyukjinKwon @HeartSaVioR PTAL, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org