rangadi commented on code in PR #41026: URL: https://github.com/apache/spark/pull/41026#discussion_r1194455270
########## sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala: ########## @@ -446,6 +448,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * @since 2.0.0 */ def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = { + this.source = SOURCE_NAME_FOREACH + this.foreachWriter = if (writer != null) { + ds.sparkSession.sparkContext.clean(writer.asInstanceOf[ForeachWriter[Any]]) + } else { + throw new IllegalArgumentException("foreach writer cannot be null") + } + this + } + + private[sql] def foreachConnect(writer: ForeachWriter[Any]): DataStreamWriter[T] = { Review Comment: Rename it to `foreachImplementation()`? ########## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala: ########## @@ -40,7 +41,11 @@ import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.Streami import org.apache.spark.connect.proto.WriteStreamOperationStart import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase import org.apache.spark.ml.{functions => MLFunctions} +<<<<<<< HEAD Review Comment: Fix. ########## sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala: ########## @@ -446,6 +448,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * @since 2.0.0 */ def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = { + this.source = SOURCE_NAME_FOREACH Review Comment: call `foreachConnect()` here, rather than reimplementing it. 
########## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala: ########## @@ -226,16 +226,9 @@ object CheckConnectJvmClientCompatibility { // TypedColumn ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"), - - // DataStreamReader - ProblemFilters.exclude[Problem]( - "org.apache.spark.sql.streaming.DataStreamReader.table" // TODO( SPARK-43144) - ), - // DataStreamWriter ProblemFilters.exclude[Problem]( - "org.apache.spark.sql.streaming.DataStreamWriter.foreach" // TODO(SPARK-43133) - ), + "org.apache.spark.sql.streaming.DataStreamWriter.foreachPython"), Review Comment: Actually we don't need this. The method is private to `[sql]`. ########## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala: ########## @@ -226,16 +226,9 @@ object CheckConnectJvmClientCompatibility { // TypedColumn ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"), - - // DataStreamReader - ProblemFilters.exclude[Problem]( - "org.apache.spark.sql.streaming.DataStreamReader.table" // TODO( SPARK-43144) - ), - // DataStreamWriter ProblemFilters.exclude[Problem]( - "org.apache.spark.sql.streaming.DataStreamWriter.foreach" // TODO(SPARK-43133) - ), + "org.apache.spark.sql.streaming.DataStreamWriter.foreachPython"), Review Comment: This name should be in sync with the method name in DataStreamWriter. I like `foreachPython`. Also add a comment here that this is not public API. ########## connector/connect/common/src/main/protobuf/spark/connect/commands.proto: ########## @@ -209,6 +209,15 @@ message WriteStreamOperationStart { string path = 11; string table_name = 12; } + + optional Foreach foreach = 13; +} + +message Foreach { + // (Required) The encoded commands of the Python foreach function + bytes command = 1; Review Comment: Sure, we can go with your current approach. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org