spark git commit: [SPARK-16031] Add debug-only socket source in Structured Streaming

2016-06-19 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 5930d7a2e -> 4f17fddcd


[SPARK-16031] Add debug-only socket source in Structured Streaming

## What changes were proposed in this pull request?

This patch adds a text-based socket source similar to the one in Spark 
Streaming for debugging and tutorials. The source is clearly marked as 
debug-only so that users don't try to run it in production applications, 
because this type of source cannot provide HA without storing a lot of state in 
Spark.

## How was this patch tested?

Unit tests and manual tests in spark-shell.

Author: Matei Zaharia 

Closes #13748 from mateiz/socket-source.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f17fddc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f17fddc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f17fddc

Branch: refs/heads/master
Commit: 4f17fddcd57adeae0d7e31bd14423283d4b625e9
Parents: 5930d7a
Author: Matei Zaharia 
Authored: Sun Jun 19 21:27:04 2016 -0700
Committer: Reynold Xin 
Committed: Sun Jun 19 21:27:04 2016 -0700

--
 .../apache.spark.sql.sources.DataSourceRegister |   1 +
 .../execution/streaming/FileStreamSource.scala  |   2 +
 .../spark/sql/execution/streaming/Source.scala  |   3 +
 .../execution/streaming/StreamExecution.scala   |   1 +
 .../spark/sql/execution/streaming/memory.scala  |   2 +
 .../spark/sql/execution/streaming/socket.scala  | 144 +++
 .../streaming/TextSocketStreamSuite.scala   | 136 ++
 .../spark/sql/streaming/StreamSuite.scala   |   2 +
 .../test/DataStreamReaderWriterSuite.scala  |   2 +
 9 files changed, 293 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4f17fddc/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
--
diff --git 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 9f8bb5d..27d32b5 100644
--- 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -4,3 +4,4 @@ org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
+org.apache.spark.sql.execution.streaming.TextSocketSourceProvider

http://git-wip-us.apache.org/repos/asf/spark/blob/4f17fddc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index bef5616..9886ad0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -128,4 +128,6 @@ class FileStreamSource(
   override def getOffset: Option[Offset] = 
Some(fetchMaxOffset()).filterNot(_.offset == -1)
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
+
+  override def stop() {}
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4f17fddc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index 14450c2..9711478 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -39,4 +39,7 @@ trait Source  {
* same data for a particular `start` and `end` pair.
*/
   def getBatch(start: Option[Offset], end: Offset): DataFrame
+
+  /** Stop this source and free any resources it has allocated. */
+  def stop(): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4f17fddc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 4a

spark git commit: [SPARK-16031] Add debug-only socket source in Structured Streaming

2016-06-19 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 80c6d4e3a -> d11f533de


[SPARK-16031] Add debug-only socket source in Structured Streaming

## What changes were proposed in this pull request?

This patch adds a text-based socket source similar to the one in Spark 
Streaming for debugging and tutorials. The source is clearly marked as 
debug-only so that users don't try to run it in production applications, 
because this type of source cannot provide HA without storing a lot of state in 
Spark.

## How was this patch tested?

Unit tests and manual tests in spark-shell.

Author: Matei Zaharia 

Closes #13748 from mateiz/socket-source.

(cherry picked from commit 4f17fddcd57adeae0d7e31bd14423283d4b625e9)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d11f533d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d11f533d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d11f533d

Branch: refs/heads/branch-2.0
Commit: d11f533ded502c6cc4a129e201362bca6e302028
Parents: 80c6d4e
Author: Matei Zaharia 
Authored: Sun Jun 19 21:27:04 2016 -0700
Committer: Reynold Xin 
Committed: Sun Jun 19 21:27:11 2016 -0700

--
 .../apache.spark.sql.sources.DataSourceRegister |   1 +
 .../execution/streaming/FileStreamSource.scala  |   2 +
 .../spark/sql/execution/streaming/Source.scala  |   3 +
 .../execution/streaming/StreamExecution.scala   |   1 +
 .../spark/sql/execution/streaming/memory.scala  |   2 +
 .../spark/sql/execution/streaming/socket.scala  | 144 +++
 .../streaming/TextSocketStreamSuite.scala   | 136 ++
 .../spark/sql/streaming/StreamSuite.scala   |   2 +
 .../test/DataStreamReaderWriterSuite.scala  |   2 +
 9 files changed, 293 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d11f533d/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
--
diff --git 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 9f8bb5d..27d32b5 100644
--- 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -4,3 +4,4 @@ org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
+org.apache.spark.sql.execution.streaming.TextSocketSourceProvider

http://git-wip-us.apache.org/repos/asf/spark/blob/d11f533d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index bef5616..9886ad0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -128,4 +128,6 @@ class FileStreamSource(
   override def getOffset: Option[Offset] = 
Some(fetchMaxOffset()).filterNot(_.offset == -1)
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
+
+  override def stop() {}
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d11f533d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index 14450c2..9711478 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -39,4 +39,7 @@ trait Source  {
* same data for a particular `start` and `end` pair.
*/
   def getBatch(start: Option[Offset], end: Offset): DataFrame
+
+  /** Stop this source and free any resources it has allocated. */
+  def stop(): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d11f533d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution