sahnib commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1562866326
##########
python/pyspark/sql/datasource.py:
##########
@@ -469,6 +501,192 @@ def stop(self) -> None:
...
+class SimpleInputPartition(InputPartition):
+ def __init__(self, start: dict, end: dict):
+ self.start = start
+ self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+ """
+ A base class for simplified streaming data source readers.
+ Compared to :class:`DataSourceStreamReader`,
:class:`SimpleDataSourceStreamReader` doesn't
+ require planning data partition. Also, the read api of
:class:`SimpleDataSourceStreamReader`
+ allows reading data and planning the latest offset at the same time.
+
+ .. versionadded: 4.0.0
+ """
+
+ def initialOffset(self) -> dict:
+ """
+ Return the initial offset of the streaming data source.
+ A new streaming query starts reading data from the initial offset.
+ If Spark is restarting an existing query, it will restart from the
check-pointed offset
+ rather than the initial one.
+
+ Returns
+ -------
+ dict
+ A dict or recursive dict whose key and value are primitive types,
which includes
+ Integer, String and Boolean.
+
+ Examples
+ --------
+ >>> def initialOffset(self):
+ ... return {"parititon-1": {"index": 3, "closed": True},
"partition-2": {"index": 5}}
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "initialOffset"},
+ )
+
+ def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+ """
+ Read all available data from start offset and return the offset that
next read attempt
+ starts from.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ Returns
+ -------
+ A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+ The iterator contains all the available records after start offset.
+ The dict is the end offset of this read attempt and the start of
next read attempt.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read"},
+ )
+
+ def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+ """
+ Read all available data from specific start offset and end offset.
+ This is invoked during failure recovery to re-read a batch
deterministically
+ in order to achieve exactly once.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ end : dict
+ The offset where the reading stop.
+
+ Returns
+ -------
+ iterator of :class:`Tuple`\\s
+ All the records between start offset and end offset.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read2"},
+ )
+
+ def commit(self, end: dict) -> None:
+ """
+ Informs the source that Spark has completed processing all data for
offsets less than or
+ equal to `end` and will only request offsets greater than `end` in the
future.
+
+ Parameters
+ ----------
+ end : dict
+ The latest offset that the streaming query has processed for this
source.
+ """
+ ...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+ """
+ A private class that wrap :class:`SimpleDataSourceStreamReader` in
prefetch and cache pattern,
+ so that :class:`SimpleDataSourceStreamReader` can integrate with streaming
engine like an
+ ordinary :class:`DataSourceStreamReader`.
+
+ current_offset track the latest progress of the record prefetching, it is
initialized to be
+ initialOffset() when query start for the first time or initialized to be
the end offset of
+ the last committed batch when query restarts.
+
+ When streaming engine call latestOffset(), the wrapper calls read() that
start from
+ current_offset, prefetch and cache the data, then update the
current_offset to be
+ the end offset of the new data.
+
+ When streaming engine call planInputPartitions(start, end), the wrapper
get the prefetched data
+ from cache and send it to JVM along with the input partitions.
+
+ When query restart, batches in write ahead offset log that has not been
committed will be
+ replayed by reading data between start and end offset through read2(start,
end).
+ """
+
+ def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+ self.simple_reader = simple_reader
+ self.initial_offset = None
+ self.current_offset = None
+ self.cache: List[Tuple[dict, dict, Iterator[Tuple]]] = []
+
+ def initialOffset(self) -> dict:
+ if self.initial_offset is None:
+ self.initial_offset = self.simple_reader.initialOffset() # type:
ignore[assignment]
+ return self.initial_offset
+
+ def latestOffset(self) -> dict:
+ # when query start for the first time, use initial offset as the start
offset.
+ if self.current_offset is None:
+ self.current_offset = self.initialOffset() # type:
ignore[assignment]
+ (iter, end) = self.simple_reader.read(self.current_offset) # type:
ignore[arg-type]
+ self.cache.append((self.current_offset, end, iter)) # type:
ignore[arg-type]
+ self.current_offset = end # type: ignore[assignment]
+ return end
+
+ def commit(self, end: dict) -> None:
+ if self.current_offset is None:
+ self.current_offset = end # type: ignore[assignment]
+
+ end_idx = -1
+ for i in range(len(self.cache)):
+ if json.dumps(self.cache[i][1]) == json.dumps(end):
+ end_idx = i
+ break
+ if end_idx > 0:
+ # Drop prefetched data for batch that has been committed.
+ self.cache = self.cache[end_idx:]
+ self.simple_reader.commit(end)
+
+ def partitions(self, start: dict, end: dict) -> Sequence["InputPartition"]:
+ # when query restart from checkpoint, use the last committed offset as
the start offset.
+ # This depends on the current behavior that streaming engine call
getBatch on the last
+ # microbatch when query restart.
+ if self.current_offset is None:
+ self.current_offset = end # type: ignore[assignment]
+ if len(self.cache) > 0:
+ assert self.cache[-1][1] == end
+ return [SimpleInputPartition(start, end)]
+
+ def getCache(self, start: dict, end: dict) -> Iterator[Tuple]:
+ start_idx = -1
+ end_idx = -1
+ for i in range(len(self.cache)):
Review Comment:
[nit] We can use `enumerate` on the list here to simplify this a bit.
##########
python/pyspark/sql/datasource.py:
##########
@@ -469,6 +501,192 @@ def stop(self) -> None:
...
+class SimpleInputPartition(InputPartition):
+ def __init__(self, start: dict, end: dict):
+ self.start = start
+ self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+ """
+ A base class for simplified streaming data source readers.
+ Compared to :class:`DataSourceStreamReader`,
:class:`SimpleDataSourceStreamReader` doesn't
+ require planning data partition. Also, the read api of
:class:`SimpleDataSourceStreamReader`
+ allows reading data and planning the latest offset at the same time.
+
+ .. versionadded: 4.0.0
+ """
+
+ def initialOffset(self) -> dict:
+ """
+ Return the initial offset of the streaming data source.
+ A new streaming query starts reading data from the initial offset.
+ If Spark is restarting an existing query, it will restart from the
check-pointed offset
+ rather than the initial one.
+
+ Returns
+ -------
+ dict
+ A dict or recursive dict whose key and value are primitive types,
which includes
+ Integer, String and Boolean.
+
+ Examples
+ --------
+ >>> def initialOffset(self):
+ ... return {"parititon-1": {"index": 3, "closed": True},
"partition-2": {"index": 5}}
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "initialOffset"},
+ )
+
+ def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+ """
+ Read all available data from start offset and return the offset that
next read attempt
+ starts from.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ Returns
+ -------
+ A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+ The iterator contains all the available records after start offset.
+ The dict is the end offset of this read attempt and the start of
next read attempt.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read"},
+ )
+
+ def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+ """
+ Read all available data from specific start offset and end offset.
+ This is invoked during failure recovery to re-read a batch
deterministically
+ in order to achieve exactly once.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ end : dict
+ The offset where the reading stop.
+
+ Returns
+ -------
+ iterator of :class:`Tuple`\\s
+ All the records between start offset and end offset.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read2"},
+ )
+
+ def commit(self, end: dict) -> None:
+ """
+ Informs the source that Spark has completed processing all data for
offsets less than or
+ equal to `end` and will only request offsets greater than `end` in the
future.
+
+ Parameters
+ ----------
+ end : dict
+ The latest offset that the streaming query has processed for this
source.
+ """
+ ...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+ """
+ A private class that wrap :class:`SimpleDataSourceStreamReader` in
prefetch and cache pattern,
+ so that :class:`SimpleDataSourceStreamReader` can integrate with streaming
engine like an
+ ordinary :class:`DataSourceStreamReader`.
+
+ current_offset track the latest progress of the record prefetching, it is
initialized to be
+ initialOffset() when query start for the first time or initialized to be
the end offset of
+ the last committed batch when query restarts.
+
+ When streaming engine call latestOffset(), the wrapper calls read() that
start from
+ current_offset, prefetch and cache the data, then update the
current_offset to be
+ the end offset of the new data.
+
+ When streaming engine call planInputPartitions(start, end), the wrapper
get the prefetched data
+ from cache and send it to JVM along with the input partitions.
+
+ When query restart, batches in write ahead offset log that has not been
committed will be
+ replayed by reading data between start and end offset through read2(start,
end).
+ """
+
+ def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+ self.simple_reader = simple_reader
+ self.initial_offset = None
+ self.current_offset = None
+ self.cache: List[Tuple[dict, dict, Iterator[Tuple]]] = []
+
+ def initialOffset(self) -> dict:
+ if self.initial_offset is None:
+ self.initial_offset = self.simple_reader.initialOffset() # type:
ignore[assignment]
+ return self.initial_offset
+
+ def latestOffset(self) -> dict:
+ # when query start for the first time, use initial offset as the start
offset.
+ if self.current_offset is None:
+ self.current_offset = self.initialOffset() # type:
ignore[assignment]
+ (iter, end) = self.simple_reader.read(self.current_offset) # type:
ignore[arg-type]
+ self.cache.append((self.current_offset, end, iter)) # type:
ignore[arg-type]
+ self.current_offset = end # type: ignore[assignment]
+ return end
+
+ def commit(self, end: dict) -> None:
+ if self.current_offset is None:
+ self.current_offset = end # type: ignore[assignment]
+
+ end_idx = -1
+ for i in range(len(self.cache)):
+ if json.dumps(self.cache[i][1]) == json.dumps(end):
Review Comment:
For dicts, the equality (==) operator already checks whether two
dictionaries have the same key-value pairs. Do we need `json.dumps` here?
##########
python/pyspark/sql/datasource.py:
##########
@@ -469,6 +501,192 @@ def stop(self) -> None:
...
+class SimpleInputPartition(InputPartition):
+ def __init__(self, start: dict, end: dict):
+ self.start = start
+ self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+ """
+ A base class for simplified streaming data source readers.
+ Compared to :class:`DataSourceStreamReader`,
:class:`SimpleDataSourceStreamReader` doesn't
+ require planning data partition. Also, the read api of
:class:`SimpleDataSourceStreamReader`
+ allows reading data and planning the latest offset at the same time.
+
+ .. versionadded: 4.0.0
+ """
+
+ def initialOffset(self) -> dict:
+ """
+ Return the initial offset of the streaming data source.
+ A new streaming query starts reading data from the initial offset.
+ If Spark is restarting an existing query, it will restart from the
check-pointed offset
+ rather than the initial one.
+
+ Returns
+ -------
+ dict
+ A dict or recursive dict whose key and value are primitive types,
which includes
+ Integer, String and Boolean.
+
+ Examples
+ --------
+ >>> def initialOffset(self):
+ ... return {"parititon-1": {"index": 3, "closed": True},
"partition-2": {"index": 5}}
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "initialOffset"},
+ )
+
+ def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+ """
+ Read all available data from start offset and return the offset that
next read attempt
+ starts from.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ Returns
+ -------
+ A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+ The iterator contains all the available records after start offset.
+ The dict is the end offset of this read attempt and the start of
next read attempt.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read"},
+ )
+
+ def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+ """
+ Read all available data from specific start offset and end offset.
+ This is invoked during failure recovery to re-read a batch
deterministically
+ in order to achieve exactly once.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ end : dict
+ The offset where the reading stop.
+
+ Returns
+ -------
+ iterator of :class:`Tuple`\\s
+ All the records between start offset and end offset.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read2"},
+ )
+
+ def commit(self, end: dict) -> None:
+ """
+ Informs the source that Spark has completed processing all data for
offsets less than or
+ equal to `end` and will only request offsets greater than `end` in the
future.
+
+ Parameters
+ ----------
+ end : dict
+ The latest offset that the streaming query has processed for this
source.
+ """
+ ...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+ """
+ A private class that wrap :class:`SimpleDataSourceStreamReader` in
prefetch and cache pattern,
+ so that :class:`SimpleDataSourceStreamReader` can integrate with streaming
engine like an
+ ordinary :class:`DataSourceStreamReader`.
+
+ current_offset track the latest progress of the record prefetching, it is
initialized to be
+ initialOffset() when query start for the first time or initialized to be
the end offset of
+ the last committed batch when query restarts.
+
+ When streaming engine call latestOffset(), the wrapper calls read() that
start from
+ current_offset, prefetch and cache the data, then update the
current_offset to be
+ the end offset of the new data.
+
+ When streaming engine call planInputPartitions(start, end), the wrapper
get the prefetched data
+ from cache and send it to JVM along with the input partitions.
+
+ When query restart, batches in write ahead offset log that has not been
committed will be
+ replayed by reading data between start and end offset through read2(start,
end).
+ """
+
+ def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+ self.simple_reader = simple_reader
+ self.initial_offset = None
+ self.current_offset = None
+ self.cache: List[Tuple[dict, dict, Iterator[Tuple]]] = []
+
+ def initialOffset(self) -> dict:
+ if self.initial_offset is None:
+ self.initial_offset = self.simple_reader.initialOffset() # type:
ignore[assignment]
+ return self.initial_offset
+
+ def latestOffset(self) -> dict:
+ # when query start for the first time, use initial offset as the start
offset.
+ if self.current_offset is None:
+ self.current_offset = self.initialOffset() # type:
ignore[assignment]
+ (iter, end) = self.simple_reader.read(self.current_offset) # type:
ignore[arg-type]
+ self.cache.append((self.current_offset, end, iter)) # type:
ignore[arg-type]
+ self.current_offset = end # type: ignore[assignment]
+ return end
+
+ def commit(self, end: dict) -> None:
+ if self.current_offset is None:
+ self.current_offset = end # type: ignore[assignment]
+
+ end_idx = -1
+ for i in range(len(self.cache)):
+ if json.dumps(self.cache[i][1]) == json.dumps(end):
+ end_idx = i
+ break
+ if end_idx > 0:
+ # Drop prefetched data for batch that has been committed.
+ self.cache = self.cache[end_idx:]
+ self.simple_reader.commit(end)
+
+ def partitions(self, start: dict, end: dict) -> Sequence["InputPartition"]:
+ # when query restart from checkpoint, use the last committed offset as
the start offset.
+ # This depends on the current behavior that streaming engine call
getBatch on the last
+ # microbatch when query restart.
+ if self.current_offset is None:
+ self.current_offset = end # type: ignore[assignment]
+ if len(self.cache) > 0:
+ assert self.cache[-1][1] == end
+ return [SimpleInputPartition(start, end)]
+
+ def getCache(self, start: dict, end: dict) -> Iterator[Tuple]:
+ start_idx = -1
+ end_idx = -1
+ for i in range(len(self.cache)):
+ # There is no convenient way to compare 2 offsets.
+ # Serialize into json string before comparison.
+ if json.dumps(self.cache[i][0]) == json.dumps(start):
+ start_idx = i
+ if json.dumps(self.cache[i][1]) == json.dumps(end):
+ end_idx = i
Review Comment:
We should be able to break here, right?
##########
python/pyspark/sql/datasource.py:
##########
@@ -469,6 +501,192 @@ def stop(self) -> None:
...
+class SimpleInputPartition(InputPartition):
+ def __init__(self, start: dict, end: dict):
+ self.start = start
+ self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+ """
+ A base class for simplified streaming data source readers.
+ Compared to :class:`DataSourceStreamReader`,
:class:`SimpleDataSourceStreamReader` doesn't
+ require planning data partition. Also, the read api of
:class:`SimpleDataSourceStreamReader`
+ allows reading data and planning the latest offset at the same time.
+
+ .. versionadded: 4.0.0
+ """
+
+ def initialOffset(self) -> dict:
+ """
+ Return the initial offset of the streaming data source.
+ A new streaming query starts reading data from the initial offset.
+ If Spark is restarting an existing query, it will restart from the
check-pointed offset
+ rather than the initial one.
+
+ Returns
+ -------
+ dict
+ A dict or recursive dict whose key and value are primitive types,
which includes
+ Integer, String and Boolean.
+
+ Examples
+ --------
+ >>> def initialOffset(self):
+ ... return {"parititon-1": {"index": 3, "closed": True},
"partition-2": {"index": 5}}
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "initialOffset"},
+ )
+
+ def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+ """
+ Read all available data from start offset and return the offset that
next read attempt
+ starts from.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ Returns
+ -------
+ A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+ The iterator contains all the available records after start offset.
+ The dict is the end offset of this read attempt and the start of
next read attempt.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read"},
+ )
+
+ def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+ """
+ Read all available data from specific start offset and end offset.
+ This is invoked during failure recovery to re-read a batch
deterministically
+ in order to achieve exactly once.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ end : dict
+ The offset where the reading stop.
+
+ Returns
+ -------
+ iterator of :class:`Tuple`\\s
+ All the records between start offset and end offset.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read2"},
+ )
+
+ def commit(self, end: dict) -> None:
+ """
+ Informs the source that Spark has completed processing all data for
offsets less than or
+ equal to `end` and will only request offsets greater than `end` in the
future.
+
+ Parameters
+ ----------
+ end : dict
+ The latest offset that the streaming query has processed for this
source.
+ """
+ ...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+ """
+ A private class that wrap :class:`SimpleDataSourceStreamReader` in
prefetch and cache pattern,
+ so that :class:`SimpleDataSourceStreamReader` can integrate with streaming
engine like an
+ ordinary :class:`DataSourceStreamReader`.
+
+ current_offset track the latest progress of the record prefetching, it is
initialized to be
+ initialOffset() when query start for the first time or initialized to be
the end offset of
+ the last committed batch when query restarts.
+
+ When streaming engine call latestOffset(), the wrapper calls read() that
start from
Review Comment:
[nit] `When streaming engine calls latestOffset(), the wrapper calls read()
that starts from current_offset, prefetches and caches the data, and finally
updates the current_offset to the end offset of the new data.`
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala:
##########
@@ -35,6 +38,14 @@ class PythonMicroBatchStream(
ds.source.createPythonFunction(
ds.getOrCreateDataSourceInPython(shortName, options,
Some(outputSchema)).dataSource)
+ private val streamId = nextStreamId
+ private var nextBlockId = 0L
+
+ // planInputPartitions() maybe be called multiple times for the current
microbatch.
+ // Cache the result of planInputPartitions() because it may involves sending
data
Review Comment:
[nit] involves -> involve.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##########
@@ -199,4 +223,30 @@ class PythonStreamingSourceRunner(
logError("Exception when trying to kill worker", e)
}
}
+
+ private val allocator = ArrowUtils.rootAllocator.newChildAllocator(
+ s"stream reader for $pythonExec", 0, Long.MaxValue)
+
+ def readArrowRecordBatches(): Iterator[InternalRow] = {
+ assert(dataIn.readInt() == SpecialLengths.START_ARROW_STREAM)
+ val reader = new ArrowStreamReader(dataIn, allocator)
+ val root = reader.getVectorSchemaRoot()
+ // When input is empty schema can't be read.
+ val schema = ArrowUtils.fromArrowSchema(root.getSchema())
+ assert(schema == outputSchema)
+
+ val vectors = root.getFieldVectors().asScala.map { vector =>
+ new ArrowColumnVector(vector)
+ }.toArray[ColumnVector]
+ val rows = ArrayBuffer[InternalRow]()
Review Comment:
Are we going to buffer all the rows in memory here? Can we create this
iterator lazily to avoid buffering all the data from the Python source?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##########
@@ -164,7 +175,20 @@ class PythonStreamingSourceRunner(
val pickledPartition: Array[Byte] = PythonWorkerUtils.readBytes(dataIn)
pickledPartitions.append(pickledPartition)
}
- pickledPartitions.toArray
+ val prefetchedRecordsStatus = dataIn.readInt()
+ val iter: Option[Iterator[InternalRow]] =
+ if (prefetchedRecordsStatus == NON_EMPTY_PYARROW_RECORD_BATCHES) {
Review Comment:
[nit] a case match might be easier to read.
##########
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##########
@@ -76,6 +97,19 @@ def commit_func(reader: DataSourceStreamReader, infile: IO,
outfile: IO) -> None
write_int(0, outfile)
+def send_batch_func(
+ rows: Iterator[Tuple], outfile: IO, schema: StructType, data_source:
DataSource
+) -> None:
+ batches = list(records_to_arrow_batches(rows, 1000, schema, data_source))
Review Comment:
Should max_records_arrow_batch be set from a configurable Spark conf,
rather than a hardcoded value of 1000?
##########
python/pyspark/sql/worker/plan_data_source_read.py:
##########
@@ -51,6 +52,71 @@
)
+def records_to_arrow_batches(
+ output_iter: Iterator[Tuple],
+ max_arrow_batch_size: int,
+ return_type: StructType,
+ data_source: DataSource,
+) -> Iterable[pa.RecordBatch]:
+ def batched(iterator: Iterator, n: int) -> Iterator:
+ return iter(functools.partial(lambda it: list(islice(it, n)),
iterator), [])
Review Comment:
Would the batches be constructed immediately here? Instead, can we do lazy
generation using generators?
##########
python/pyspark/sql/datasource.py:
##########
@@ -469,6 +501,192 @@ def stop(self) -> None:
...
+class SimpleInputPartition(InputPartition):
+ def __init__(self, start: dict, end: dict):
+ self.start = start
+ self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+ """
+ A base class for simplified streaming data source readers.
+ Compared to :class:`DataSourceStreamReader`,
:class:`SimpleDataSourceStreamReader` doesn't
+ require planning data partition. Also, the read api of
:class:`SimpleDataSourceStreamReader`
+ allows reading data and planning the latest offset at the same time.
+
+ .. versionadded: 4.0.0
+ """
+
+ def initialOffset(self) -> dict:
+ """
+ Return the initial offset of the streaming data source.
+ A new streaming query starts reading data from the initial offset.
+ If Spark is restarting an existing query, it will restart from the
check-pointed offset
+ rather than the initial one.
+
+ Returns
+ -------
+ dict
+ A dict or recursive dict whose key and value are primitive types,
which includes
+ Integer, String and Boolean.
+
+ Examples
+ --------
+ >>> def initialOffset(self):
+ ... return {"parititon-1": {"index": 3, "closed": True},
"partition-2": {"index": 5}}
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "initialOffset"},
+ )
+
+ def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+ """
+ Read all available data from start offset and return the offset that
next read attempt
+ starts from.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ Returns
+ -------
+ A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+ The iterator contains all the available records after start offset.
+ The dict is the end offset of this read attempt and the start of
next read attempt.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read"},
+ )
+
+ def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+ """
+ Read all available data from specific start offset and end offset.
+ This is invoked during failure recovery to re-read a batch
deterministically
+ in order to achieve exactly once.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ end : dict
+ The offset where the reading stop.
+
+ Returns
+ -------
+ iterator of :class:`Tuple`\\s
+ All the records between start offset and end offset.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read2"},
+ )
+
+ def commit(self, end: dict) -> None:
+ """
+ Informs the source that Spark has completed processing all data for
offsets less than or
+ equal to `end` and will only request offsets greater than `end` in the
future.
+
+ Parameters
+ ----------
+ end : dict
+ The latest offset that the streaming query has processed for this
source.
+ """
+ ...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+ """
+ A private class that wrap :class:`SimpleDataSourceStreamReader` in
prefetch and cache pattern,
+ so that :class:`SimpleDataSourceStreamReader` can integrate with streaming
engine like an
+ ordinary :class:`DataSourceStreamReader`.
+
+ current_offset track the latest progress of the record prefetching, it is
initialized to be
+ initialOffset() when query start for the first time or initialized to be
the end offset of
+ the last committed batch when query restarts.
+
+ When streaming engine call latestOffset(), the wrapper calls read() that
start from
+ current_offset, prefetch and cache the data, then update the
current_offset to be
+ the end offset of the new data.
+
+ When streaming engine call planInputPartitions(start, end), the wrapper
get the prefetched data
+ from cache and send it to JVM along with the input partitions.
+
+ When query restart, batches in write ahead offset log that has not been
committed will be
+ replayed by reading data between start and end offset through read2(start,
end).
+ """
+
+ def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+ self.simple_reader = simple_reader
+ self.initial_offset = None
+ self.current_offset = None
+ self.cache: List[Tuple[dict, dict, Iterator[Tuple]]] = []
+
+ def initialOffset(self) -> dict:
+ if self.initial_offset is None:
+ self.initial_offset = self.simple_reader.initialOffset() # type:
ignore[assignment]
+ return self.initial_offset
+
+ def latestOffset(self) -> dict:
+ # when query start for the first time, use initial offset as the start
offset.
+ if self.current_offset is None:
+ self.current_offset = self.initialOffset() # type:
ignore[assignment]
+ (iter, end) = self.simple_reader.read(self.current_offset) # type:
ignore[arg-type]
+ self.cache.append((self.current_offset, end, iter)) # type:
ignore[arg-type]
+ self.current_offset = end # type: ignore[assignment]
Review Comment:
[nit] Can we declare the types of these variables inside the `__init__`
function to prevent type ignores here?
##########
python/pyspark/sql/datasource.py:
##########
@@ -469,6 +501,192 @@ def stop(self) -> None:
...
+class SimpleInputPartition(InputPartition):
+ def __init__(self, start: dict, end: dict):
+ self.start = start
+ self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+ """
+ A base class for simplified streaming data source readers.
+ Compared to :class:`DataSourceStreamReader`,
:class:`SimpleDataSourceStreamReader` doesn't
+ require planning data partition. Also, the read api of
:class:`SimpleDataSourceStreamReader`
+ allows reading data and planning the latest offset at the same time.
+
+ .. versionadded: 4.0.0
+ """
+
+ def initialOffset(self) -> dict:
+ """
+ Return the initial offset of the streaming data source.
+ A new streaming query starts reading data from the initial offset.
+ If Spark is restarting an existing query, it will restart from the
check-pointed offset
+ rather than the initial one.
+
+ Returns
+ -------
+ dict
+ A dict or recursive dict whose key and value are primitive types,
which includes
+ Integer, String and Boolean.
+
+ Examples
+ --------
+ >>> def initialOffset(self):
+ ... return {"parititon-1": {"index": 3, "closed": True},
"partition-2": {"index": 5}}
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "initialOffset"},
+ )
+
+ def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+ """
+ Read all available data from start offset and return the offset that
next read attempt
+ starts from.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ Returns
+ -------
+ A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+ The iterator contains all the available records after start offset.
+ The dict is the end offset of this read attempt and the start of
next read attempt.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read"},
+ )
+
+ def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+ """
+ Read all available data from specific start offset and end offset.
+ This is invoked during failure recovery to re-read a batch
deterministically
+ in order to achieve exactly once.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ end : dict
+ The offset where the reading stop.
+
+ Returns
+ -------
+ iterator of :class:`Tuple`\\s
+ All the records between start offset and end offset.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read2"},
+ )
+
+ def commit(self, end: dict) -> None:
+ """
+ Informs the source that Spark has completed processing all data for
offsets less than or
+ equal to `end` and will only request offsets greater than `end` in the
future.
+
+ Parameters
+ ----------
+ end : dict
+ The latest offset that the streaming query has processed for this
source.
+ """
+ ...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+ """
+ A private class that wrap :class:`SimpleDataSourceStreamReader` in
prefetch and cache pattern,
+ so that :class:`SimpleDataSourceStreamReader` can integrate with streaming
engine like an
+ ordinary :class:`DataSourceStreamReader`.
+
+ current_offset track the latest progress of the record prefetching, it is
initialized to be
Review Comment:
[nit] `current_offset tracks the latest progress`
##########
python/pyspark/sql/datasource.py:
##########
@@ -469,6 +501,192 @@ def stop(self) -> None:
...
+class SimpleInputPartition(InputPartition):
+ def __init__(self, start: dict, end: dict):
+ self.start = start
+ self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+ """
+ A base class for simplified streaming data source readers.
+ Compared to :class:`DataSourceStreamReader`,
:class:`SimpleDataSourceStreamReader` doesn't
+ require planning data partition. Also, the read api of
:class:`SimpleDataSourceStreamReader`
+ allows reading data and planning the latest offset at the same time.
+
+ .. versionadded: 4.0.0
+ """
+
+ def initialOffset(self) -> dict:
+ """
+ Return the initial offset of the streaming data source.
+ A new streaming query starts reading data from the initial offset.
+ If Spark is restarting an existing query, it will restart from the
check-pointed offset
+ rather than the initial one.
+
+ Returns
+ -------
+ dict
+ A dict or recursive dict whose key and value are primitive types,
which includes
+ Integer, String and Boolean.
+
+ Examples
+ --------
+ >>> def initialOffset(self):
+ ... return {"parititon-1": {"index": 3, "closed": True},
"partition-2": {"index": 5}}
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "initialOffset"},
+ )
+
+ def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+ """
+ Read all available data from start offset and return the offset that
next read attempt
+ starts from.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ Returns
+ -------
+ A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+ The iterator contains all the available records after start offset.
+ The dict is the end offset of this read attempt and the start of
next read attempt.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read"},
+ )
+
+ def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+ """
+ Read all available data from specific start offset and end offset.
+ This is invoked during failure recovery to re-read a batch
deterministically
+ in order to achieve exactly once.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ end : dict
+ The offset where the reading stop.
+
+ Returns
+ -------
+ iterator of :class:`Tuple`\\s
+ All the records between start offset and end offset.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read2"},
+ )
+
+ def commit(self, end: dict) -> None:
+ """
+ Informs the source that Spark has completed processing all data for
offsets less than or
+ equal to `end` and will only request offsets greater than `end` in the
future.
+
+ Parameters
+ ----------
+ end : dict
+ The latest offset that the streaming query has processed for this
source.
+ """
+ ...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+ """
+ A private class that wrap :class:`SimpleDataSourceStreamReader` in
prefetch and cache pattern,
+ so that :class:`SimpleDataSourceStreamReader` can integrate with streaming
engine like an
+ ordinary :class:`DataSourceStreamReader`.
+
+ current_offset track the latest progress of the record prefetching, it is
initialized to be
+ initialOffset() when query start for the first time or initialized to be
the end offset of
+ the last committed batch when query restarts.
+
+ When streaming engine call latestOffset(), the wrapper calls read() that
start from
+ current_offset, prefetch and cache the data, then update the
current_offset to be
+ the end offset of the new data.
+
+ When streaming engine call planInputPartitions(start, end), the wrapper
get the prefetched data
+ from cache and send it to JVM along with the input partitions.
+
+ When query restart, batches in write ahead offset log that has not been
committed will be
+ replayed by reading data between start and end offset through read2(start,
end).
+ """
+
+ def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+ self.simple_reader = simple_reader
+ self.initial_offset = None
+ self.current_offset = None
+ self.cache: List[Tuple[dict, dict, Iterator[Tuple]]] = []
+
+ def initialOffset(self) -> dict:
+ if self.initial_offset is None:
+ self.initial_offset = self.simple_reader.initialOffset() # type:
ignore[assignment]
+ return self.initial_offset
+
+ def latestOffset(self) -> dict:
+ # when query start for the first time, use initial offset as the start
offset.
+ if self.current_offset is None:
+ self.current_offset = self.initialOffset() # type:
ignore[assignment]
+ (iter, end) = self.simple_reader.read(self.current_offset) # type:
ignore[arg-type]
+ self.cache.append((self.current_offset, end, iter)) # type:
ignore[arg-type]
+ self.current_offset = end # type: ignore[assignment]
+ return end
+
+ def commit(self, end: dict) -> None:
+ if self.current_offset is None:
+ self.current_offset = end # type: ignore[assignment]
+
+ end_idx = -1
+ for i in range(len(self.cache)):
Review Comment:
Might be more readable to iterate with `enumerate`:
```
for idx, cache_val in enumerate(self.cache):
    # if cache_val's end offset is equal to `end`, note idx and break.
    ...
```
##########
python/pyspark/sql/datasource.py:
##########
@@ -469,6 +501,192 @@ def stop(self) -> None:
...
+class SimpleInputPartition(InputPartition):
+ def __init__(self, start: dict, end: dict):
+ self.start = start
+ self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+ """
+ A base class for simplified streaming data source readers.
+ Compared to :class:`DataSourceStreamReader`,
:class:`SimpleDataSourceStreamReader` doesn't
+ require planning data partition. Also, the read api of
:class:`SimpleDataSourceStreamReader`
+ allows reading data and planning the latest offset at the same time.
+
+ .. versionadded: 4.0.0
+ """
+
+ def initialOffset(self) -> dict:
+ """
+ Return the initial offset of the streaming data source.
+ A new streaming query starts reading data from the initial offset.
+ If Spark is restarting an existing query, it will restart from the
check-pointed offset
+ rather than the initial one.
+
+ Returns
+ -------
+ dict
+ A dict or recursive dict whose key and value are primitive types,
which includes
+ Integer, String and Boolean.
+
+ Examples
+ --------
+ >>> def initialOffset(self):
+ ... return {"parititon-1": {"index": 3, "closed": True},
"partition-2": {"index": 5}}
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "initialOffset"},
+ )
+
+ def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+ """
+ Read all available data from start offset and return the offset that
next read attempt
+ starts from.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ Returns
+ -------
+ A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+ The iterator contains all the available records after start offset.
+ The dict is the end offset of this read attempt and the start of
next read attempt.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read"},
+ )
+
+ def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+ """
+ Read all available data from specific start offset and end offset.
+ This is invoked during failure recovery to re-read a batch
deterministically
+ in order to achieve exactly once.
+
+ Parameters
+ ----------
+ start : dict
+ The start offset to start reading from.
+
+ end : dict
+ The offset where the reading stop.
+
+ Returns
+ -------
+ iterator of :class:`Tuple`\\s
+ All the records between start offset and end offset.
+ """
+ raise PySparkNotImplementedError(
+ error_class="NOT_IMPLEMENTED",
+ message_parameters={"feature": "read2"},
+ )
+
+ def commit(self, end: dict) -> None:
+ """
+ Informs the source that Spark has completed processing all data for
offsets less than or
+ equal to `end` and will only request offsets greater than `end` in the
future.
+
+ Parameters
+ ----------
+ end : dict
+ The latest offset that the streaming query has processed for this
source.
+ """
+ ...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+ """
+ A private class that wrap :class:`SimpleDataSourceStreamReader` in
prefetch and cache pattern,
+ so that :class:`SimpleDataSourceStreamReader` can integrate with streaming
engine like an
+ ordinary :class:`DataSourceStreamReader`.
+
+ current_offset track the latest progress of the record prefetching, it is
initialized to be
+ initialOffset() when query start for the first time or initialized to be
the end offset of
+ the last committed batch when query restarts.
+
+ When streaming engine call latestOffset(), the wrapper calls read() that
start from
+ current_offset, prefetch and cache the data, then update the
current_offset to be
+ the end offset of the new data.
+
+ When streaming engine call planInputPartitions(start, end), the wrapper
get the prefetched data
+ from cache and send it to JVM along with the input partitions.
+
+ When query restart, batches in write ahead offset log that has not been
committed will be
+ replayed by reading data between start and end offset through read2(start,
end).
+ """
+
+ def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+ self.simple_reader = simple_reader
+ self.initial_offset = None
+ self.current_offset = None
+ self.cache: List[Tuple[dict, dict, Iterator[Tuple]]] = []
Review Comment:
It might be more readable to create an object with three fields (start_offset,
end_offset and data_iterator) rather than using a three-element Tuple.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]