allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364325976


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string that represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema 
when
+        users do not explicitly specify it. This method is invoked once when 
calling
+        `spark.read.format(...).load()` to get the schema for a data source 
read operation.
+        If this method is not implemented, and a user does not provide a 
schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string that represents 
the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called 
once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible 
for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage in the 
Spark planner
+        to generate a list of partitions. Note, partition values must be 
serializable.
+
+        The planner then creates an RDD for each partition. Each partition 
value will be
+        passed to `read(partition)` to read the data from this data source.
+
+        If this method is not implemented, or returns an empty list, Spark 
will create one

Review Comment:
   The default partition value should be `None` if the method is not 
implemented (indicating a single partition). I am opting to make 
`def partitions()` **optional** to implement from a usability perspective, 
which makes it easier for new users to get started.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to