rangadi commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1193333270
##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,21 @@ message WriteStreamOperationStart {
string path = 11;
string table_name = 12;
}
+
+ StreamingForeachWriter foreach = 13;
Review Comment:
How about renaming this to `foreach_writer`? That matches the `Writer` suffix
used in the message name. Leaving it as just `foreach` looks quite generic.
##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
string path = 11;
string table_name = 12;
}
+
+ optional Foreach foreach = 13;
+}
+
+message Foreach {
+ // (Required) The encoded commands of the Python foreach function
+ bytes command = 1;
Review Comment:
What breaks if we do reuse it? Given there will be more updates to PythonUDF
(libraries, etc.), reusing it would reduce work in the future.
##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +552,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
private var foreachWriter: ForeachWriter[T] = null
+ private var pythonForeachWriter: PythonForeachWriter = null
+
Review Comment:
> `asInstanceOf[org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]]`?
Yes, what happens if we do this?
##########
python/pyspark/sql/streaming/readwriter.py:
##########
@@ -1122,6 +1122,85 @@ def trigger(
self._jwrite = self._jwrite.trigger(jTrigger)
return self
+ @staticmethod
+ def construct_foreach_function(f: Union[Callable[[Row], None], "SupportsProcess"]):
Review Comment:
Nice, thanks for reusing it.
The name should start with `_` since it is meant for internal use.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.