rangadi commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1184091839
##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
string path = 11;
string table_name = 12;
}
+
+ optional Foreach foreach = 13;
+}
+
+message Foreach {
+ // (Required) The encoded commands of the Python foreach function
+ bytes command = 1;
Review Comment:
Can we use `PythonUDF` message? Even though 'output_type' is not needed.
##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
string path = 11;
string table_name = 12;
}
+
+ optional Foreach foreach = 13;
+}
+
+message Foreach {
Review Comment:
`Foreach` seems too short and does not convey much context. How about
`StreamingForeachSink` or `StreamingForeachWriter`?
##########
python/pyspark/sql/connect/streaming/readwriter.py:
##########
@@ -339,7 +342,9 @@ def table(self, tableName: str) -> "DataFrame":
class DataStreamWriter:
- def __init__(self, plan: "LogicalPlan", session: "SparkSession") -> None:
+ def __init__(
+ self, plan: "LogicalPlan", session: "SparkSession"
+ ) -> None:
Review Comment:
Not needed?
##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -455,6 +463,16 @@ final class DataStreamWriter[T] private[sql](ds:
Dataset[T]) {
this
}
+ def foreachPython(writer: PythonForeachWriter): DataStreamWriter[T] = {
Review Comment:
should be private?
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1385,6 +1385,21 @@ class SparkConnectPlanner(val session: SparkSession) {
accumulator = null)
}
+ private def transformPythonForeachFunction(fun: proto.Foreach):
SimplePythonFunction = {
Review Comment:
Could use `PythonUDF` in the request proto and call
`transformPythonFunction()` here (actually we many not need this new function).
##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -352,10 +355,15 @@ final class DataStreamWriter[T] private[sql](ds:
Dataset[T]) {
catalogTable = catalogTable)
resultDf.createOrReplaceTempView(query.name)
query
- } else if (source == SOURCE_NAME_FOREACH) {
+ } else if (source == SOURCE_NAME_FOREACH && foreachWriter != null) {
Review Comment:
Refactor this into `if (source == SOURCE_NAME_FOREACH) {`
In side, it looks like we set up `sink` based on `if (foreachWriter != null)
...`
##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
string path = 11;
string table_name = 12;
}
+
+ optional Foreach foreach = 13;
Review Comment:
remove `optional`. These are optional already.
##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +552,8 @@ final class DataStreamWriter[T] private[sql](ds:
Dataset[T]) {
private var foreachWriter: ForeachWriter[T] = null
+ private var pythonForeachWriter: PythonForeachWriter = null
+
Review Comment:
I will think about this.. it might be doable since it is ok in PySpark.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]