HyukjinKwon commented on code in PR #39614:
URL: https://github.com/apache/spark/pull/39614#discussion_r1091297030
##########
python/pyspark/sql/connect/plan.py:
##########
@@ -1413,6 +1413,95 @@ def _repr_html_(self) -> str:
pass
+class WriteOperationV2(LogicalPlan):
+ def __init__(self, child: "LogicalPlan", table_name: str) -> None:
+ super(WriteOperationV2, self).__init__(child)
+ self.table_name: Optional[str] = table_name
+ self.provider: Optional[str] = None
+ self.partitioning_columns: List["ColumnOrName"] = []
+ self.options: dict[str, Optional[str]] = {}
+ self.table_properties: dict[str, Optional[str]] = {}
+ self.mode: Optional[str] = None
+ self.overwrite_condition: Optional["ColumnOrName"] = None
+
+ def col_to_expr(self, col: "ColumnOrName", session: "SparkConnectClient")
-> proto.Expression:
+ if isinstance(col, Column):
+ return col.to_plan(session)
+ else:
+ return self.unresolved_attr(col)
+
+ def command(self, session: "SparkConnectClient") -> proto.Command:
+ assert self._child is not None
+ plan = proto.Command()
+ plan.write_operation_v2.input.CopyFrom(self._child.plan(session))
+ if self.table_name is not None:
+ plan.write_operation_v2.table_name = self.table_name
+ if self.provider is not None:
+ plan.write_operation_v2.provider = self.provider
+
+ plan.write_operation_v2.partitioning_columns.extend(
+ [self.col_to_expr(x, session) for x in self.partitioning_columns]
+ )
+
+ for k in self.options:
+ if self.options[k] is None:
+ plan.write_operation_v2.options.pop(k, None)
+ else:
+ plan.write_operation_v2.options[k] = cast(str, self.options[k])
+
+ for k in self.table_properties:
+ if self.table_properties[k] is None:
+ plan.write_operation_v2.table_properties.pop(k, None)
+ else:
+ plan.write_operation_v2.table_properties[k] = cast(str,
self.table_properties[k])
+
+ if self.mode is not None:
+ wm = self.mode.lower()
+ if wm == "create":
+ plan.write_operation_v2.mode =
proto.WriteOperationV2.Mode.MODE_CREATE
+ elif wm == "overwrite":
+ plan.write_operation_v2.mode =
proto.WriteOperationV2.Mode.MODE_OVERWRITE
+ elif wm == "overwrite_partition":
+ plan.write_operation_v2.mode =
proto.WriteOperationV2.Mode.MODE_OVERWRITE_PARTITIONS
+ elif wm == "append":
+ plan.write_operation_v2.mode =
proto.WriteOperationV2.Mode.MODE_APPEND
+ elif wm == "replace":
+ plan.write_operation_v2.mode =
proto.WriteOperationV2.Mode.MODE_REPLACE
+ if self.overwrite_condition is not None:
+ plan.write_operation_v2.overwrite_condition.CopyFrom(
+ self.col_to_expr(self.overwrite_condition, session)
+ )
+ elif wm == "create_or_replace":
+ plan.write_operation_v2.mode =
proto.WriteOperationV2.Mode.MODE_CREATE_OR_REPLACE
+ else:
+ raise ValueError(f"Unknown Mode value for DataFrame:
{self.mode}")
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return (
+ f"{i}"
+ f"<WriteOperationV2 table_name='{self.table_name}' "
+ f"provider='{self.provider}' "
+ f"partitioning_columns='{self.partitioning_columns}' "
+ f"options='{self.options}' "
+ f"table_properties='{self.table_properties}' "
+ f"mode='{self.mode}'>"
+ )
+
+ def _repr_html_(self) -> str:
Review Comment:
I actually think you can remove the `print` and `_repr_html_` implementations
(and let the default implementation handle them)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]