HeartSaVioR commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1588609079


##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the `faker` library to p
                     row.append(value)
                 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+    class RangePartition(InputPartition):
+        def __init__(self, start, end):
+            self.start = start
+            self.end = end
+
+    class FakeStreamReader(DataSourceStreamReader):
+        def __init__(self, schema, options):
+            self.current = 0
+
+        def initialOffset(self):
+            return {"offset": 0}
+
+        def latestOffset(self):
+            self.current += 2
+            return {"offset": self.current}
+
+        def partitions(self, start, end):
+            return [RangePartition(start["offset"], end["offset"])]
+
+        def commit(self, end):
+            pass
+
+        def read(self, partition):
+            start, end = partition.start, partition.end
+            for i in range(start, end):
+                yield (i, str(i))
+
+This is a dummy streaming data reader that generates two rows in every microbatch. The stream reader instance has an integer offset that increases by 2 in every microbatch.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.initialOffset` should return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.latestOffset` returns the current latest offset that the next microbatch will read up to.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.partitions` plans the partitioning of the current microbatch defined by the start and end offsets; it needs to return a sequence of :class:`InputPartition` objects.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.read` takes a partition as input and reads an iterator of tuples from the data source.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.commit` is invoked when the query has finished processing the data before the end offset; this can be used to clean up resources.

Review Comment:
   +1
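
   As a purely illustrative aside, the contract above can be made concrete with a minimal sketch that mimics how a microbatch engine would call these methods in sequence. The loop below is hypothetical (it is not Spark's actual execution path) and assumes the ``FakeStreamReader`` and ``RangePartition`` definitions from the snippet are in scope.

   .. code-block:: python

       # Hypothetical driver loop mimicking the microbatch call sequence;
       # not how Spark actually schedules the reader.
       reader = FakeStreamReader(schema=None, options={})

       start = reader.initialOffset()           # {"offset": 0}
       for batch_id in range(3):
           end = reader.latestOffset()          # the offset advances by 2 per batch
           for partition in reader.partitions(start, end):
               rows = list(reader.read(partition))
               print(f"batch {batch_id}: {rows}")
           reader.commit(end)                   # data before `end` has been processed
           start = end                          # the next batch starts here

       # batch 0: [(0, '0'), (1, '1')]
       # batch 1: [(2, '2'), (3, '3')]
       # batch 2: [(4, '4'), (5, '5')]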



##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the `faker` library to p
                     row.append(value)
                 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+    class RangePartition(InputPartition):
+        def __init__(self, start, end):
+            self.start = start
+            self.end = end
+
+    class FakeStreamReader(DataSourceStreamReader):
+        def __init__(self, schema, options):
+            self.current = 0
+
+        def initialOffset(self):
+            return {"offset": 0}
+
+        def latestOffset(self):
+            self.current += 2
+            return {"offset": self.current}
+
+        def partitions(self, start, end):
+            return [RangePartition(start["offset"], end["offset"])]
+
+        def commit(self, end):
+            pass
+
+        def read(self, partition):
+            start, end = partition.start, partition.end
+            for i in range(start, end):
+                yield (i, str(i))
+
+This is a dummy streaming data reader that generates two rows in every microbatch. The stream reader instance has an integer offset that increases by 2 in every microbatch.

Review Comment:
   +1



##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -59,8 +59,17 @@ Start by creating a new subclass of :class:`DataSource`. Define the source name,
         def reader(self, schema: StructType):
             return FakeDataSourceReader(schema, self.options)
 
+        def streamReader(self, schema: StructType):
+            return FakeStreamReader(schema, self.options)
 
-**Step 2: Implement the Reader**
+        def simpleStreamReader(self, schema: StructType):

Review Comment:
   This could lead to confusion that they have to implement both, rather than either one. Could we please add a comment, or just duplicate the data source and describe them separately?
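
   One possible way to illustrate the suggestion above (illustrative only, not the PR's final wording) is an inline comment on the example data source, making it explicit that only one of the two streaming hooks is needed. ``FakeDataSourceReader`` and ``FakeStreamReader`` are assumed from the surrounding example.

   .. code-block:: python

       class FakeDataSource(DataSource):
           ...

           def reader(self, schema: StructType):
               return FakeDataSourceReader(schema, self.options)

           # NOTE (illustrative): a streaming source implements either
           # streamReader() or simpleStreamReader(); it does not need both.
           def streamReader(self, schema: StructType):
               return FakeStreamReader(schema, self.options)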



##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -59,8 +59,17 @@ Start by creating a new subclass of :class:`DataSource`. Define the source name,
         def reader(self, schema: StructType):
             return FakeDataSourceReader(schema, self.options)
 
+        def streamReader(self, schema: StructType):

Review Comment:
   Given they will see the example from the start, it could give the impression that every data source has to implement both batch and streaming. It would probably be better to have some explanation.
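
   If it helps, one possible clarification (a sketch only, assuming batch and streaming capabilities are independent in the Python data source API) is a streaming-only source that omits the batch reader() entirely; the name ``StreamOnlyDataSource`` is hypothetical, and ``FakeStreamReader`` is taken from the example.

   .. code-block:: python

       # Illustrative sketch: a source that only supports streaming reads.
       # It implements streamReader() but not reader(), so it would be used
       # with spark.readStream rather than spark.read.
       class StreamOnlyDataSource(DataSource):
           @classmethod
           def name(cls):
               return "streamonly"

           def schema(self):
               return "id INT, value STRING"

           def streamReader(self, schema: StructType):
               return FakeStreamReader(schema, self.options)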



##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -137,3 +271,21 @@ Use the fake datasource with a different number of rows:
     # |  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
     # | Douglas James|2007-01-18|  46226|     Alabama|
     # +--------------+----------+-------+------------+
+
+Start a streaming query with the fake data stream. Once we register the Python data source, we can use it as the source of readStream() or the sink of writeStream() by passing its short name or full name to format().
+
+.. code-block:: python
+
+    query = spark.readStream.format("fake").load().writeStream.format("fake").start()

Review Comment:
   Yeah, the output from console sink would be OK. Maybe batch 0 or so.
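
   As a rough sketch of what that could look like (illustrative; it assumes the ``FakeDataSource`` from the example and uses the standard console sink, so the exact rows and timing will differ):

   .. code-block:: python

       # Register the Python data source, then stream it into the console sink
       # so the doc can show the familiar "Batch: 0" style output.
       spark.dataSource.register(FakeDataSource)

       query = (
           spark.readStream.format("fake").load()
           .writeStream.format("console")
           .start()
       )

       # The console sink prints something like:
       # -------------------------------------------
       # Batch: 0
       # -------------------------------------------
       # (rows from the fake stream follow)

       query.stop()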



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

