GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20243#discussion_r161305572
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala ---
@@ -17,58 +17,36 @@
package org.apache.spark.sql.execution.streaming
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
-import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, StreamSinkProvider}
-import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.types.StructType
-
-class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
- // Number of rows to display, by default 20 rows
- private val numRowsToShow = options.get("numRows").map(_.toInt).getOrElse(20)
-
- // Truncate the displayed data if it is too long, by default it is true
- private val isTruncated = options.get("truncate").map(_.toBoolean).getOrElse(true)
+import java.util.Optional
- // Track the batch id
- private var lastBatchId = -1L
-
- override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
- val batchIdStr = if (batchId <= lastBatchId) {
- s"Rerun batch: $batchId"
- } else {
- lastBatchId = batchId
- s"Batch: $batchId"
- }
-
- // scalastyle:off println
- println("-------------------------------------------")
- println(batchIdStr)
- println("-------------------------------------------")
- // scalastyle:off println
- data.sparkSession.createDataFrame(
- data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
- .show(numRowsToShow, isTruncated)
- }
+import scala.collection.JavaConverters._
- override def toString(): String = s"ConsoleSink[numRows=$numRowsToShow, truncate=$isTruncated]"
-}
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
extends BaseRelation {
override def schema: StructType = data.schema
}
-class ConsoleSinkProvider extends StreamSinkProvider
+class ConsoleSinkProvider extends DataSourceV2
+ with MicroBatchWriteSupport
with DataSourceRegister
with CreatableRelationProvider {
- def createSink(
- sqlContext: SQLContext,
- parameters: Map[String, String],
- partitionColumns: Seq[String],
- outputMode: OutputMode): Sink = {
- new ConsoleSink(parameters)
+
+ override def createMicroBatchWriter(
+ queryId: String,
+ epochId: Long,
+ schema: StructType,
+ mode: OutputMode,
+ options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
+ Optional.of(new ConsoleWriter(epochId, schema, options.asMap.asScala.toMap))
}
def createRelation(
--- End diff --
What is createRelation used for? For batch?
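
For reference, a minimal batch-path sketch of what this question is getting at (an illustrative assumption, not code from this PR: it presumes createRelation is what backs a non-streaming write to the "console" source, while streaming queries go through the new createMicroBatchWriter / ConsoleWriter path shown in the diff above):

    // Hedged sketch: a plain batch write to the "console" source. The assumption
    // is that DataFrameWriter resolves ConsoleSinkProvider as a
    // CreatableRelationProvider and invokes createRelation(...) to print the rows.
    import org.apache.spark.sql.SparkSession

    object ConsoleBatchWriteSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("console-batch-write-sketch") // illustrative name
          .master("local[*]")
          .getOrCreate()

        // Non-streaming write; a streaming query would instead hit
        // createMicroBatchWriter rather than createRelation.
        spark.range(0, 5).toDF("value")
          .write
          .format("console")
          .save()

        spark.stop()
      }
    }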