spark git commit: [SPARK-16031] Add debug-only socket source in Structured Streaming
Repository: spark Updated Branches: refs/heads/master 5930d7a2e -> 4f17fddcd [SPARK-16031] Add debug-only socket source in Structured Streaming ## What changes were proposed in this pull request? This patch adds a text-based socket source similar to the one in Spark Streaming for debugging and tutorials. The source is clearly marked as debug-only so that users don't try to run it in production applications, because this type of source cannot provide HA without storing a lot of state in Spark. ## How was this patch tested? Unit tests and manual tests in spark-shell. Author: Matei Zaharia Closes #13748 from mateiz/socket-source. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f17fddc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f17fddc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f17fddc Branch: refs/heads/master Commit: 4f17fddcd57adeae0d7e31bd14423283d4b625e9 Parents: 5930d7a Author: Matei Zaharia Authored: Sun Jun 19 21:27:04 2016 -0700 Committer: Reynold Xin Committed: Sun Jun 19 21:27:04 2016 -0700 -- apache.spark.sql.sources.DataSourceRegister | 1 + .../execution/streaming/FileStreamSource.scala | 2 + .../spark/sql/execution/streaming/Source.scala | 3 + .../execution/streaming/StreamExecution.scala | 1 + .../spark/sql/execution/streaming/memory.scala | 2 + .../spark/sql/execution/streaming/socket.scala | 144 +++ .../streaming/TextSocketStreamSuite.scala | 136 ++ .../spark/sql/streaming/StreamSuite.scala | 2 + .../test/DataStreamReaderWriterSuite.scala | 2 + 9 files changed, 293 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4f17fddc/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister -- diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 9f8bb5d..27d32b5 100644 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -4,3 +4,4 @@ org.apache.spark.sql.execution.datasources.json.JsonFileFormat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat org.apache.spark.sql.execution.datasources.text.TextFileFormat org.apache.spark.sql.execution.streaming.ConsoleSinkProvider +org.apache.spark.sql.execution.streaming.TextSocketSourceProvider http://git-wip-us.apache.org/repos/asf/spark/blob/4f17fddc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index bef5616..9886ad0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -128,4 +128,6 @@ class FileStreamSource( override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1) override def toString: String = s"FileStreamSource[$qualifiedBasePath]" + + override def stop() {} } http://git-wip-us.apache.org/repos/asf/spark/blob/4f17fddc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala index 14450c2..9711478 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala @@ -39,4 +39,7 @@ trait Source { * same data for a particular `start` and `end` pair. */ def getBatch(start: Option[Offset], end: Offset): DataFrame + + /** Stop this source and free any resources it has allocated. */ + def stop(): Unit } http://git-wip-us.apache.org/repos/asf/spark/blob/4f17fddc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 4a
spark git commit: [SPARK-16031] Add debug-only socket source in Structured Streaming
Repository: spark Updated Branches: refs/heads/branch-2.0 80c6d4e3a -> d11f533de [SPARK-16031] Add debug-only socket source in Structured Streaming ## What changes were proposed in this pull request? This patch adds a text-based socket source similar to the one in Spark Streaming for debugging and tutorials. The source is clearly marked as debug-only so that users don't try to run it in production applications, because this type of source cannot provide HA without storing a lot of state in Spark. ## How was this patch tested? Unit tests and manual tests in spark-shell. Author: Matei Zaharia Closes #13748 from mateiz/socket-source. (cherry picked from commit 4f17fddcd57adeae0d7e31bd14423283d4b625e9) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d11f533d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d11f533d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d11f533d Branch: refs/heads/branch-2.0 Commit: d11f533ded502c6cc4a129e201362bca6e302028 Parents: 80c6d4e Author: Matei Zaharia Authored: Sun Jun 19 21:27:04 2016 -0700 Committer: Reynold Xin Committed: Sun Jun 19 21:27:11 2016 -0700 -- apache.spark.sql.sources.DataSourceRegister | 1 + .../execution/streaming/FileStreamSource.scala | 2 + .../spark/sql/execution/streaming/Source.scala | 3 + .../execution/streaming/StreamExecution.scala | 1 + .../spark/sql/execution/streaming/memory.scala | 2 + .../spark/sql/execution/streaming/socket.scala | 144 +++ .../streaming/TextSocketStreamSuite.scala | 136 ++ .../spark/sql/streaming/StreamSuite.scala | 2 + .../test/DataStreamReaderWriterSuite.scala | 2 + 9 files changed, 293 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d11f533d/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister -- diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 9f8bb5d..27d32b5 100644 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -4,3 +4,4 @@ org.apache.spark.sql.execution.datasources.json.JsonFileFormat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat org.apache.spark.sql.execution.datasources.text.TextFileFormat org.apache.spark.sql.execution.streaming.ConsoleSinkProvider +org.apache.spark.sql.execution.streaming.TextSocketSourceProvider http://git-wip-us.apache.org/repos/asf/spark/blob/d11f533d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index bef5616..9886ad0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -128,4 +128,6 @@ class FileStreamSource( override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1) override def toString: String = s"FileStreamSource[$qualifiedBasePath]" + + override def stop() {} } http://git-wip-us.apache.org/repos/asf/spark/blob/d11f533d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala index 14450c2..9711478 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala @@ -39,4 +39,7 @@ trait Source { * same data for a particular `start` and `end` pair. */ def getBatch(start: Option[Offset], end: Offset): DataFrame + + /** Stop this source and free any resources it has allocated. */ + def stop(): Unit } http://git-wip-us.apache.org/repos/asf/spark/blob/d11f533d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution