allisonwang-db commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1586863445


##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
                     row.append(value)
                 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+    class RangePartition(InputPartition):
+        def __init__(self, start, end):
+            self.start = start
+            self.end = end
+
+    class FakeStreamReader(DataSourceStreamReader):
+        def __init__(self, schema, options):
+            self.current = 0
+
+        def initialOffset(self):
+            return {"offset": 0}
+
+        def latestOffset(self):
+            self.current += 2
+            return {"offset": self.current}
+
+        def partitions(self, start, end):
+            return [RangePartition(start["offset"], end["offset"])]
+
+        def commit(self, end):
+            pass
+
+        def read(self, partition):
+            start, end = partition.start, partition.end
+            for i in range(start, end):
+                yield (i, str(i))
+
+This is a dummy streaming data reader that generates 2 rows in every microbatch. The stream reader instance has an integer offset that increases by 2 in every microbatch.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.initialOffset` should return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.latestOffset` returns the current latest offset that the next microbatch will read up to.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.partitions` plans the partitioning of the current microbatch defined by the start and end offsets. It needs to return a sequence of :class:`InputPartition` objects.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.read` takes a partition as input and returns an iterator of tuples read from the data source.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.commit` is invoked when the query has finished processing data before the end offset. This can be used to clean up resources.

Review Comment:
   Can we move these explanations into the docstrings of the corresponding methods?



##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
                     row.append(value)
                 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+    class RangePartition(InputPartition):
+        def __init__(self, start, end):
+            self.start = start
+            self.end = end
+
+    class FakeStreamReader(DataSourceStreamReader):
+        def __init__(self, schema, options):
+            self.current = 0
+
+        def initialOffset(self):
+            return {"offset": 0}
+
+        def latestOffset(self):
+            self.current += 2
+            return {"offset": self.current}
+
+        def partitions(self, start, end):
+            return [RangePartition(start["offset"], end["offset"])]
+
+        def commit(self, end):
+            pass
+
+        def read(self, partition):
+            start, end = partition.start, partition.end
+            for i in range(start, end):
+                yield (i, str(i))
+
+This is a dummy streaming data reader that generates 2 rows in every microbatch. The stream reader instance has an integer offset that increases by 2 in every microbatch.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.initialOffset` should return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.latestOffset` returns the current latest offset that the next microbatch will read up to.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.partitions` plans the partitioning of the current microbatch defined by the start and end offsets. It needs to return a sequence of :class:`InputPartition` objects.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.read` takes a partition as input and returns an iterator of tuples read from the data source.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.commit` is invoked when the query has finished processing data before the end offset. This can be used to clean up resources.
+
+**Implement the Simple Stream Reader**
+
+.. code-block:: python
+
+    class SimpleStreamReader(SimpleDataSourceStreamReader):
+        def initialOffset(self):
+            return {"offset": 0}
+
+        def read(self, start: dict):
+            start_idx = start["offset"]
+            it = iter([(i,) for i in range(start_idx, start_idx + 2)])
+            return (it, {"offset": start_idx + 2})
+
+        def readBetweenOffsets(self, start: dict, end: dict):
+            start_idx = start["offset"]
+            end_idx = end["offset"]
+            return iter([(i,) for i in range(start_idx, end_idx)])
+
+        def commit(self, end):
+            pass
+
+If the data source has low throughput and doesn't require partitioning, you can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.
+
+One of simpleStreamReader() and streamReader() must be implemented for a readable streaming data source. simpleStreamReader() will only be invoked when streamReader() is not implemented.
+
+This is the same dummy streaming reader that generates 2 rows every batch, implemented with the SimpleDataSourceStreamReader interface.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.initialOffset` should return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.read` takes the start offset as input and returns an iterator of tuples and the start offset of the next read.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.readBetweenOffsets` takes the start and end offsets as input and reads an iterator of data deterministically. This is called when the query replays batches during restart or after failure.

Review Comment:
   Shall we also mention whether each of these methods is required or optional to implement?
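To make the replay contract in the quoted text concrete, here is a minimal sketch in plain Python that runs without Spark. The empty `SimpleDataSourceStreamReader` class is only a stand-in for `pyspark.sql.datasource.SimpleDataSourceStreamReader`, and the driver lines at the bottom merely imitate what the engine might do; both are assumptions made for illustration, not the actual engine behavior.

```python
from typing import Iterator, Tuple


class SimpleDataSourceStreamReader:
    """Stand-in (assumption) for pyspark.sql.datasource.SimpleDataSourceStreamReader."""


class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self) -> dict:
        # Offset at which a brand-new query starts reading.
        return {"offset": 0}

    def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
        # Return the rows available from `start` plus the start offset of the next read.
        start_idx = start["offset"]
        rows = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return rows, {"offset": start_idx + 2}

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        # Must yield exactly the rows that read() produced for the same range.
        return iter([(i,) for i in range(start["offset"], end["offset"])])

    def commit(self, end: dict) -> None:
        # Nothing to clean up in this toy source.
        pass


# Imitate one microbatch followed by a replay of the same batch.
reader = SimpleStreamReader()
start = reader.initialOffset()
rows, end = reader.read(start)
first_run = list(rows)
replayed = list(reader.readBetweenOffsets(start, end))
```

In this sketch `first_run` and `replayed` hold the same rows, which is the property a replayed batch relies on.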



##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -59,8 +59,17 @@ Start by creating a new subclass of :class:`DataSource`. 
Define the source name,
         def reader(self, schema: StructType):
             return FakeDataSourceReader(schema, self.options)
 
+        def streamReader(self, schema: StructType):
+            return FakeStreamReader(schema, self.options)
 
-**Step 2: Implement the Reader**
+        def simpleStreamReader(self, schema: StructType):
+            return SimpleStreamReader()
+
+        def streamWriter(self, schema: StructType, overwrite: bool):
+            return FakeStreamWriter(self.options)
+

Review Comment:
   Actually, shall we add a new section for streaming data sources instead of 
mixing it with the batch one? 



##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
                     row.append(value)
                 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+    class RangePartition(InputPartition):
+        def __init__(self, start, end):
+            self.start = start
+            self.end = end
+
+    class FakeStreamReader(DataSourceStreamReader):
+        def __init__(self, schema, options):
+            self.current = 0
+
+        def initialOffset(self):

Review Comment:
   Can we add type hints here?
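One possible shape for a typed version of the quoted reader, offered as a sketch only. The two empty base classes are stand-ins for `pyspark.sql.datasource.InputPartition` and `pyspark.sql.datasource.DataSourceStreamReader` so that the block runs without Spark, and `schema` is annotated as `Any` because `StructType` is not imported here; these are assumptions, not the PR's final signatures.

```python
from typing import Any, Dict, Iterator, List, Tuple


class InputPartition:
    """Stand-in (assumption) for pyspark.sql.datasource.InputPartition."""


class DataSourceStreamReader:
    """Stand-in (assumption) for pyspark.sql.datasource.DataSourceStreamReader."""


class RangePartition(InputPartition):
    def __init__(self, start: int, end: int) -> None:
        self.start = start
        self.end = end


class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema: Any, options: Dict[str, str]) -> None:
        self.current = 0

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def latestOffset(self) -> dict:
        # Advance by two rows per microbatch.
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict) -> List[RangePartition]:
        # A single partition covering the whole microbatch.
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict) -> None:
        pass

    def read(self, partition: RangePartition) -> Iterator[Tuple[int, str]]:
        for i in range(partition.start, partition.end):
            yield (i, str(i))


# Walk through one microbatch by hand.
reader = FakeStreamReader(schema=None, options={})
batch_start = reader.initialOffset()
batch_end = reader.latestOffset()
batch_rows = list(reader.read(reader.partitions(batch_start, batch_end)[0]))
```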



##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
                     row.append(value)
                 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+    class RangePartition(InputPartition):
+        def __init__(self, start, end):
+            self.start = start
+            self.end = end
+
+    class FakeStreamReader(DataSourceStreamReader):
+        def __init__(self, schema, options):
+            self.current = 0
+
+        def initialOffset(self):
+            return {"offset": 0}
+
+        def latestOffset(self):
+            self.current += 2
+            return {"offset": self.current}
+
+        def partitions(self, start, end):
+            return [RangePartition(start["offset"], end["offset"])]
+
+        def commit(self, end):
+            pass
+
+        def read(self, partition):
+            start, end = partition.start, partition.end
+            for i in range(start, end):
+                yield (i, str(i))
+
+This is a dummy streaming data reader that generates 2 rows in every microbatch. The stream reader instance has an integer offset that increases by 2 in every microbatch.

Review Comment:
   Nit: we can move this explanation below the header to first explain what 
this reader is doing.



##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
                     row.append(value)
                 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+    class RangePartition(InputPartition):
+        def __init__(self, start, end):
+            self.start = start
+            self.end = end
+
+    class FakeStreamReader(DataSourceStreamReader):
+        def __init__(self, schema, options):
+            self.current = 0
+
+        def initialOffset(self):
+            return {"offset": 0}
+
+        def latestOffset(self):
+            self.current += 2
+            return {"offset": self.current}
+
+        def partitions(self, start, end):
+            return [RangePartition(start["offset"], end["offset"])]
+
+        def commit(self, end):
+            pass
+
+        def read(self, partition):
+            start, end = partition.start, partition.end
+            for i in range(start, end):
+                yield (i, str(i))
+
+This is a dummy streaming data reader that generates 2 rows in every microbatch. The stream reader instance has an integer offset that increases by 2 in every microbatch.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.initialOffset` should return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.latestOffset` returns the current latest offset that the next microbatch will read up to.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.partitions` plans the partitioning of the current microbatch defined by the start and end offsets. It needs to return a sequence of :class:`InputPartition` objects.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.read` takes a partition as input and returns an iterator of tuples read from the data source.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.commit` is invoked when the query has finished processing data before the end offset. This can be used to clean up resources.
+
+**Implement the Simple Stream Reader**
+
+.. code-block:: python
+
+    class SimpleStreamReader(SimpleDataSourceStreamReader):
+        def initialOffset(self):
+            return {"offset": 0}
+
+        def read(self, start: dict):
+            start_idx = start["offset"]
+            it = iter([(i,) for i in range(start_idx, start_idx + 2)])
+            return (it, {"offset": start_idx + 2})
+
+        def readBetweenOffsets(self, start: dict, end: dict):
+            start_idx = start["offset"]
+            end_idx = end["offset"]
+            return iter([(i,) for i in range(start_idx, end_idx)])
+
+        def commit(self, end):
+            pass
+
+If the data source has low throughput and doesn't require partitioning, you can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.

Review Comment:
   Ditto: this explanation can move below the header.



##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -137,3 +271,21 @@ Use the fake datasource with a different number of rows:
     # |  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
     # | Douglas James|2007-01-18|  46226|     Alabama|
     # +--------------+----------+-------+------------+
+
+Start a streaming query with the fake data stream. Once the Python data source is registered, it can be used as the source of readStream or the sink of writeStream by passing its short name or full name to format().
+
+.. code-block:: python
+
+    query = spark.readStream.format("fake").load().writeStream.format("fake").start()

Review Comment:
   Shall we add an example for writeStream?



##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
                     row.append(value)
                 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+    class RangePartition(InputPartition):
+        def __init__(self, start, end):
+            self.start = start
+            self.end = end
+
+    class FakeStreamReader(DataSourceStreamReader):
+        def __init__(self, schema, options):
+            self.current = 0
+
+        def initialOffset(self):
+            return {"offset": 0}
+
+        def latestOffset(self):
+            self.current += 2
+            return {"offset": self.current}
+
+        def partitions(self, start, end):
+            return [RangePartition(start["offset"], end["offset"])]
+
+        def commit(self, end):
+            pass
+
+        def read(self, partition):
+            start, end = partition.start, partition.end
+            for i in range(start, end):
+                yield (i, str(i))
+
+This is a dummy streaming data reader that generates 2 rows in every microbatch. The stream reader instance has an integer offset that increases by 2 in every microbatch.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.initialOffset` should return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.latestOffset` returns the current latest offset that the next microbatch will read up to.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.partitions` plans the partitioning of the current microbatch defined by the start and end offsets. It needs to return a sequence of :class:`InputPartition` objects.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.read` takes a partition as input and returns an iterator of tuples read from the data source.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.commit` is invoked when the query has finished processing data before the end offset. This can be used to clean up resources.
+
+**Implement the Simple Stream Reader**
+
+.. code-block:: python
+
+    class SimpleStreamReader(SimpleDataSourceStreamReader):
+        def initialOffset(self):
+            return {"offset": 0}
+
+        def read(self, start: dict):
+            start_idx = start["offset"]
+            it = iter([(i,) for i in range(start_idx, start_idx + 2)])
+            return (it, {"offset": start_idx + 2})
+
+        def readBetweenOffsets(self, start: dict, end: dict):
+            start_idx = start["offset"]
+            end_idx = end["offset"]
+            return iter([(i,) for i in range(start_idx, end_idx)])
+
+        def commit(self, end):
+            pass
+
+If the data source has low throughput and doesn't require partitioning, you can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.
+
+One of simpleStreamReader() and streamReader() must be implemented for a readable streaming data source. simpleStreamReader() will only be invoked when streamReader() is not implemented.
+
+This is the same dummy streaming reader that generates 2 rows every batch, implemented with the SimpleDataSourceStreamReader interface.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.initialOffset` should return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.read` takes the start offset as input and returns an iterator of tuples and the start offset of the next read.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.readBetweenOffsets` takes the start and end offsets as input and reads an iterator of data deterministically. This is called when the query replays batches during restart or after failure.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.commit` is invoked when the query has finished processing data before the end offset. This can be used to clean up resources.
+
+**Implement the Stream Writer**
+
+.. code-block:: python
+
+    class SimpleCommitMessage(WriterCommitMessage):
+       partition_id: int
+       count: int
+
+
+    class FakeStreamWriter(DataSourceStreamWriter):
+       def __init__(self, options):
+           self.options = options
+           self.path = self.options.get("path")
+           assert self.path is not None
+
+
+       def write(self, iterator):
+           from pyspark import TaskContext
+           context = TaskContext.get()
+           partition_id = context.partitionId()
+           cnt = 0
+           for row in iterator:
+               cnt += 1
+           return SimpleCommitMessage(partition_id=partition_id, count=cnt)
+
+
+       def commit(self, messages, batchId) -> None:
+           status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
+
+

Review Comment:
   nit: extra space
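A side note on the quoted writer: `SimpleCommitMessage` is constructed with keyword arguments, which a class that only declares annotations would reject unless its base class supplies such a constructor. The sketch below uses `@dataclass` to generate one. It is plain Python with stand-in base classes so that it runs without Spark; dropping `partition_id` (which needs `TaskContext`) and writing the status as a JSON file under `path` are assumptions made for illustration, not what the PR does.

```python
import json
import os
import tempfile
from dataclasses import dataclass
from typing import Iterator, List, Tuple


class WriterCommitMessage:
    """Stand-in (assumption) for pyspark.sql.datasource.WriterCommitMessage."""


class DataSourceStreamWriter:
    """Stand-in (assumption) for pyspark.sql.datasource.DataSourceStreamWriter."""


@dataclass
class SimpleCommitMessage(WriterCommitMessage):
    # @dataclass generates the keyword constructor used in write().
    count: int


class FakeStreamWriter(DataSourceStreamWriter):
    def __init__(self, options: dict) -> None:
        self.options = options
        self.path = options.get("path")
        assert self.path is not None, "the 'path' option is required"

    def write(self, iterator: Iterator[Tuple]) -> SimpleCommitMessage:
        # Count the rows of one partition; a real sink would persist them here.
        count = sum(1 for _ in iterator)
        return SimpleCommitMessage(count=count)

    def commit(self, messages: List[SimpleCommitMessage], batchId: int) -> None:
        # Assumption: record one JSON status file per committed batch.
        status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
        with open(os.path.join(self.path, f"{batchId}.json"), "w") as f:
            json.dump(status, f)


# Simulate one microbatch with two partitions, outside of Spark.
out_dir = tempfile.mkdtemp()
writer = FakeStreamWriter({"path": out_dir})
partitions = [[(0, "0"), (1, "1")], [(2, "2")]]
messages = [writer.write(iter(p)) for p in partitions]
writer.commit(messages, batchId=0)
with open(os.path.join(out_dir, "0.json")) as f:
    status = json.load(f)
```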



##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -137,3 +271,21 @@ Use the fake datasource with a different number of rows:
     # |  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
     # | Douglas James|2007-01-18|  46226|     Alabama|
     # +--------------+----------+-------+------------+
+
+Start a streaming query with the fake data stream. Once the Python data source is registered, it can be used as the source of readStream or the sink of writeStream by passing its short name or full name to format().
+
+.. code-block:: python
+
+    query = spark.readStream.format("fake").load().writeStream.format("fake").start()

Review Comment:
   Also, do we have any example outputs we can show to users?



##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -137,3 +271,21 @@ Use the fake datasource with a different number of rows:
     # |  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
     # | Douglas James|2007-01-18|  46226|     Alabama|
     # +--------------+----------+-------+------------+
+
+Start a streaming query with the fake data stream. Once the Python data source is registered, it can be used as the source of readStream or the sink of writeStream by passing its short name or full name to format().
+
+.. code-block:: python
+
+    query = spark.readStream.format("fake").load().writeStream.format("fake").start()
+
+**Serialization Requirement**
+
+User-defined DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader and DataSourceStreamWriter instances and their methods must be serializable by pickle.
+
+Libraries that are used inside a method must be imported inside that method. For example, Faker must be imported inside the read() method in the code below.
+
+.. code-block:: python
+
+    def read(self, partition):
+        from faker import Faker
+        fake = Faker()

Review Comment:
   We can use TaskContext as an example.
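The quoted `FakeStreamWriter.write` already follows this pattern by importing `TaskContext` inside the method. As a Spark-free illustration of why worker-only helpers belong inside the method, the sketch below contrasts a reader that stores an unpicklable object on the instance with one that creates it inside `read()`. The class names, the `(start, end)` tuple used as a partition, the choice of `sqlite3`, and the helper `state_is_picklable` are all hypothetical; pickling the attribute dict is used here only as a proxy for what Spark would have to serialize.

```python
import pickle
from typing import Iterator, Tuple


class EagerReader:
    """Hypothetical reader that opens its connection in __init__."""

    def __init__(self) -> None:
        import sqlite3
        # The connection lives on the instance, so it must be pickled with it.
        self.conn = sqlite3.connect(":memory:")


class LazyReader:
    """Hypothetical reader that keeps only plain options on the instance."""

    def __init__(self, options: dict) -> None:
        self.options = options

    def read(self, partition: Tuple[int, int]) -> Iterator[Tuple]:
        # Imported and created where it is used, so nothing unpicklable is stored.
        import sqlite3
        conn = sqlite3.connect(":memory:")
        try:
            start, end = partition
            for i in range(start, end):
                yield conn.execute("SELECT ?", (i,)).fetchone()
        finally:
            conn.close()


def state_is_picklable(obj: object) -> bool:
    # Proxy check: can the instance attributes be pickled?
    try:
        pickle.dumps(vars(obj))
        return True
    except Exception:
        return False


lazy_ok = state_is_picklable(LazyReader({"numRows": "2"}))
eager_ok = state_is_picklable(EagerReader())
lazy_rows = list(LazyReader({}).read((0, 2)))
```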



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

