zhengruifeng commented on code in PR #38793:
URL: https://github.com/apache/spark/pull/38793#discussion_r1031959388
##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -477,6 +477,63 @@ def _show_string(
assert pdf is not None
return pdf["show_string"][0]
+ def withColumns(self, colsMap: Dict[str, Expression]) -> "DataFrame":
+ """
+ Returns a new :class:`DataFrame` by adding multiple columns or
replacing the
+ existing columns that have the same names.
+
+ The colsMap is a map of column name and column, the column must only
refer to attributes
+ supplied by this Dataset. It is an error to add columns that refer to
some other Dataset.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ colsMap : dict
+ a dict of column name and :class:`Column`.
+
+ Returns
+ -------
+ :class:`DataFrame`
+ DataFrame with new or replaced columns.
+ """
+ if not isinstance(colsMap, dict):
+ raise TypeError("colsMap must be dict of column name and column.")
+
+ return DataFrame.withPlan(
+ plan.WithColumns(self._plan, colsMap),
+ session=self._session,
+ )
+
+ def withColumn(self, colName: str, col: Expression) -> "DataFrame":
+ """
+ Returns a new :class:`DataFrame` by adding a column or replacing the
+ existing column that has the same name.
+
+ The column expression must be an expression over this
:class:`DataFrame`; attempting to add
+ a column from some other :class:`DataFrame` will raise an error.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ colName : str
+ string, name of the new column.
+ col : :class:`Column`
+ a :class:`Column` expression for the new column.
+
+ Returns
+ -------
+ :class:`DataFrame`
+ DataFrame with new or replaced column.
+ """
+ if not isinstance(col, Expression):
+ raise TypeError("col should be Column")
Review Comment:
```suggestion
raise TypeError("col should be Expression")
```
##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -242,6 +243,18 @@ class SparkConnectPlanner(session: SparkSession) {
.logicalPlan
}
+ private def transformWithColumns(rel: proto.WithColumns): LogicalPlan = {
+ val m = rel.getColsMap.asScala.toMap
+ val names = m.map { case (k, _) =>
+ k
+ }.toSeq
+ val cols = m.map { case (_, c) =>
+ Column(transformExpression(c))
+ }.toSeq
+
Review Comment:
```suggestion
val (names, cols) = m.mapValues(c =>
Column(transformExpression(c))).toSeq.unzip
```
##########
python/pyspark/sql/tests/connect/test_connect_basic.py:
##########
@@ -357,6 +358,29 @@ def test_fill_na(self):
self.spark.sql(query).na.fill({"a": True, "b": 2}).toPandas(),
)
+ def test_with_columns(self):
+ # SPARK-41256: tes withColumn(s).
Review Comment:
```suggestion
# SPARK-41256: test withColumn(s).
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]