spark git commit: [SPARK-23144][SS] Added console sink for continuous processing

2018-01-18 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 e6e8bbe84 -> 1f88fcd41


[SPARK-23144][SS] Added console sink for continuous processing

## What changes were proposed in this pull request?
Refactored ConsoleWriter into ConsoleMicroBatchWriter and 
ConsoleContinuousWriter.

## How was this patch tested?
new unit test

Author: Tathagata Das 

Closes #20311 from tdas/SPARK-23144.

(cherry picked from commit bf34d665b9c865e00fac7001500bf6d521c2dff9)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f88fcd4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f88fcd4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f88fcd4

Branch: refs/heads/branch-2.3
Commit: 1f88fcd41c6c5521d732b25e83d6c9d150d7f24a
Parents: e6e8bbe
Author: Tathagata Das 
Authored: Thu Jan 18 12:33:39 2018 -0800
Committer: Tathagata Das 
Committed: Thu Jan 18 12:33:54 2018 -0800

--
 .../spark/sql/execution/streaming/console.scala | 20 +++--
 .../streaming/sources/ConsoleWriter.scala   | 80 +++-
 .../streaming/sources/ConsoleWriterSuite.scala  | 26 ++-
 3 files changed, 96 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1f88fcd4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 9482037..f2aa325 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -19,13 +19,12 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.Optional
 
-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
+import 
org.apache.spark.sql.execution.streaming.sources.{ConsoleContinuousWriter, 
ConsoleMicroBatchWriter, ConsoleWriter}
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, 
DataSourceRegister}
 import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
-import org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, 
MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
 import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -37,16 +36,25 @@ case class ConsoleRelation(override val sqlContext: 
SQLContext, data: DataFrame)
 
 class ConsoleSinkProvider extends DataSourceV2
   with MicroBatchWriteSupport
+  with ContinuousWriteSupport
   with DataSourceRegister
   with CreatableRelationProvider {
 
   override def createMicroBatchWriter(
   queryId: String,
-  epochId: Long,
+  batchId: Long,
   schema: StructType,
   mode: OutputMode,
   options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
-Optional.of(new ConsoleWriter(epochId, schema, options))
+Optional.of(new ConsoleMicroBatchWriter(batchId, schema, options))
+  }
+
+  override def createContinuousWriter(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceV2Options): Optional[ContinuousWriter] = {
+Optional.of(new ConsoleContinuousWriter(schema, options))
   }
 
   def createRelation(

http://git-wip-us.apache.org/repos/asf/spark/blob/1f88fcd4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
index 3619799..6fb61df 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
@@ -20,45 +20,85 @@ package org.apache.spark.sql.execution.streaming.sources
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
 import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, 
DataWriterFactory, WriterCommitMessage}
 import org.apache.spark.sql.types.StructType
 
-/**
- * A [[DataSourceV2Writer]] that collects results to the driver and prints 
them in the console.

spark git commit: [SPARK-23144][SS] Added console sink for continuous processing

2018-01-18 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 2d41f040a -> bf34d665b


[SPARK-23144][SS] Added console sink for continuous processing

## What changes were proposed in this pull request?
Refactored ConsoleWriter into ConsoleMicroBatchWriter and 
ConsoleContinuousWriter.

## How was this patch tested?
new unit test

Author: Tathagata Das 

Closes #20311 from tdas/SPARK-23144.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf34d665
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf34d665
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf34d665

Branch: refs/heads/master
Commit: bf34d665b9c865e00fac7001500bf6d521c2dff9
Parents: 2d41f04
Author: Tathagata Das 
Authored: Thu Jan 18 12:33:39 2018 -0800
Committer: Tathagata Das 
Committed: Thu Jan 18 12:33:39 2018 -0800

--
 .../spark/sql/execution/streaming/console.scala | 20 +++--
 .../streaming/sources/ConsoleWriter.scala   | 80 +++-
 .../streaming/sources/ConsoleWriterSuite.scala  | 26 ++-
 3 files changed, 96 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bf34d665/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 9482037..f2aa325 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -19,13 +19,12 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.Optional
 
-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
+import 
org.apache.spark.sql.execution.streaming.sources.{ConsoleContinuousWriter, 
ConsoleMicroBatchWriter, ConsoleWriter}
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, 
DataSourceRegister}
 import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
-import org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, 
MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
 import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -37,16 +36,25 @@ case class ConsoleRelation(override val sqlContext: 
SQLContext, data: DataFrame)
 
 class ConsoleSinkProvider extends DataSourceV2
   with MicroBatchWriteSupport
+  with ContinuousWriteSupport
   with DataSourceRegister
   with CreatableRelationProvider {
 
   override def createMicroBatchWriter(
   queryId: String,
-  epochId: Long,
+  batchId: Long,
   schema: StructType,
   mode: OutputMode,
   options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
-Optional.of(new ConsoleWriter(epochId, schema, options))
+Optional.of(new ConsoleMicroBatchWriter(batchId, schema, options))
+  }
+
+  override def createContinuousWriter(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceV2Options): Optional[ContinuousWriter] = {
+Optional.of(new ConsoleContinuousWriter(schema, options))
   }
 
   def createRelation(

http://git-wip-us.apache.org/repos/asf/spark/blob/bf34d665/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
index 3619799..6fb61df 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
@@ -20,45 +20,85 @@ package org.apache.spark.sql.execution.streaming.sources
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
 import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, 
DataWriterFactory, WriterCommitMessage}
 import org.apache.spark.sql.types.StructType
 
-/**
- * A [[DataSourceV2Writer]] that collects results to the driver and prints 
them in the console.
- * Generated by 
[[org.apache.spark.sql