This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 45bb9578a0f [SPARK-40813][CONNECT][PYTHON][FOLLOW-UP] Improve limit and offset in Python client
45bb9578a0f is described below

commit 45bb9578a0f6b40b472588a407d842f293e9e323
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Fri Oct 21 10:33:18 2022 +0900

    [SPARK-40813][CONNECT][PYTHON][FOLLOW-UP] Improve limit and offset in Python client
    
    ### What changes were proposed in this pull request?
    
    Following up on https://github.com/apache/spark/pull/38275, improve limit and offset in the Python client.
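
    As a rough illustration of the resulting API (a sketch, not part of the patch: the table name is hypothetical and `spark` stands in for a Spark Connect session, as in the tests below):

    ```python
    df = spark.read.table("my_table")         # hypothetical table
    pd = df.limit(10).offset(1).toPandas()    # take 10 rows, then skip the first -> 9 rows
    pd2 = df.offset(98).limit(10).toPandas()  # skip 98 rows, then take up to 10
    ```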
    
    ### Why are the changes needed?
    
    Improve API coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    UT
    
    Closes #38314 from amaliujia/python_test_limit_offset.
    
    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/connect/dataframe.py            |  3 ++
 python/pyspark/sql/connect/plan.py                 | 32 ++++++++++++++++++++--
 .../sql/tests/connect/test_connect_basic.py        |  7 +++++
 .../sql/tests/connect/test_connect_plan_only.py    | 10 +++++++
 4 files changed, 49 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 1f7e789818f..5ca747fdd6a 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -199,6 +199,9 @@ class DataFrame(object):
     def limit(self, n: int) -> "DataFrame":
         return DataFrame.withPlan(plan.Limit(child=self._plan, limit=n), session=self._session)
 
+    def offset(self, n: int) -> "DataFrame":
+        return DataFrame.withPlan(plan.Offset(child=self._plan, offset=n), session=self._session)
+
     def sort(self, *cols: "ColumnOrString") -> "DataFrame":
         """Sort by a specific column"""
         return DataFrame.withPlan(plan.Sort(self._plan, *cols), session=self._session)
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index c564b71cdba..5b8b7c71866 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -272,10 +272,9 @@ class Filter(LogicalPlan):
 
 
 class Limit(LogicalPlan):
-    def __init__(self, child: Optional["LogicalPlan"], limit: int, offset: int = 0) -> None:
+    def __init__(self, child: Optional["LogicalPlan"], limit: int) -> None:
         super().__init__(child)
         self.limit = limit
-        self.offset = offset
 
     def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:
         assert self._child is not None
@@ -286,7 +285,7 @@ class Limit(LogicalPlan):
 
     def print(self, indent: int = 0) -> str:
         c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child else ""
-        return f"{' ' * indent}<Limit limit={self.limit} 
offset={self.offset}>\n{c_buf}"
+        return f"{' ' * indent}<Limit limit={self.limit}>\n{c_buf}"
 
     def _repr_html_(self) -> str:
         return f"""
@@ -294,6 +293,33 @@ class Limit(LogicalPlan):
             <li>
                 <b>Limit</b><br />
                 Limit: {self.limit} <br />
+                {self._child_repr_()}
+            </li>
+        </ul>
+        """
+
+
+class Offset(LogicalPlan):
+    def __init__(self, child: Optional["LogicalPlan"], offset: int = 0) -> None:
+        super().__init__(child)
+        self.offset = offset
+
+    def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:
+        assert self._child is not None
+        plan = proto.Relation()
+        plan.offset.input.CopyFrom(self._child.plan(session))
+        plan.offset.offset = self.offset
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child else ""
+        return f"{' ' * indent}<Offset={self.offset}>\n{c_buf}"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+            <li>
+                <b>Offset</b><br />
                 Offset: {self.offset} <br />
                 {self._child_repr_()}
             </li>
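
A minimal sketch of how these plan nodes compose (not part of the patch; it assumes the Read plan node from the same module, which takes a table name): each DataFrame method wraps the current plan in a new node, so chained calls nest Relation protos.

    from pyspark.sql.connect.plan import Limit, Offset, Read  # Read assumed, per this module

    # df.limit(10).offset(1) builds Offset on top of Limit on top of the read:
    plan = Offset(child=Limit(child=Read("my_table"), limit=10), offset=1)

    # plan.plan(session) serializes this to nested Relation protos, roughly:
    #   offset { offset: 1  input { limit { limit: 10  input { read { ... } } } } }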
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index de300946932..f6988a1d120 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -106,6 +106,13 @@ class SparkConnectTests(SparkConnectSQLTestCase):
         res = pandas.DataFrame(data={"id": [0, 30, 60, 90]})
         self.assert_(pd.equals(res), f"{pd.to_string()} != {res.to_string()}")
 
+    def test_limit_offset(self):
+        df = self.connect.read.table(self.tbl_name)
+        pd = df.limit(10).offset(1).toPandas()
+        self.assertEqual(9, len(pd.index))
+        pd2 = df.offset(98).limit(10).toPandas()
+        self.assertEqual(2, len(pd2.index))
+
     def test_simple_datasource_read(self) -> None:
         writeDf = self.df_text
         tmpPath = tempfile.mkdtemp()
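
(On the expected counts in test_limit_offset above: limit(10).offset(1) takes 10 rows and then skips the first, leaving 9; offset(98).limit(10) skips 98 rows first, so, assuming the fixture table holds 100 rows, only 2 remain to be taken.)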
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
index 96bbb8aa834..739c24ca96e 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
@@ -44,6 +44,16 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture):
         self.assertEqual(plan.root.filter.condition.unresolved_function.parts, [">"])
         self.assertEqual(len(plan.root.filter.condition.unresolved_function.arguments), 2)
 
+    def test_limit(self):
+        df = self.connect.readTable(table_name=self.tbl_name)
+        limit_plan = df.limit(10)._plan.to_proto(self.connect)
+        self.assertEqual(limit_plan.root.limit.limit, 10)
+
+    def test_offset(self):
+        df = self.connect.readTable(table_name=self.tbl_name)
+        offset_plan = df.offset(10)._plan.to_proto(self.connect)
+        self.assertEqual(offset_plan.root.offset.offset, 10)
+
     def test_relation_alias(self):
         df = self.connect.readTable(table_name=self.tbl_name)
         plan = df.alias("table_alias")._plan.to_proto(self.connect)

