This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 45bb9578a0f [SPARK-40813][CONNECT][PYTHON][FOLLOW-UP] Improve limit and offset in Python client 45bb9578a0f is described below commit 45bb9578a0f6b40b472588a407d842f293e9e323 Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Fri Oct 21 10:33:18 2022 +0900 [SPARK-40813][CONNECT][PYTHON][FOLLOW-UP] Improve limit and offset in Python client ### What changes were proposed in this pull request? Following up after https://github.com/apache/spark/pull/38275, improve limit and offset in Python client. ### Why are the changes needed? Improve API coverage. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38314 from amaliujia/python_test_limit_offset. Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/sql/connect/dataframe.py | 3 ++ python/pyspark/sql/connect/plan.py | 32 ++++++++++++++++++++-- .../sql/tests/connect/test_connect_basic.py | 7 +++++ .../sql/tests/connect/test_connect_plan_only.py | 10 +++++++ 4 files changed, 49 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 1f7e789818f..5ca747fdd6a 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -199,6 +199,9 @@ class DataFrame(object): def limit(self, n: int) -> "DataFrame": return DataFrame.withPlan(plan.Limit(child=self._plan, limit=n), session=self._session) + def offset(self, n: int) -> "DataFrame": + return DataFrame.withPlan(plan.Offset(child=self._plan, offset=n), session=self._session) + def sort(self, *cols: "ColumnOrString") -> "DataFrame": """Sort by a specific column""" return DataFrame.withPlan(plan.Sort(self._plan, *cols), session=self._session) diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index c564b71cdba..5b8b7c71866 100644 --- 
a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -272,10 +272,9 @@ class Filter(LogicalPlan): class Limit(LogicalPlan): - def __init__(self, child: Optional["LogicalPlan"], limit: int, offset: int = 0) -> None: + def __init__(self, child: Optional["LogicalPlan"], limit: int) -> None: super().__init__(child) self.limit = limit - self.offset = offset def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation: assert self._child is not None @@ -286,7 +285,7 @@ class Limit(LogicalPlan): def print(self, indent: int = 0) -> str: c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child else "" - return f"{' ' * indent}<Limit limit={self.limit} offset={self.offset}>\n{c_buf}" + return f"{' ' * indent}<Limit limit={self.limit}>\n{c_buf}" def _repr_html_(self) -> str: return f""" @@ -294,6 +293,33 @@ class Limit(LogicalPlan): <li> <b>Limit</b><br /> Limit: {self.limit} <br /> + {self._child_repr_()} + </li> + </uL> + """ + + +class Offset(LogicalPlan): + def __init__(self, child: Optional["LogicalPlan"], offset: int = 0) -> None: + super().__init__(child) + self.offset = offset + + def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation: + assert self._child is not None + plan = proto.Relation() + plan.offset.input.CopyFrom(self._child.plan(session)) + plan.offset.offset = self.offset + return plan + + def print(self, indent: int = 0) -> str: + c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child else "" + return f"{' ' * indent}<Offset={self.offset}>\n{c_buf}" + + def _repr_html_(self) -> str: + return f""" + <ul> + <li> + <b>Limit</b><br /> Offset: {self.offset} <br /> {self._child_repr_()} </li> diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index de300946932..f6988a1d120 100644 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++ b/python/pyspark/sql/tests/connect/test_connect_basic.py @@ 
-106,6 +106,13 @@ class SparkConnectTests(SparkConnectSQLTestCase): res = pandas.DataFrame(data={"id": [0, 30, 60, 90]}) self.assert_(pd.equals(res), f"{pd.to_string()} != {res.to_string()}") + def test_limit_offset(self): + df = self.connect.read.table(self.tbl_name) + pd = df.limit(10).offset(1).toPandas() + self.assertEqual(9, len(pd.index)) + pd2 = df.offset(98).limit(10).toPandas() + self.assertEqual(2, len(pd2.index)) + def test_simple_datasource_read(self) -> None: writeDf = self.df_text tmpPath = tempfile.mkdtemp() diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py b/python/pyspark/sql/tests/connect/test_connect_plan_only.py index 96bbb8aa834..739c24ca96e 100644 --- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py +++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py @@ -44,6 +44,16 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture): self.assertEqual(plan.root.filter.condition.unresolved_function.parts, [">"]) self.assertEqual(len(plan.root.filter.condition.unresolved_function.arguments), 2) + def test_limit(self): + df = self.connect.readTable(table_name=self.tbl_name) + limit_plan = df.limit(10)._plan.to_proto(self.connect) + self.assertEqual(limit_plan.root.limit.limit, 10) + + def test_offset(self): + df = self.connect.readTable(table_name=self.tbl_name) + offset_plan = df.offset(10)._plan.to_proto(self.connect) + self.assertEqual(offset_plan.root.offset.offset, 10) + def test_relation_alias(self): df = self.connect.readTable(table_name=self.tbl_name) plan = df.alias("table_alias")._plan.to_proto(self.connect) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org