Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21116#discussion_r183488923
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory}
+import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo}
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.util.Utils
+
+/**
+ * The physical plan for writing data into a continuous processing [[StreamWriter]].
+ */
+case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPlan)
+    extends SparkPlan with Logging {
+  override def children: Seq[SparkPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val writerFactory = writer match {
+      case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
+      case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
+    }
+
+    val rdd = query.execute()
+    val messages = new Array[WriterCommitMessage](rdd.partitions.length)
--- End diff ---
Is this really needed? Its only use is in the `logInfo` below, and even there only for its length, which is effectively `rdd.partitions.length`.
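For illustration only, a minimal sketch of the simplification I have in mind (it assumes the `doExecute` context shown in this diff; the exact log wording here is illustrative, not necessarily what the PR uses):

```scala
// Sketch: per this comment, the commit-message array is only read for its
// length, so the partition count can be logged directly instead of
// allocating the array up front.
val rdd = query.execute()
logInfo(s"Start processing data source writer: $writer. " +
  s"The input RDD has ${rdd.partitions.length} partitions.")
```

The rest of `doExecute` could stay as it is in this diff.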
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]