dtenedor commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1358666425


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema 
when
+        users do not explicitly specify it. This method is invoked once when 
calling
+        `spark.read.format(...).load()` to get the schema for a data source 
read operation.
+        If this method is not implemented, and a user does not provide a 
schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the 
schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called 
once during

Review Comment:
   If we want to create a user-defined Python materialization in the future 
that has no corresponding reader implementation, this API would preclude this. 
Should we instead change the invariant to "at least one of 'reader' and/or 
'writer' must be implemented"?



##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:

Review Comment:
   ```suggestion
   class DataSource:
       """
       This represents a user-defined table implemented in Python.
       Subsequent Spark programs can then query from and/or write to the table.
       This class defines the table's schema and declares an interface to define
       logic to generate the rows for a scan from the table, and to respond to
       writes of rows into the table later.
       
       """
   ```



##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema 
when
+        users do not explicitly specify it. This method is invoked once when 
calling
+        `spark.read.format(...).load()` to get the schema for a data source 
read operation.
+        If this method is not implemented, and a user does not provide a 
schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the 
schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called 
once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible 
for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage in the 
Spark planner
+        to generate a list of partitions. Note, partition values must be 
serializable.

Review Comment:
   should we mention which serialization protocol must be supported? Is it the 
default Python serialization?



##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema 
when
+        users do not explicitly specify it. This method is invoked once when 
calling
+        `spark.read.format(...).load()` to get the schema for a data source 
read operation.
+        If this method is not implemented, and a user does not provide a 
schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the 
schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called 
once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible 
for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage in the 
Spark planner
+        to generate a list of partitions. Note, partition values must be 
serializable.
+
+        The planner then creates an RDD for each partition. Each partition 
value will be

Review Comment:
   optional: we're trying to move away from RDDs for future use of Spark, maybe 
just mention that if this method returns N partitions, then the query execution 
will create N instances of this class, each initialized with one of the 
partition buffers created earlier?



##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema 
when
+        users do not explicitly specify it. This method is invoked once when 
calling
+        `spark.read.format(...).load()` to get the schema for a data source 
read operation.
+        If this method is not implemented, and a user does not provide a 
schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the 
schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called 
once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible 
for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage in the 
Spark planner
+        to generate a list of partitions. Note, partition values must be 
serializable.
+
+        The planner then creates an RDD for each partition. Each partition 
value will be
+        passed to `read(partition)` to read the data from this data source.
+
+        If this method is not implemented, or returns an empty list, Spark 
will create one

Review Comment:
   I would recommend to make this method required. User-defined data sources 
without partitioning support for Spark don't make much sense, and forcing users 
to implement this method acts as a safeguard that performance will remain high 
because the data source evaluation can take place using multiple executors 
concurrently. If users just want to return a single stream of values, they can 
create a UDTF that accepts scalar arguments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to