WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1183993090


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -226,16 +226,9 @@ object CheckConnectJvmClientCompatibility {
 
       // TypedColumn
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"),
-
-      // DataStreamReader
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.DataStreamReader.table" // TODO( 
SPARK-43144)
-      ),
-
       // DataStreamWriter
       ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.DataStreamWriter.foreach" // 
TODO(SPARK-43133)
-      ),
+        "org.apache.spark.sql.streaming.DataStreamWriter.foreachPython"),

Review Comment:
   Please see my change in core DataStreamWriter



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -352,10 +355,15 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
         catalogTable = catalogTable)
       resultDf.createOrReplaceTempView(query.name)
       query
-    } else if (source == SOURCE_NAME_FOREACH) {
+    } else if (source == SOURCE_NAME_FOREACH && foreachWriter != null) {
       assertNotPartitioned(SOURCE_NAME_FOREACH)
       val sink = ForeachWriterTable[T](foreachWriter, ds.exprEnc)
       startQuery(sink, extraOptions, catalogTable = catalogTable)
+    } else if (source == SOURCE_NAME_FOREACH) {
+      assertNotPartitioned(SOURCE_NAME_FOREACH)
+      val sink = new ForeachWriterTable[UnsafeRow](
+        pythonForeachWriter, Right((x: InternalRow) => 
x.asInstanceOf[UnsafeRow]))

Review Comment:
   A copy from 
   
   
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala#L96-L108
   
   See my comment below



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +552,8 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 
   private var foreachWriter: ForeachWriter[T] = null
 
+  private var pythonForeachWriter: PythonForeachWriter = null
+

Review Comment:
   I definitely think a discussion is needed here. The reason why I add this is 
because `foreachWriter` cannot be used here because it's type parameter is the 
same as the DataStreamWriter's, which is `Row`. But `PythonForeachWriter` 
extends ForeachWriter[UnsafeRow]:
   
   
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala#L33-L34
   
   Therefore I can't just create a `PythonForeachWriter` and call 
`writer.foreach(pythonForeachWriter)` in `SparkConnectPlanner`.
   
   The original caller is in python:
   
https://github.com/apache/spark/blob/master/python/pyspark/sql/streaming/readwriter.py#L1309C24-L1315
   
   Maybe that circumvent this check somehow, I'm also very interested if anyone 
knows why is that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to