allisonwang-db commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1602361195
##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -33,9 +33,15 @@ To create a custom Python data source, you'll need to subclass the :class:`DataS
This example demonstrates creating a simple data source to generate synthetic
data using the `faker` library. Ensure the `faker` library is installed and
accessible in your Python environment.
-**Step 1: Define the Data Source**
+**Define the Data Source**
-Start by creating a new subclass of :class:`DataSource`. Define the source name, schema, and reader logic as follows:
+Start by creating a new subclass of :class:`DataSource` with the source name and schema.
+
+To read from the data source in a batch query, the reader() method needs to be defined.
+
+To read from the data source in a streaming query, either the streamReader() or the simpleStreamReader() method needs to be defined.
+
+To write to the data source in a streaming query, the streamWriter() method needs to be defined.
Review Comment:
Do you think it's more clear to have a markdown table here? streaming/batch
x read/write.
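
For reference, here is a minimal sketch (not part of the PR; the FakeDataSourceReader, FakeStreamReader, and FakeStreamWriter classes are assumed from the surrounding guide, and the schema string is made up) of how those capability methods sit on one DataSource subclass:

    # Hypothetical sketch: one DataSource subclass can opt into batch and
    # streaming read/write by overriding the corresponding factory methods.
    from pyspark.sql.datasource import DataSource
    from pyspark.sql.types import StructType

    class FakeDataSource(DataSource):
        @classmethod
        def name(cls):
            return "fake"

        def schema(self):
            # DDL string; placeholder schema matching the dummy reader below
            return "id int, value string"

        def reader(self, schema: StructType):                        # batch read
            return FakeDataSourceReader(schema, self.options)

        def streamReader(self, schema: StructType):                  # streaming read (partitioned)
            return FakeStreamReader(schema, self.options)

        def streamWriter(self, schema: StructType, overwrite: bool):  # streaming write
            return FakeStreamWriter(self.options)
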
##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -84,9 +101,158 @@ Define the reader logic to generate synthetic data. Use the `faker` library to p
row.append(value)
yield tuple(row)
+Implementing Streaming Reader and Writer for Python Data Source
+---------------------------------------------------------------
+**Implement the Stream Reader**
+
+This is a dummy streaming data reader that generates 2 rows in every microbatch. The streamReader instance has an integer offset that increases by 2 in every microbatch.
+
+.. code-block:: python
+
+ class RangePartition(InputPartition):
+ def __init__(self, start, end):
+ self.start = start
+ self.end = end
+
+ class FakeStreamReader(DataSourceStreamReader):
+ def __init__(self, schema, options):
+ self.current = 0
+
+ def initialOffset(self) -> dict:
+ """
+ Return the initial start offset of the reader.
+ """
+ return {"offset": 0}
+
+ def latestOffset(self) -> dict:
+ """
+            Return the current latest offset that the next microbatch will read to.
+ """
+ self.current += 2
+ return {"offset": self.current}
+
+ def partitions(self, start: dict, end: dict):
+ """
+            Plans the partitioning of the current microbatch defined by the start and end offsets.
+            It needs to return a sequence of :class:`InputPartition` objects.
+ """
+ return [RangePartition(start["offset"], end["offset"])]
+
+ def commit(self, end: dict):
+ """
+            This is invoked when the query has finished processing data before the end offset. This can be used to clean up resources.
+ """
+ pass
+
+ def read(self, partition) -> Iterator[Tuple]:
+ """
+            Takes a partition as an input and reads an iterator of tuples from the data source.
+ """
+ start, end = partition.start, partition.end
+ for i in range(start, end):
+ yield (i, str(i))
+
+**Implement the Simple Stream Reader**
+
+If the data source has low throughput and doesn't require partitioning, you can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.
+
+One of simpleStreamReader() and streamReader() must be implemented for a readable streaming data source. simpleStreamReader() will only be invoked when streamReader() is not implemented.
+
+This is the same dummy streaming reader that generates 2 rows in every batch, implemented with the SimpleDataSourceStreamReader interface.
+
+.. code-block:: python
+
+ class SimpleStreamReader(SimpleDataSourceStreamReader):
+ def initialOffset(self):
+ """
+ Return the initial start offset of the reader.
+ """
+ return {"offset": 0}
+
+ def read(self, start: dict) -> (Iterator[Tuple], dict):
+ """
+            Takes the start offset as an input, then returns an iterator of tuples and the start offset of the next read.
+ """
+ start_idx = start["offset"]
+ it = iter([(i,) for i in range(start_idx, start_idx + 2)])
+ return (it, {"offset": start_idx + 2})
+
+        def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+ """
+            Takes the start and end offsets as inputs, then reads an iterator of data deterministically.
+            This is called when the query replays batches during restart or after a failure.
+ """
+ start_idx = start["offset"]
+ end_idx = end["offset"]
+ return iter([(i,) for i in range(start_idx, end_idx)])
+
+ def commit(self, end):
+ """
+            This is invoked when the query has finished processing data before the end offset. This can be used to clean up resources.
+ """
+ pass
+
+**Implement the Stream Writer**
+
+This is a streaming data writer that writes the metadata information of each microbatch to a local path.
+
+.. code-block:: python
+
+ class SimpleCommitMessage(WriterCommitMessage):
+ partition_id: int
+ count: int
+
+ class FakeStreamWriter(DataSourceStreamWriter):
+ def __init__(self, options):
+ self.options = options
+ self.path = self.options.get("path")
+ assert self.path is not None
+
+ def write(self, iterator):
+ """
+ Write the data and return the commit message of that partition
+ """
+ from pyspark import TaskContext
+ context = TaskContext.get()
+ partition_id = context.partitionId()
+ cnt = 0
+ for row in iterator:
+ cnt += 1
+ return SimpleCommitMessage(partition_id=partition_id, count=cnt)
+
Review Comment:
nit: extra line
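
For readers following along, a rough usage sketch of exercising this reader from a streaming query, assuming a FakeDataSource that returns this FakeStreamReader from streamReader() and is registered under the short name "fake":

    # Hypothetical usage sketch: register the data source and run a streaming
    # query that prints each microbatch to the console.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.dataSource.register(FakeDataSource)  # assumed to wrap FakeStreamReader

    query = (
        spark.readStream.format("fake").load()
        .writeStream.format("console")
        .start()
    )
    # each microbatch carries the two rows produced between consecutive offsets
    query.awaitTermination(timeout=10)
    query.stop()
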
##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -84,9 +101,158 @@ Define the reader logic to generate synthetic data. Use the `faker` library to p
row.append(value)
yield tuple(row)
+Implementing Streaming Reader and Writer for Python Data Source
+---------------------------------------------------------------
+**Implement the Stream Reader**
+
+This is a dummy streaming data reader that generates 2 rows in every microbatch. The streamReader instance has an integer offset that increases by 2 in every microbatch.
+
+.. code-block:: python
+
+ class RangePartition(InputPartition):
+ def __init__(self, start, end):
+ self.start = start
+ self.end = end
+
+ class FakeStreamReader(DataSourceStreamReader):
+ def __init__(self, schema, options):
+ self.current = 0
+
+ def initialOffset(self) -> dict:
+ """
+ Return the initial start offset of the reader.
+ """
+ return {"offset": 0}
+
+ def latestOffset(self) -> dict:
+ """
+            Return the current latest offset that the next microbatch will read to.
+ """
+ self.current += 2
+ return {"offset": self.current}
+
+ def partitions(self, start: dict, end: dict):
+ """
+            Plans the partitioning of the current microbatch defined by the start and end offsets.
+            It needs to return a sequence of :class:`InputPartition` objects.
+ """
+ return [RangePartition(start["offset"], end["offset"])]
+
+ def commit(self, end: dict):
+ """
+            This is invoked when the query has finished processing data before the end offset. This can be used to clean up resources.
+ """
+ pass
+
+ def read(self, partition) -> Iterator[Tuple]:
+ """
+            Takes a partition as an input and reads an iterator of tuples from the data source.
+ """
+ start, end = partition.start, partition.end
+ for i in range(start, end):
+ yield (i, str(i))
+
+**Implement the Simple Stream Reader**
+
+If the data source has low throughput and doesn't require partitioning, you can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.
+
+One of simpleStreamReader() and streamReader() must be implemented for a readable streaming data source. simpleStreamReader() will only be invoked when streamReader() is not implemented.
+
+This is the same dummy streaming reader that generates 2 rows in every batch, implemented with the SimpleDataSourceStreamReader interface.
+
+.. code-block:: python
+
+ class SimpleStreamReader(SimpleDataSourceStreamReader):
+ def initialOffset(self):
+ """
+ Return the initial start offset of the reader.
+ """
+ return {"offset": 0}
+
+ def read(self, start: dict) -> (Iterator[Tuple], dict):
+ """
+            Takes the start offset as an input, then returns an iterator of tuples and the start offset of the next read.
+ """
+ start_idx = start["offset"]
+ it = iter([(i,) for i in range(start_idx, start_idx + 2)])
+ return (it, {"offset": start_idx + 2})
+
+        def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+ """
+            Takes the start and end offsets as inputs, then reads an iterator of data deterministically.
+            This is called when the query replays batches during restart or after a failure.
+ """
+ start_idx = start["offset"]
+ end_idx = end["offset"]
+ return iter([(i,) for i in range(start_idx, end_idx)])
+
+ def commit(self, end):
+ """
+            This is invoked when the query has finished processing data before the end offset. This can be used to clean up resources.
+ """
+ pass
+
+**Implement the Stream Writer**
+
+This is a streaming data writer that writes the metadata information of each microbatch to a local path.
+
+.. code-block:: python
+
+ class SimpleCommitMessage(WriterCommitMessage):
+ partition_id: int
+ count: int
+
+ class FakeStreamWriter(DataSourceStreamWriter):
+ def __init__(self, options):
+ self.options = options
+ self.path = self.options.get("path")
+ assert self.path is not None
+
+ def write(self, iterator):
+ """
+ Write the data and return the commit message of that partition
+ """
+ from pyspark import TaskContext
+ context = TaskContext.get()
+ partition_id = context.partitionId()
+ cnt = 0
+ for row in iterator:
+ cnt += 1
+ return SimpleCommitMessage(partition_id=partition_id, count=cnt)
+
+
+ def commit(self, messages, batchId) -> None:
+ """
+            Receives a sequence of :class:`WriterCommitMessage` when all write tasks succeed, then decides what to do with it.
+            In this FakeStreamWriter, we write the metadata of the microbatch (number of rows and partitions) into a JSON file inside commit().
+ """
+            status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
+ with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
+ file.write(json.dumps(status) + "\n")
+
+ def abort(self, messages, batchId) -> None:
+ """
+            Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks fail, then decides what to do with it.
+            In this FakeStreamWriter, we write a failure message into a txt file inside abort().
+ """
+ with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
Review Comment:
can we use `.json` here to make it consistent with `commit`?
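
A sketch of what that suggestion could look like, mirroring the status dict that commit() already writes (the `aborted`/`num_successful_partitions` keys are made up for illustration):

    # Hypothetical rewrite of abort() that records failure metadata as JSON,
    # matching the file format used by commit().
    def abort(self, messages, batchId) -> None:
        # messages only carries commit messages from tasks that succeeded before the failure
        status = dict(aborted=True, num_successful_partitions=len([m for m in messages if m is not None]))
        with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
            file.write(json.dumps(status) + "\n")
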
##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -59,8 +59,17 @@ Start by creating a new subclass of :class:`DataSource`. Define the source name,
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
+ def streamReader(self, schema: StructType):
Review Comment:
+1 I wonder if we should split the streaming and batch implementation in the user guide.
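
To round out the streaming side of this hunk, a rough end-to-end sketch of driving the writer path, assuming FakeDataSource also overrides streamWriter() to return the FakeStreamWriter above (the built-in rate source and the paths are only placeholders):

    # Hypothetical usage sketch: stream data into the custom "fake" sink.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.dataSource.register(FakeDataSource)

    query = (
        spark.readStream.format("rate").load()            # any streaming input works here
        .writeStream.format("fake")
        .option("path", "/tmp/fake_sink")                 # read by FakeStreamWriter.__init__
        .option("checkpointLocation", "/tmp/fake_sink_chk")
        .start()
    )
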
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]