[spark] branch master updated: [SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 1df69f7 [SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2 1df69f7 is described below commit 1df69f7e324aa799c05f6158e433371c5eeed8ce Author: Ryan Blue AuthorDate: Wed Nov 18 14:07:51 2020 -0800 [SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2 ### What changes were proposed in this pull request? This adds support for metadata columns to DataSourceV2. If a source implements `SupportsMetadataColumns` it must also implement `SupportsPushDownRequiredColumns` to support projecting those columns. The analyzer is updated to resolve metadata columns from `LogicalPlan.metadataOutput`, and this adds a rule that will add metadata columns to the output of `DataSourceV2Relation` if one is used. ### Why are the changes needed? This is the solution discussed for exposing additional data in the Kafka source. It is also needed for a generic `MERGE INTO` plan. ### Does this PR introduce any user-facing change? Yes. Users can project additional columns from sources that implement the new API. This also updates `DescribeTableExec` to show metadata columns. ### How was this patch tested? Will include new unit tests. Closes #28027 from rdblue/add-dsv2-metadata-columns. Authored-by: Ryan Blue Signed-off-by: Burak Yavuz --- .../sql/connector/catalog/MetadataColumn.java | 58 + .../connector/catalog/SupportsMetadataColumns.java | 37 +++ .../spark/sql/catalyst/analysis/Analyzer.scala | 24 +++ .../sql/catalyst/plans/logical/LogicalPlan.scala | 6 +- .../plans/logical/basicLogicalOperators.scala | 6 ++ .../datasources/v2/DataSourceV2Implicits.scala | 16 - .../datasources/v2/DataSourceV2Relation.scala | 26 +++- .../apache/spark/sql/connector/InMemoryTable.scala | 74 ++ .../datasources/v2/DescribeTableExec.scala | 16 - .../execution/datasources/v2/PushDownUtils.scala | 12 ++-- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 43 + 11 files changed, 296 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java new file mode 100644 index 000..8aefa28 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java @@ -0,0 +1,58 @@ +package org.apache.spark.sql.connector.catalog; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.DataType; + +/** + * Interface for a metadata column. + * + * A metadata column can expose additional metadata about a row. For example, rows from Kafka can + * use metadata columns to expose a message's topic, partition number, and offset. + * + * A metadata column could also be the result of a transform applied to a value in the row. For + * example, a partition value produced by bucket(id, 16) could be exposed by a metadata column. In + * this case, {@link #transform()} should return a non-null {@link Transform} that produced the + * metadata column's values. + */ +@Evolving +public interface MetadataColumn { + /** + * The name of this metadata column. + * + * @return a String name + */ + String name(); + + /** + * The data type of values in this metadata column. 
+   *
+   * @return a {@link DataType}
+   */
+  DataType dataType();
+
+  /**
+   * @return whether values produced by this metadata column may be null
+   */
+  default boolean isNullable() {
+    return true;
+  }
+
+  /**
+   * Documentation for this metadata column, or null.
+   *
+   * @return a documentation String
+   */
+  default String comment() {
+    return null;
+  }
+
+  /**
+   * The {@link Transform} used to produce this metadata column from data rows, or null.
+   *
+   * @return a {@link Transform} used to produce the column's values, or null if there isn't one
+   */
+  default Transform transform() {
+    return null;
+  }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java
new file mode 100644
index 000..fc31349
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java
@@ -0,0 +1,37 @@
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import
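For orientation, here is a minimal Scala sketch of a table using this API. The `SupportsMetadataColumns` file is truncated above, so the sketch assumes the interface adds a single `metadataColumns()` method returning `MetadataColumn[]`; the table and column names are hypothetical.

```
import java.util
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StringType, StructType}

// Hypothetical read-only table exposing Kafka-style metadata columns,
// resolvable as e.g. SELECT _partition, _offset, * FROM kafka_like.
class KafkaLikeTable extends Table with SupportsMetadataColumns {
  override def name(): String = "kafka_like"
  override def schema(): StructType =
    new StructType().add("key", StringType).add("value", StringType)
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ)

  // Assumed signature per the commit message: two extra columns the
  // analyzer can resolve from LogicalPlan.metadataOutput.
  override def metadataColumns(): Array[MetadataColumn] = Array(
    new MetadataColumn {
      override def name(): String = "_partition"
      override def dataType(): DataType = IntegerType
      override def comment(): String = "partition the row came from"
    },
    new MetadataColumn {
      override def name(): String = "_offset"
      override def dataType(): DataType = LongType
      override def comment(): String = "offset of the row within its partition"
    })
}
```

Per the commit message, such a source must also implement `SupportsPushDownRequiredColumns` so that the metadata columns can be projected.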
[spark] branch branch-3.0 updated: [SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 0 if empty batch is run
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 2221d3e [SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 0 if empty batch is run 2221d3e is described below commit 2221d3e0183140a0a98f6de92f84d2d924aab703 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Wed Apr 8 16:59:39 2020 -0700 [SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 0 if empty batch is run ### What changes were proposed in this pull request? This patch fixes the behavior of ProgressReporter which always overwrite the value of "updated" of state operator to 0 if there's no new data. The behavior is correct only when we copy the state progress from "previous" executed plan, meaning no batch has been run. (Nonzero value of "updated" would be odd if batch didn't run, so it was correct.) It was safe to assume no data is no batch, but SPARK-24156 enables empty data can run the batch if Spark needs to deal with watermark. After the patch, it only overwrites the value if both two conditions are met: 1) no data 2) no batch. ### Why are the changes needed? Currently Spark doesn't reflect correct metrics when empty batch is run and this patch fixes it. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Modified UT. Note that FlatMapGroupsWithState increases the value of "updated" when state rows are removed. Also manually tested via below query (not a simple query to test with spark-shell, as you'll meet closure issue in spark-shell while playing with state func): > query ``` case class RunningCount(count: Long) object TestFlatMapGroupsWithState { def main(args: Array[String]): Unit = { import org.apache.spark.sql.SparkSession val ss = SparkSession .builder() .appName("TestFlatMapGroupsWithState") .getOrCreate() ss.conf.set("spark.sql.shuffle.partitions", "5") import ss.implicits._ val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => { if (state.hasTimedOut) { // End users are not restricted to remove the state here - they can update the // state as well. For example, event time session window would have list of // sessions here and it cannot remove entire state. 
        state.update(RunningCount(-1))
        Iterator((key, "-1"))
      } else {
        val count = state.getOption.map(_.count).getOrElse(0L) + values.size
        state.update(RunningCount(count))
        state.setTimeoutDuration("1 seconds")
        Iterator((key, count.toString))
      }
    }

    implicit val sqlContext = ss.sqlContext
    val inputData = MemoryStream[String]
    val result = inputData
      .toDF()
      .as[String]
      .groupByKey { v => v }
      .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.ProcessingTimeTimeout())(stateFunc)

    val query = result
      .writeStream
      .format("memory")
      .option("queryName", "test")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("5 second"))
      .start()

    Thread.sleep(1000)
    var chIdx: Long = 0
    while (true) {
      (chIdx to chIdx + 4).map { idx => inputData.addData(idx.toString) }
      chIdx += 5
      // intentionally sleep much more than trigger to enable "empty" batch
      Thread.sleep(10 * 1000)
    }
  }
}
```

> before the patch (batch 3 which was an "empty" batch)

```
{
  "id":"de945a5c-882b-4dae-aa58-cb8261cbaf9e",
  "runId":"f1eb6d0d-3cd5-48b2-a03b-5e989b6c151b",
  "name":"test",
  "timestamp":"2019-11-18T07:00:25.005Z",
  "batchId":3,
  "numInputRows":0,
  "inputRowsPerSecond":0.0,
  "processedRowsPerSecond":0.0,
  "durationMs":{
    "addBatch":1664,
    "getBatch":0,
    "latestOffset":0,
    "queryPlanning":29,
    "triggerExecution":1789,
    "walCommit":51
  },
  "stateOperators":[ {
    "numRowsTotal":10,
    "numRowsUpdated":0,
```
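The rule the patch implements reads more clearly out of context. A simplified sketch of the corrected logic, not the actual `ProgressReporter` code: the flag names follow the patch, everything else is invented for illustration.

```
// Simplified model of the corrected rule (illustrative, not Spark code).
case class StateOpProgress(numRowsTotal: Long, numRowsUpdated: Long)

def stateProgress(
    hasNewData: Boolean,
    hasExecuted: Boolean, // since SPARK-24156 a batch can run on empty data
    previous: StateOpProgress,
    fromExecutedPlan: => StateOpProgress): StateOpProgress = {
  if (!hasNewData && !hasExecuted) {
    // No batch ran at all: carry the old totals but force "updated" to 0,
    // since nothing could have changed without a batch.
    previous.copy(numRowsUpdated = 0)
  } else {
    // A batch ran, possibly on empty input: trust the executed plan's
    // metrics, which may count state rows removed by timeouts/watermarks.
    fromExecutedPlan
  }
}
```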
[spark] branch master updated: [SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 0 if empty batch is run
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ca2ba4f [SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 0 if empty batch is run ca2ba4f is described below commit ca2ba4fe647cd60668410b68014a3991ad7fd5c9 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Wed Apr 8 16:59:39 2020 -0700 [SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 0 if empty batch is run ### What changes were proposed in this pull request? This patch fixes the behavior of ProgressReporter which always overwrite the value of "updated" of state operator to 0 if there's no new data. The behavior is correct only when we copy the state progress from "previous" executed plan, meaning no batch has been run. (Nonzero value of "updated" would be odd if batch didn't run, so it was correct.) It was safe to assume no data is no batch, but SPARK-24156 enables empty data can run the batch if Spark needs to deal with watermark. After the patch, it only overwrites the value if both two conditions are met: 1) no data 2) no batch. ### Why are the changes needed? Currently Spark doesn't reflect correct metrics when empty batch is run and this patch fixes it. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Modified UT. Note that FlatMapGroupsWithState increases the value of "updated" when state rows are removed. Also manually tested via below query (not a simple query to test with spark-shell, as you'll meet closure issue in spark-shell while playing with state func): > query ``` case class RunningCount(count: Long) object TestFlatMapGroupsWithState { def main(args: Array[String]): Unit = { import org.apache.spark.sql.SparkSession val ss = SparkSession .builder() .appName("TestFlatMapGroupsWithState") .getOrCreate() ss.conf.set("spark.sql.shuffle.partitions", "5") import ss.implicits._ val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => { if (state.hasTimedOut) { // End users are not restricted to remove the state here - they can update the // state as well. For example, event time session window would have list of // sessions here and it cannot remove entire state. 
        state.update(RunningCount(-1))
        Iterator((key, "-1"))
      } else {
        val count = state.getOption.map(_.count).getOrElse(0L) + values.size
        state.update(RunningCount(count))
        state.setTimeoutDuration("1 seconds")
        Iterator((key, count.toString))
      }
    }

    implicit val sqlContext = ss.sqlContext
    val inputData = MemoryStream[String]
    val result = inputData
      .toDF()
      .as[String]
      .groupByKey { v => v }
      .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.ProcessingTimeTimeout())(stateFunc)

    val query = result
      .writeStream
      .format("memory")
      .option("queryName", "test")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("5 second"))
      .start()

    Thread.sleep(1000)
    var chIdx: Long = 0
    while (true) {
      (chIdx to chIdx + 4).map { idx => inputData.addData(idx.toString) }
      chIdx += 5
      // intentionally sleep much more than trigger to enable "empty" batch
      Thread.sleep(10 * 1000)
    }
  }
}
```

> before the patch (batch 3 which was an "empty" batch)

```
{
  "id":"de945a5c-882b-4dae-aa58-cb8261cbaf9e",
  "runId":"f1eb6d0d-3cd5-48b2-a03b-5e989b6c151b",
  "name":"test",
  "timestamp":"2019-11-18T07:00:25.005Z",
  "batchId":3,
  "numInputRows":0,
  "inputRowsPerSecond":0.0,
  "processedRowsPerSecond":0.0,
  "durationMs":{
    "addBatch":1664,
    "getBatch":0,
    "latestOffset":0,
    "queryPlanning":29,
    "triggerExecution":1789,
    "walCommit":51
  },
  "stateOperators":[ {
    "numRowsTotal":10,
    "numRowsUpdated":0,
```
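This is the same fix as the branch-3.0 commit above. From user code, the corrected metric is visible through the standard progress API; a sketch (nothing here is specific to this patch, and `query` is any started streaming query):

```
import org.apache.spark.sql.streaming.StreamingQuery

// After the fix, an "empty" batch that times out state can report a
// nonzero numRowsUpdated instead of being forced back to 0.
def printStateMetrics(query: StreamingQuery): Unit = {
  Option(query.lastProgress).foreach { p =>
    p.stateOperators.foreach { op =>
      println(s"batch=${p.batchId} total=${op.numRowsTotal} updated=${op.numRowsUpdated}")
    }
  }
}
```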
[spark] branch branch-3.0 updated: [SPARK-31278][SS] Fix StreamingQuery output rows metric
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new a856eea [SPARK-31278][SS] Fix StreamingQuery output rows metric a856eea is described below commit a856eea42949810f54c5f2f41b9c9abdd2da37c6 Author: Burak Yavuz AuthorDate: Tue Apr 7 17:17:47 2020 -0700 [SPARK-31278][SS] Fix StreamingQuery output rows metric ### What changes were proposed in this pull request? In Structured Streaming, we provide progress updates every 10 seconds when a stream doesn't have any new data upstream. When providing this progress though, we zero out the input information but not the output information. This PR fixes that bug. ### Why are the changes needed? Fixes a bug around incorrect metrics ### Does this PR introduce any user-facing change? Fixes a bug in the metrics ### How was this patch tested? New regression test Closes #28040 from brkyvz/sinkMetrics. Lead-authored-by: Burak Yavuz Co-authored-by: Burak Yavuz Signed-off-by: Burak Yavuz (cherry picked from commit 8ab2a0c5f23a59c00a9b4191afd976af50d913ba) Signed-off-by: Burak Yavuz --- .../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 2 +- .../execution/streaming/MicroBatchExecution.scala | 3 +- .../sql/execution/streaming/ProgressReporter.scala | 32 ++ .../sql/streaming/StreamingAggregationSuite.scala | 71 ++ .../streaming/StreamingDeduplicationSuite.scala| 3 +- .../StreamingQueryStatusAndProgressSuite.scala | 1 + 6 files changed, 73 insertions(+), 39 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index 5c8c5b1..4e808a5 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -314,7 +314,7 @@ class KafkaSinkMicroBatchStreamingSuite extends KafkaSinkStreamingSuiteBase { try { input.addData("1", "2", "3") verifyResult(writer) { -assert(writer.lastProgress.sink.numOutputRows == 3L) +assert(writer.recentProgress.exists(_.sink.numOutputRows == 3L)) } } finally { writer.stop() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 45a2ce1..e022bfb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -226,7 +226,8 @@ class MicroBatchExecution( } } -finishTrigger(currentBatchHasNewData) // Must be outside reportTimeTaken so it is recorded +// Must be outside reportTimeTaken so it is recorded +finishTrigger(currentBatchHasNewData, isCurrentBatchConstructed) // Signal waiting threads. Note this must be after finishTrigger() to ensure all // activities (progress generation, etc.) have completed before signaling. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index feb151a..d1086cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -85,8 +85,8 @@ trait ProgressReporter extends Logging {
   private val noDataProgressEventInterval =
     sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
-  // The timestamp we report an event that has no input data
-  private var lastNoDataProgressEventTime = Long.MinValue
+  // The timestamp we report an event that has not executed anything
+  private var lastNoExecutionProgressEventTime = Long.MinValue
   private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
   timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
@@ -142,8 +142,15 @@ trait ProgressReporter extends Logging {
     logInfo(s"Streaming query made progress: $newProgress")
   }
-  /** Finalizes the query progress and adds it to list of recent status updates. */
-  protected def finishTrigger(hasNewData: Boolean): Unit = {
+  /**
+   * Finalizes the query progress and adds it to list of recent status updates.
+   *
+   * @param hasNewData Whether the sources of this str
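The `KafkaSinkSuite` change above shows the practical consequence for tests and monitoring code: once no-data progress updates report correct sink metrics, the most recent progress entry may legitimately be an empty one, so checks should scan recent progress instead. A sketch (`query` is a hypothetical handle):

```
import org.apache.spark.sql.streaming.StreamingQuery

// Scan recent progress entries rather than trusting lastProgress alone:
// a no-data progress update may arrive after the batch that wrote rows.
def sawOutputRows(query: StreamingQuery, expected: Long): Boolean =
  query.recentProgress.exists(_.sink.numOutputRows == expected)
```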
[spark] branch master updated: [SPARK-31278][SS] Fix StreamingQuery output rows metric
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8ab2a0c [SPARK-31278][SS] Fix StreamingQuery output rows metric 8ab2a0c is described below commit 8ab2a0c5f23a59c00a9b4191afd976af50d913ba Author: Burak Yavuz AuthorDate: Tue Apr 7 17:17:47 2020 -0700 [SPARK-31278][SS] Fix StreamingQuery output rows metric ### What changes were proposed in this pull request? In Structured Streaming, we provide progress updates every 10 seconds when a stream doesn't have any new data upstream. When providing this progress though, we zero out the input information but not the output information. This PR fixes that bug. ### Why are the changes needed? Fixes a bug around incorrect metrics ### Does this PR introduce any user-facing change? Fixes a bug in the metrics ### How was this patch tested? New regression test Closes #28040 from brkyvz/sinkMetrics. Lead-authored-by: Burak Yavuz Co-authored-by: Burak Yavuz Signed-off-by: Burak Yavuz --- .../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 2 +- .../execution/streaming/MicroBatchExecution.scala | 3 +- .../sql/execution/streaming/ProgressReporter.scala | 32 ++ .../sql/streaming/StreamingAggregationSuite.scala | 71 ++ .../streaming/StreamingDeduplicationSuite.scala| 3 +- .../StreamingQueryStatusAndProgressSuite.scala | 1 + 6 files changed, 73 insertions(+), 39 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index 5c8c5b1..4e808a5 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -314,7 +314,7 @@ class KafkaSinkMicroBatchStreamingSuite extends KafkaSinkStreamingSuiteBase { try { input.addData("1", "2", "3") verifyResult(writer) { -assert(writer.lastProgress.sink.numOutputRows == 3L) +assert(writer.recentProgress.exists(_.sink.numOutputRows == 3L)) } } finally { writer.stop() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 45a2ce1..e022bfb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -226,7 +226,8 @@ class MicroBatchExecution( } } -finishTrigger(currentBatchHasNewData) // Must be outside reportTimeTaken so it is recorded +// Must be outside reportTimeTaken so it is recorded +finishTrigger(currentBatchHasNewData, isCurrentBatchConstructed) // Signal waiting threads. Note this must be after finishTrigger() to ensure all // activities (progress generation, etc.) have completed before signaling. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index feb151a..d1086cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -85,8 +85,8 @@ trait ProgressReporter extends Logging {
   private val noDataProgressEventInterval =
     sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
-  // The timestamp we report an event that has no input data
-  private var lastNoDataProgressEventTime = Long.MinValue
+  // The timestamp we report an event that has not executed anything
+  private var lastNoExecutionProgressEventTime = Long.MinValue
   private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
   timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
@@ -142,8 +142,15 @@ trait ProgressReporter extends Logging {
     logInfo(s"Streaming query made progress: $newProgress")
   }
-  /** Finalizes the query progress and adds it to list of recent status updates. */
-  protected def finishTrigger(hasNewData: Boolean): Unit = {
+  /**
+   * Finalizes the query progress and adds it to list of recent status updates.
+   *
+   * @param hasNewData Whether the sources of this stream had new data for this trigger.
+   * @param hasExecuted Whether any batch was executed during this trigg
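To summarize the two-flag contract of the new `finishTrigger(hasNewData, hasExecuted)`, here is an illustrative decision table — a sketch of the described semantics, not the implementation:

```
// Which metrics a progress update should zero out, per the fix (illustrative).
def metricsToZero(hasNewData: Boolean, hasExecuted: Boolean): Seq[String] =
  (hasNewData, hasExecuted) match {
    case (false, false) => Seq("input", "output") // nothing executed at all
    case (false, true)  => Seq("input")           // empty batch can still write rows
    case _              => Seq.empty              // normal batch: keep everything
  }
```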
[spark] branch branch-3.0 updated: [SPARK-31178][SQL] Prevent V2 exec nodes from executing multiple times
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new a97117f [SPARK-31178][SQL] Prevent V2 exec nodes from executing multiple times a97117f is described below commit a97117f1294d3625d71809f2770523ad0e14ade0 Author: Burak Yavuz AuthorDate: Wed Mar 18 18:07:24 2020 -0700 [SPARK-31178][SQL] Prevent V2 exec nodes from executing multiple times ### What changes were proposed in this pull request? This PR prevents the execution of V2 DataSource exec nodes multiple times when `collect()` is called on them. For V1 DataSources, commands would be executed as a RunnableCommand, which would cache the result as part of the `ExecutedCommandExec` node. We extend `V2CommandExec` for all the data writing commands so that they only get executed once as well. ### Why are the changes needed? Calling `collect()` on a SQL command that inserts data or creates a table gets executed multiple times otherwise. ### Does this PR introduce any user-facing change? Fixes a bug ### How was this patch tested? Unit tests Closes #27941 from brkyvz/doubleInsert. Authored-by: Burak Yavuz Signed-off-by: Burak Yavuz (cherry picked from commit 4237251861c79f3176de7cf5232f0388ec5d946e) Signed-off-by: Burak Yavuz --- .../datasources/v2/ShowNamespacesExec.scala| 4 +- .../execution/datasources/v2/ShowTablesExec.scala | 4 +- .../datasources/v2/V1FallbackWriters.scala | 10 ++-- .../execution/datasources/v2/V2CommandExec.scala | 6 +- .../datasources/v2/WriteToDataSourceV2Exec.scala | 28 - .../spark/sql/connector/DataSourceV2SQLSuite.scala | 68 ++ .../spark/sql/connector/InsertIntoTests.scala | 17 ++ 7 files changed, 112 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala index fe3ab80..6f96848 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchem import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import org.apache.spark.sql.connector.catalog.SupportsNamespaces +import org.apache.spark.sql.execution.LeafExecNode /** * Physical plan node for showing namespaces. 
@@ -33,8 +34,7 @@ case class ShowNamespacesExec( output: Seq[Attribute], catalog: SupportsNamespaces, namespace: Seq[String], -pattern: Option[String]) -extends V2CommandExec { +pattern: Option[String]) extends V2CommandExec with LeafExecNode { override protected def run(): Seq[InternalRow] = { val namespaces = if (namespace.nonEmpty) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala index 995b008..c740e0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchem import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import org.apache.spark.sql.connector.catalog.TableCatalog +import org.apache.spark.sql.execution.LeafExecNode /** * Physical plan node for showing tables. @@ -33,8 +34,7 @@ case class ShowTablesExec( output: Seq[Attribute], catalog: TableCatalog, namespace: Seq[String], -pattern: Option[String]) -extends V2CommandExec { +pattern: Option[String]) extends V2CommandExec with LeafExecNode { override protected def run(): Seq[InternalRow] = { val rows = new ArrayBuffer[InternalRow]() val encoder = RowEncoder(schema).resolveAndBind() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala index f973000..7502a87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala @@ -41,7 +41,7 @@ case class AppendDataExecV1( writeOptions
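The caching idea behind `V2CommandExec` is simple to state on its own: run the command once and reuse the resulting rows, so repeated materializations (for example from `collect()`) cannot repeat side effects. A simplified sketch, not the real class:

```
import org.apache.spark.sql.catalyst.InternalRow

// Simplified model of the single-execution guarantee: the first access
// runs the command; every later access reuses the cached rows.
abstract class RunOnceCommand {
  protected def run(): Seq[InternalRow]
  lazy val result: Seq[InternalRow] = run() // evaluated at most once
}
```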
[spark] branch master updated: [SPARK-31178][SQL] Prevent V2 exec nodes from executing multiple times
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4237251 [SPARK-31178][SQL] Prevent V2 exec nodes from executing multiple times 4237251 is described below commit 4237251861c79f3176de7cf5232f0388ec5d946e Author: Burak Yavuz AuthorDate: Wed Mar 18 18:07:24 2020 -0700 [SPARK-31178][SQL] Prevent V2 exec nodes from executing multiple times ### What changes were proposed in this pull request? This PR prevents the execution of V2 DataSource exec nodes multiple times when `collect()` is called on them. For V1 DataSources, commands would be executed as a RunnableCommand, which would cache the result as part of the `ExecutedCommandExec` node. We extend `V2CommandExec` for all the data writing commands so that they only get executed once as well. ### Why are the changes needed? Calling `collect()` on a SQL command that inserts data or creates a table gets executed multiple times otherwise. ### Does this PR introduce any user-facing change? Fixes a bug ### How was this patch tested? Unit tests Closes #27941 from brkyvz/doubleInsert. Authored-by: Burak Yavuz Signed-off-by: Burak Yavuz --- .../datasources/v2/ShowNamespacesExec.scala| 4 +- .../execution/datasources/v2/ShowTablesExec.scala | 4 +- .../datasources/v2/V1FallbackWriters.scala | 10 ++-- .../execution/datasources/v2/V2CommandExec.scala | 6 +- .../datasources/v2/WriteToDataSourceV2Exec.scala | 28 - .../spark/sql/connector/DataSourceV2SQLSuite.scala | 68 ++ .../spark/sql/connector/InsertIntoTests.scala | 17 ++ 7 files changed, 112 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala index fe3ab80..6f96848 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchem import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import org.apache.spark.sql.connector.catalog.SupportsNamespaces +import org.apache.spark.sql.execution.LeafExecNode /** * Physical plan node for showing namespaces. 
@@ -33,8 +34,7 @@ case class ShowNamespacesExec( output: Seq[Attribute], catalog: SupportsNamespaces, namespace: Seq[String], -pattern: Option[String]) -extends V2CommandExec { +pattern: Option[String]) extends V2CommandExec with LeafExecNode { override protected def run(): Seq[InternalRow] = { val namespaces = if (namespace.nonEmpty) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala index 995b008..c740e0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchem import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import org.apache.spark.sql.connector.catalog.TableCatalog +import org.apache.spark.sql.execution.LeafExecNode /** * Physical plan node for showing tables. @@ -33,8 +34,7 @@ case class ShowTablesExec( output: Seq[Attribute], catalog: TableCatalog, namespace: Seq[String], -pattern: Option[String]) -extends V2CommandExec { +pattern: Option[String]) extends V2CommandExec with LeafExecNode { override protected def run(): Seq[InternalRow] = { val rows = new ArrayBuffer[InternalRow]() val encoder = RowEncoder(schema).resolveAndBind() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala index f973000..7502a87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala @@ -41,7 +41,7 @@ case class AppendDataExecV1( writeOptions: CaseInsensitiveStringMap, plan: LogicalPlan) extends V1FallbackWriters { - override protected def
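The user-visible symptom on master was the same. An illustrative repro of the double-execution bug this prevents (the catalog and table names are hypothetical and assume a configured V2 catalog):

```
import org.apache.spark.sql.SparkSession

// Before the fix, materializing a V2 write command twice could insert
// the rows twice; afterwards the second collect() reuses the cached result.
def doubleCollect(spark: SparkSession): Unit = {
  val df = spark.sql("INSERT INTO testcat.ns.t VALUES (1, 'a')")
  df.collect() // executes the insert
  df.collect() // no second insert after the fix
}
```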
[spark] branch master updated: [SPARK-30669][SS] Introduce AdmissionControl APIs for StructuredStreaming
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 1cd19ad [SPARK-30669][SS] Introduce AdmissionControl APIs for StructuredStreaming 1cd19ad is described below commit 1cd19ad92da960f18a6673bc3ce670ce633050e5 Author: Burak Yavuz AuthorDate: Thu Jan 30 22:02:48 2020 -0800 [SPARK-30669][SS] Introduce AdmissionControl APIs for StructuredStreaming ### What changes were proposed in this pull request? We propose to add a new interface `SupportsAdmissionControl` and `ReadLimit`. A ReadLimit defines how much data should be read in the next micro-batch. `SupportsAdmissionControl` specifies that a source can rate limit its ingest into the system. The source can tell the system what the user specified as a read limit, and the system can enforce this limit within each micro-batch or impose its own limit if the Trigger is Trigger.Once() for example. We then use this interface in FileStreamSource, KafkaSource, and KafkaMicroBatchStream. ### Why are the changes needed? Sources currently have no information around execution semantics such as whether the stream is being executed in Trigger.Once() mode. This interface will pass this information into the sources as part of planning. With a trigger like Trigger.Once(), the semantics are to process all the data available to the datasource in a single micro-batch. However, this semantic can be broken when data source options such as `maxOffsetsPerTrigger` (in the Kafka source) rate limit the amount of data [...] ### Does this PR introduce any user-facing change? DataSource developers can extend this interface for their streaming sources to add admission control into their system and correctly support Trigger.Once(). ### How was this patch tested? Existing tests, as this API is mostly internal Closes #27380 from brkyvz/rateLimit. 
Lead-authored-by: Burak Yavuz Co-authored-by: Burak Yavuz Signed-off-by: Burak Yavuz --- .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 25 ++ .../apache/spark/sql/kafka010/KafkaSource.scala| 29 +++ .../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 22 + .../read/streaming/ReadAllAvailable.java} | 28 +++ .../sql/connector/read/streaming/ReadLimit.java} | 25 ++ .../sql/connector/read/streaming/ReadMaxFiles.java | 55 + .../sql/connector/read/streaming/ReadMaxRows.java | 55 + .../read/streaming/SupportsAdmissionControl.java | 56 ++ .../sql/execution/streaming/FileStreamSource.scala | 25 -- .../execution/streaming/MicroBatchExecution.scala | 49 +-- .../sql/execution/streaming/StreamExecution.scala | 6 +-- .../streaming/continuous/ContinuousExecution.scala | 4 +- .../sql/streaming/FileStreamSourceSuite.scala | 56 ++ 13 files changed, 376 insertions(+), 59 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index 844c963..6599e7e 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -27,8 +27,7 @@ import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} -import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset} -import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream +import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl} import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.UninterruptibleThread @@ -55,7 +54,7 @@ private[kafka010] class KafkaMicroBatchStream( options: CaseInsensitiveStringMap, metadataPath: String, startingOffsets: KafkaOffsetRangeLimit, -failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging { +failOnDataLoss: Boolean) extends SupportsAdmissionControl with MicroBatchStream with Logging { private[kafka010] val pollTimeoutMs = options.getLong( KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, @@ -77,13 +76,23 @@ private[kafka010] class KafkaMicroBatchStream( KafkaSourceOffset(getOrCreateInitialPartitionOffsets
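A sketch of how a source might surface a user-supplied rate limit through the new API. It assumes `ReadLimit` exposes `maxRows`/`allAvailable` factories matching the `ReadMaxRows`/`ReadAllAvailable` files in the diffstat above; the option name is made up:

```
import org.apache.spark.sql.connector.read.streaming.ReadLimit

// "maxRowsPerTrigger" is a hypothetical option; the factories are assumed.
def defaultReadLimit(options: Map[String, String]): ReadLimit =
  options.get("maxRowsPerTrigger") match {
    case Some(n) => ReadLimit.maxRows(n.toLong)
    case None    => ReadLimit.allAvailable()
  }
```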
[spark] branch master updated: [SPARK-30669][SS] Introduce AdmissionControl APIs for StructuredStreaming
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 940510c [SPARK-30669][SS] Introduce AdmissionControl APIs for StructuredStreaming 940510c is described below commit 940510cb1e43a4166d2fe7d7eb4ace8561d24c9b Author: Burak Yavuz AuthorDate: Thu Jan 30 22:01:53 2020 -0800 [SPARK-30669][SS] Introduce AdmissionControl APIs for StructuredStreaming ### What changes were proposed in this pull request? We propose to add a new interface `SupportsAdmissionControl` and `ReadLimit`. A ReadLimit defines how much data should be read in the next micro-batch. `SupportsAdmissionControl` specifies that a source can rate limit its ingest into the system. The source can tell the system what the user specified as a read limit, and the system can enforce this limit within each micro-batch or impose its own limit if the Trigger is Trigger.Once() for example. We then use this interface in FileStreamSource, KafkaSource, and KafkaMicroBatchStream. ### Why are the changes needed? Sources currently have no information around execution semantics such as whether the stream is being executed in Trigger.Once() mode. This interface will pass this information into the sources as part of planning. With a trigger like Trigger.Once(), the semantics are to process all the data available to the datasource in a single micro-batch. However, this semantic can be broken when data source options such as `maxOffsetsPerTrigger` (in the Kafka source) rate limit the amount of data [...] ### Does this PR introduce any user-facing change? DataSource developers can extend this interface for their streaming sources to add admission control into their system and correctly support Trigger.Once(). ### How was this patch tested? Existing tests, as this API is mostly internal Closes #27380 from brkyvz/rateLimit. 
Lead-authored-by: Burak Yavuz Co-authored-by: Burak Yavuz Signed-off-by: Burak Yavuz --- .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 25 ++ .../apache/spark/sql/kafka010/KafkaSource.scala| 29 +++ .../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 22 + .../read/streaming/ReadAllAvailable.java} | 28 +++ .../sql/connector/read/streaming/ReadLimit.java} | 25 ++ .../sql/connector/read/streaming/ReadMaxFiles.java | 55 + .../sql/connector/read/streaming/ReadMaxRows.java | 55 + .../read/streaming/SupportsAdmissionControl.java | 56 ++ .../sql/execution/streaming/FileStreamSource.scala | 25 -- .../execution/streaming/MicroBatchExecution.scala | 49 +-- .../sql/execution/streaming/StreamExecution.scala | 6 +-- .../streaming/continuous/ContinuousExecution.scala | 4 +- .../sql/streaming/FileStreamSourceSuite.scala | 56 ++ 13 files changed, 376 insertions(+), 59 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index 844c963..6599e7e 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -27,8 +27,7 @@ import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} -import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset} -import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream +import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl} import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.UninterruptibleThread @@ -55,7 +54,7 @@ private[kafka010] class KafkaMicroBatchStream( options: CaseInsensitiveStringMap, metadataPath: String, startingOffsets: KafkaOffsetRangeLimit, -failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging { +failOnDataLoss: Boolean) extends SupportsAdmissionControl with MicroBatchStream with Logging { private[kafka010] val pollTimeoutMs = options.getLong( KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, @@ -77,13 +76,23 @@ private[kafka010] class KafkaMicroBatchStream( KafkaSourceOffset(getOrCreateInitialPartitionOffsets
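The Trigger.Once() interaction described above can likewise be sketched, assuming `SupportsAdmissionControl` exposes a `getDefaultReadLimit()`; this is a simplification, not `MicroBatchExecution`'s actual logic:

```
import org.apache.spark.sql.connector.read.streaming.{ReadLimit, SupportsAdmissionControl}

// With Trigger.Once() the engine may override the source's default limit
// so a single micro-batch drains everything available (illustrative only).
def effectiveLimit(source: SupportsAdmissionControl, isTriggerOnce: Boolean): ReadLimit =
  if (isTriggerOnce) ReadLimit.allAvailable() else source.getDefaultReadLimit()
```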
[spark] branch master updated: [SPARK-30314] Add identifier and catalog information to DataSourceV2Relation
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d0800fc [SPARK-30314] Add identifier and catalog information to DataSourceV2Relation d0800fc is described below commit d0800fc8e2e71a79bf0f72c3e4bc608ae34053e7 Author: Yuchen Huo AuthorDate: Sun Jan 26 12:59:24 2020 -0800 [SPARK-30314] Add identifier and catalog information to DataSourceV2Relation ### What changes were proposed in this pull request? Add identifier and catalog information in DataSourceV2Relation so it would be possible to do richer checks in checkAnalysis step. ### Why are the changes needed? In data source v2, table implementations are all customized so we may not be able to get the resolved identifier from tables them selves. Therefore we encode the table and catalog information in DSV2Relation so no external changes are needed to make sure this information is available. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit tests in the following suites: CatalogManagerSuite.scala CatalogV2UtilSuite.scala SupportsCatalogOptionsSuite.scala PlanResolutionSuite.scala Closes #26957 from yuchenhuo/SPARK-30314. Authored-by: Yuchen Huo Signed-off-by: Burak Yavuz --- .../spark/sql/kafka010/KafkaRelationSuite.scala| 2 +- .../spark/sql/catalyst/analysis/Analyzer.scala | 9 +-- .../sql/connector/catalog/CatalogV2Util.scala | 2 +- .../datasources/v2/DataSourceV2Relation.scala | 21 -- .../sql/connector/catalog/CatalogV2UtilSuite.scala | 40 .../org/apache/spark/sql/DataFrameReader.scala | 16 +++-- .../org/apache/spark/sql/DataFrameWriter.scala | 22 +++ .../org/apache/spark/sql/DataFrameWriterV2.scala | 10 ++- .../apache/spark/sql/execution/CacheManager.scala | 2 +- .../datasources/FallBackFileSourceV2.scala | 3 +- .../datasources/v2/DataSourceV2Strategy.scala | 8 +-- .../apache/spark/sql/DataFrameWriterV2Suite.scala | 43 + .../connector/SupportsCatalogOptionsSuite.scala| 75 +- .../sql/connector/TableCapabilityCheckSuite.scala | 32 + .../execution/command/PlanResolutionSuite.scala| 55 +++- .../parquet/ParquetPartitionDiscoverySuite.scala | 2 +- .../spark/sql/streaming/FileStreamSinkSuite.scala | 2 +- 17 files changed, 290 insertions(+), 54 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala index 063e2e2..2c022c1 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala @@ -624,7 +624,7 @@ class KafkaRelationSuiteV2 extends KafkaRelationSuiteBase { val topic = newTopic() val df = createDF(topic) assert(df.logicalPlan.collect { - case DataSourceV2Relation(_, _, _) => true + case _: DataSourceV2Relation => true }.nonEmpty) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 45547bf..15ebf69 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -817,8 +817,8 @@ class Analyzer( case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) => 
CatalogV2Util.loadRelation(u.catalog, u.tableName) -.map(rel => alter.copy(table = rel)) -.getOrElse(alter) + .map(rel => alter.copy(table = rel)) + .getOrElse(alter) case u: UnresolvedV2Relation => CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u) @@ -831,7 +831,8 @@ class Analyzer( expandRelationName(identifier) match { case NonSessionCatalogAndIdentifier(catalog, ident) => CatalogV2Util.loadTable(catalog, ident) match { -case Some(table) => Some(DataSourceV2Relation.create(table)) +case Some(table) => + Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident))) case None => None } case _ => None @@ -921,7 +922,7 @@ class Analyzer( case v1Table: V1Table => v1SessionCatalog.getRelation(v1Table.v1Table) case table => - DataSourceV2Relation.create(table) + DataSourceV2Relation.create(table, Some(catalog), S
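With the relation now carrying `Some(catalog)` and `Some(ident)` (see the `DataSourceV2Relation.create` call sites above), analysis-time checks can name tables precisely. An illustrative helper, not part of the patch:

```
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier}

// Illustrative only: build an error-message prefix from the new fields.
def tableDescription(catalog: Option[CatalogPlugin], ident: Option[Identifier]): String =
  (catalog, ident) match {
    case (Some(c), Some(i)) => s"table $i in catalog ${c.name}"
    case _                  => "an unnamed v2 relation"
  }
```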
[spark] branch master updated: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f8d5957 [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider f8d5957 is described below commit f8d59572b014e5254b0c574b26e101c2e4157bdd Author: Burak Yavuz AuthorDate: Thu Jan 9 11:18:16 2020 -0800 [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider ### What changes were proposed in this pull request? This PR introduces `SupportsCatalogOptions` as an interface for `TableProvider`. Through `SupportsCatalogOptions`, V2 DataSources can implement the two methods `extractIdentifier` and `extractCatalog` to support the creation, and existence check of tables without requiring a formal TableCatalog implementation. We currently don't support all SaveModes for DataSourceV2 in DataFrameWriter.save. The idea here is that eventually File based tables can be written with `DataFrameWriter.save(path)` will create a PathIdentifier where the name is `path`, and the V2SessionCatalog will be able to perform FileSystem checks at `path` to support ErrorIfExists and Ignore SaveModes. ### Why are the changes needed? To support all Save modes for V2 data sources with DataFrameWriter. Since we can now support table creation, we will be able to provide partitioning information when first creating the table as well. ### Does this PR introduce any user-facing change? Introduces a new interface ### How was this patch tested? Will add tests once interface is vetted. Closes #26913 from brkyvz/catalogOptions. Lead-authored-by: Burak Yavuz Co-authored-by: Burak Yavuz Signed-off-by: Burak Yavuz --- .../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 13 +- .../connector/catalog/SupportsCatalogOptions.java | 53 + .../sql/connector/catalog/CatalogV2Util.scala | 11 ++ .../org/apache/spark/sql/DataFrameReader.scala | 21 +- .../org/apache/spark/sql/DataFrameWriter.scala | 128 .../connector/SupportsCatalogOptionsSuite.scala| 219 + .../sql/connector/TestV2SessionCatalogBase.scala | 5 + 7 files changed, 406 insertions(+), 44 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index e2dcd62..5c8c5b1 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets.UTF_8 import java.util.concurrent.atomic.AtomicInteger import scala.reflect.ClassTag +import scala.util.Try import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.internals.DefaultPartitioner @@ -500,7 +501,7 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase { TestUtils.assertExceptionMsg(ex, "null topic present in the data") } - protected def testUnsupportedSaveModes(msg: (SaveMode) => String): Unit = { + protected def testUnsupportedSaveModes(msg: (SaveMode) => Seq[String]): Unit = { val topic = newTopic() testUtils.createTopic(topic) val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value") @@ -513,7 +514,10 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase { .mode(mode) .save() } - TestUtils.assertExceptionMsg(ex, msg(mode)) + val errorChecks = 
msg(mode).map(m => Try(TestUtils.assertExceptionMsg(ex, m))) + if (!errorChecks.exists(_.isSuccess)) { +fail("Error messages not found in exception trace") + } } } @@ -541,7 +545,7 @@ class KafkaSinkBatchSuiteV1 extends KafkaSinkBatchSuiteBase { .set(SQLConf.USE_V1_SOURCE_LIST, "kafka") test("batch - unsupported save modes") { -testUnsupportedSaveModes((mode) => s"Save mode ${mode.name} not allowed for Kafka") +testUnsupportedSaveModes((mode) => s"Save mode ${mode.name} not allowed for Kafka" :: Nil) } } @@ -552,7 +556,8 @@ class KafkaSinkBatchSuiteV2 extends KafkaSinkBatchSuiteBase { .set(SQLConf.USE_V1_SOURCE_LIST, "") test("batch - unsupported save modes") { -testUnsupportedSaveModes((mode) => s"cannot be written with ${mode.name} mode") +testUnsupportedSaveModes((mode) => + Seq(s"cannot be written with ${mode.name} mode", "does not support truncate")) } test("generic - write
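A sketch of a provider opting in. The commit message names `extractIdentifier` and `extractCatalog`, but their signatures are not shown above, so the parameter type is assumed; treating the `path` option as the table name follows the `PathIdentifier` idea in the description:

```
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsCatalogOptions}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Assumed signature; "path"-as-name handling is illustrative.
trait PathBasedProvider extends SupportsCatalogOptions {
  override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier =
    Identifier.of(Array.empty[String], options.get("path"))
}
```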
[spark] branch master updated: [SPARK-30143][SS] Add a timeout on stopping a streaming query
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4c37a8a [SPARK-30143][SS] Add a timeout on stopping a streaming query 4c37a8a is described below commit 4c37a8a3f4a489b52f1919d2db84f6e32c6a05cd Author: Burak Yavuz AuthorDate: Fri Dec 13 15:16:00 2019 -0800 [SPARK-30143][SS] Add a timeout on stopping a streaming query ### What changes were proposed in this pull request? Add a timeout configuration for StreamingQuery.stop() ### Why are the changes needed? The stop() method on a Streaming Query awaits the termination of the stream execution thread. However, the stream execution thread may block forever depending on the streaming source implementation (like in Kafka, which runs UninterruptibleThreads). This causes control flow applications to hang indefinitely as well. We'd like to introduce a timeout to stop the execution thread, so that the control flow thread can decide to do an action if a timeout is hit. ### Does this PR introduce any user-facing change? By default, no. If the timeout configuration is set, then a TimeoutException will be thrown if a stream cannot be stopped within the given timeout. ### How was this patch tested? Unit tests Closes #26771 from brkyvz/stopTimeout. Lead-authored-by: Burak Yavuz Co-authored-by: Burak Yavuz Signed-off-by: Burak Yavuz --- .../org/apache/spark/sql/internal/SQLConf.scala| 7 ++ .../execution/streaming/MicroBatchExecution.scala | 3 +- .../sql/execution/streaming/StreamExecution.scala | 26 +++- .../streaming/continuous/ContinuousExecution.scala | 3 +- .../spark/sql/streaming/DataStreamWriter.scala | 11 +- .../spark/sql/streaming/StreamingQuery.scala | 12 +- .../sql/streaming/StreamingQueryManager.scala | 3 +- .../streaming/JavaDataStreamReaderWriterSuite.java | 5 +- .../apache/spark/sql/streaming/StreamSuite.scala | 35 +- .../sql/streaming/util/BlockOnStopSource.scala | 132 + 10 files changed, 224 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c54008c..91347cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1298,6 +1298,13 @@ object SQLConf { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(10L) + val STREAMING_STOP_TIMEOUT = +buildConf("spark.sql.streaming.stopTimeout") + .doc("How long to wait for the streaming execution thread to stop when calling the " + +"streaming query's stop() method in milliseconds. 
0 or negative values wait indefinitely.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(0L) + val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL = buildConf("spark.sql.streaming.noDataProgressEventInterval") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 5fe1f92..872c367 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -150,8 +150,7 @@ class MicroBatchExecution( state.set(TERMINATED) if (queryExecutionThread.isAlive) { sparkSession.sparkContext.cancelJobGroup(runId.toString) - queryExecutionThread.interrupt() - queryExecutionThread.join() + interruptAndAwaitExecutionThreadTermination() // microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak sparkSession.sparkContext.cancelJobGroup(runId.toString) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index f470ad3..1cb3955 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming import java.io.{InterruptedIOException, IOException, UncheckedIOException} import java.nio.channels.ClosedByInterruptException import java.util.UUID -import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit} +import java.util.concurrent.{CountDownLatch, ExecutionExcepti
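A usage sketch for the new conf. The key and the TimeoutException behavior come from the description above; the handling logic is illustrative:

```
import java.util.concurrent.TimeoutException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

// Bound stop() so a control-flow thread can react instead of hanging.
def stopWithTimeout(spark: SparkSession, query: StreamingQuery): Unit = {
  spark.conf.set("spark.sql.streaming.stopTimeout", "10000") // ms; 0 waits forever
  try query.stop()
  catch {
    case _: TimeoutException =>
      () // execution thread still alive: log, retry, or escalate here
  }
}
```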
[spark] branch master updated: [SPARK-29568][SS] Stop existing running streams when a new stream is launched
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 363af16 [SPARK-29568][SS] Stop existing running streams when a new stream is launched 363af16 is described below commit 363af16c72abe19fc5cc5b5bdf9d8dc34975f2ba Author: Burak Yavuz AuthorDate: Wed Nov 13 08:59:46 2019 -0800 [SPARK-29568][SS] Stop existing running streams when a new stream is launched ### What changes were proposed in this pull request? This PR adds a SQL Conf: `spark.sql.streaming.stopActiveRunOnRestart`. When this conf is `true` (by default it is), an already running stream will be stopped, if a new copy gets launched on the same checkpoint location. ### Why are the changes needed? In multi-tenant environments where you have multiple SparkSessions, you can accidentally start multiple copies of the same stream (i.e. streams using the same checkpoint location). This will cause all new instantiations of the new stream to fail. However, sometimes you may want to turn off the old stream, as the old stream may have turned into a zombie (you no longer have access to the query handle or SparkSession). It would be nice to have a SQL flag that allows the stopping of the old stream for such zombie cases. ### Does this PR introduce any user-facing change? Yes. Now by default, if you launch a new copy of an already running stream on a multi-tenant cluster, the existing stream will be stopped. ### How was this patch tested? Unit tests in StreamingQueryManagerSuite Closes #26225 from brkyvz/stopStream. Lead-authored-by: Burak Yavuz Co-authored-by: Burak Yavuz Signed-off-by: Burak Yavuz --- .../org/apache/spark/sql/internal/SQLConf.scala| 9 ++ .../apache/spark/sql/internal/SharedState.scala| 10 +- .../sql/streaming/StreamingQueryManager.scala | 82 + .../sql/streaming/StreamingQueryManagerSuite.scala | 134 - .../spark/sql/streaming/StreamingQuerySuite.scala | 8 +- 5 files changed, 184 insertions(+), 59 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 98acace..759586a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1093,6 +1093,15 @@ object SQLConf { .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2") .createWithDefault(2) + val STREAMING_STOP_ACTIVE_RUN_ON_RESTART = +buildConf("spark.sql.streaming.stopActiveRunOnRestart") +.doc("Running multiple runs of the same streaming query concurrently is not supported. 
" + + "If we find a concurrent active run for a streaming query (in the same or different " + + "SparkSessions on the same cluster) and this flag is true, we will stop the old streaming " + + "query run to start the new one.") +.booleanConf +.createWithDefault(true) + val STREAMING_JOIN_STATE_FORMAT_VERSION = buildConf("spark.sql.streaming.join.stateFormatVersion") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index d097f9f..b810bed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.internal import java.net.URL import java.util.{Locale, UUID} import java.util.concurrent.ConcurrentHashMap +import javax.annotation.concurrent.GuardedBy import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -32,9 +33,10 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.CacheManager +import org.apache.spark.sql.execution.streaming.StreamExecution import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab} import org.apache.spark.sql.internal.StaticSQLConf._ -import org.apache.spark.sql.streaming.StreamingQueryManager +import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.status.ElementTrackingStore import org.apache.spark.util.Utils @@ -112,11 +114,15 @@ private[sql] class SharedState( */ val cacheManager: CacheManager = new CacheManager + /** A global lock for all streaming query lifecycle tracking and management. */ + private[sql] val activeQueriesLock = new Object + /** * A map of active stre
[spark] branch master updated: [SPARK-29352][SQL][SS] Track active streaming queries in the SparkSession.sharedState
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new cbe6ead [SPARK-29352][SQL][SS] Track active streaming queries in the SparkSession.sharedState cbe6ead is described below commit cbe6eadc0c1d0384c1ee03f3a5b28cc583a60717 Author: Burak Yavuz AuthorDate: Wed Oct 23 10:56:19 2019 +0200 [SPARK-29352][SQL][SS] Track active streaming queries in the SparkSession.sharedState ### What changes were proposed in this pull request? This moves the tracking of active queries from a per SparkSession state, to the shared SparkSession for better safety in isolated Spark Session environments. ### Why are the changes needed? We have checks to prevent the restarting of the same stream on the same spark session, but we can actually make that better in multi-tenant environments by actually putting that state in the SharedState instead of SessionState. This would allow a more comprehensive check for multi-tenant clusters. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Added tests to StreamingQueryManagerSuite Closes #26018 from brkyvz/sharedStreamingQueryManager. Lead-authored-by: Burak Yavuz Co-authored-by: Burak Yavuz Signed-off-by: Burak Yavuz --- .../apache/spark/sql/internal/SharedState.scala| 10 ++- .../sql/streaming/StreamingQueryManager.scala | 22 +++--- .../sql/streaming/StreamingQueryManagerSuite.scala | 80 +- 3 files changed, 102 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index f1a6481..d097f9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.internal import java.net.URL -import java.util.Locale +import java.util.{Locale, UUID} +import java.util.concurrent.ConcurrentHashMap import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab} import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.sql.streaming.StreamingQueryManager import org.apache.spark.status.ElementTrackingStore import org.apache.spark.util.Utils @@ -111,6 +113,12 @@ private[sql] class SharedState( val cacheManager: CacheManager = new CacheManager /** + * A map of active streaming queries to the session specific StreamingQueryManager that manages + * the lifecycle of that stream. + */ + private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamingQueryManager]() + + /** * A status store to query SQL status/metrics of this Spark application, based on SQL-specific * [[org.apache.spark.scheduler.SparkListenerEvent]]s. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 9abe38d..9b43a83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -352,8 +352,10 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo } } - // Make sure no other query with same id is active - if (activeQueries.values.exists(_.id == query.id)) { + // Make sure no other query with same id is active across all sessions + val activeOption = + Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, this)) + if (activeOption.isDefined || activeQueries.values.exists(_.id == query.id)) { throw new IllegalStateException( s"Cannot start query with id ${query.id} as another query with same id is " + s"already active. Perhaps you are attempting to restart a query from checkpoint " + @@ -370,9 +372,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo query.streamingQuery.start() } catch { case e: Throwable => -activeQueriesLock.synchronized { - activeQueries -= query.id -} +unregisterTerminatedStream(query.id) throw e } query @@ -380,9 +380,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo /** Notify (by the StreamingQuery) that the qu
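The cross-session guard reduces to one atomic `putIfAbsent` on the shared map; a simplified standalone sketch of that pattern (class names are stand-ins for the ones in the diff):

```scala
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

class QueryManager // stand-in for the session-specific StreamingQueryManager

// Shared across all sessions, like SharedState.activeStreamingQueries above.
val activeStreamingQueries = new ConcurrentHashMap[UUID, QueryManager]()

def register(queryId: UUID, manager: QueryManager): Unit = {
  // putIfAbsent is atomic, so exactly one session can claim a query id.
  if (activeStreamingQueries.putIfAbsent(queryId, manager) != null) {
    throw new IllegalStateException(
      s"Cannot start query with id $queryId as another query with the same id is already active.")
  }
}
```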
[spark] branch master updated: [SPARK-28612][SQL] Add DataFrameWriterV2 API
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2c775f4 [SPARK-28612][SQL] Add DataFrameWriterV2 API 2c775f4 is described below commit 2c775f418f5fae4dbf3adfbb5ea99cd030918d41 Author: Ryan Blue AuthorDate: Thu Sep 19 13:32:09 2019 -0700 [SPARK-28612][SQL] Add DataFrameWriterV2 API ## What changes were proposed in this pull request? This adds a new write API as proposed in the [SPIP to standardize logical plans](https://issues.apache.org/jira/browse/SPARK-23521). This new API: * Uses clear verbs to execute writes, like `append`, `overwrite`, `create`, and `replace` that correspond to the new logical plans. * Only creates v2 logical plans so the behavior is always consistent. * Does not allow table configuration options for operations that cannot change table configuration. For example, `partitionedBy` can only be called when the writer executes `create` or `replace`. Here are a few example uses of the new API: ```scala df.writeTo("catalog.db.table").append() df.writeTo("catalog.db.table").overwrite($"date" === "2019-06-01") df.writeTo("catalog.db.table").overwritePartitions() df.writeTo("catalog.db.table").asParquet.create() df.writeTo("catalog.db.table").partitionedBy(days($"ts")).createOrReplace() df.writeTo("catalog.db.table").using("abc").replace() ``` ## How was this patch tested? Added `DataFrameWriterV2Suite` that tests the new write API. Existing tests for v2 plans. Closes #25681 from rdblue/SPARK-28612-add-data-frame-writer-v2. Authored-by: Ryan Blue Signed-off-by: Burak Yavuz --- .../catalyst/expressions/PartitionTransforms.scala | 77 .../spark/sql/catalyst/analysis/Analyzer.scala | 6 +- .../plans/logical/basicLogicalOperators.scala | 47 +- .../datasources/v2/DataSourceV2Implicits.scala | 9 + .../apache/spark/sql/connector/InMemoryTable.scala | 5 +- .../org/apache/spark/sql/DataFrameWriter.scala | 11 +- .../org/apache/spark/sql/DataFrameWriterV2.scala | 367 +++ .../main/scala/org/apache/spark/sql/Dataset.scala | 28 ++ .../datasources/v2/DataSourceV2Strategy.scala | 17 +- .../datasources/v2/TableCapabilityCheck.scala | 6 +- .../scala/org/apache/spark/sql/functions.scala | 58 +++ .../spark/sql/JavaDataFrameWriterV2Suite.java | 112 + .../apache/spark/sql/DataFrameWriterV2Suite.scala | 507 + 13 files changed, 1217 insertions(+), 33 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala new file mode 100644 index 000..e48fd8a --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.types.{DataType, IntegerType} + +/** + * Base class for expressions that are converted to v2 partition transforms. + * + * Subclasses represent abstract transform functions with concrete implementations that are + * determined by data source implementations. Because the concrete implementation is not known, + * these expressions are [[Unevaluable]]. + * + * These expressions are used to pass transformations from the DataFrame API: + * + * {{{ + * df.writeTo("catalog.db.table").partitionedBy($"category", days($"timestamp")).create() + * }}} + */ +abstract class PartitionTransformExpression extends Expression with Unevaluable { + override def nullable: Boolean = true +} + +/** + * Expression for the v2 partition transform years. + */ +case class Years(child: Expression) extends PartitionTransformExpression {
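A hedged end-to-end sketch combining the new writer with the partition transform functions this patch adds to `org.apache.spark.sql.functions` (the `catalog.db.events` identifier assumes a configured V2 catalog):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{bucket, days}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = Seq((1L, java.sql.Timestamp.valueOf("2019-06-01 00:00:00")))
  .toDF("id", "ts")

// days($"ts") and bucket(16, $"id") are Unevaluable in Spark itself; they
// are converted to v2 Transforms that the target catalog implements.
df.writeTo("catalog.db.events")
  .partitionedBy(days($"ts"), bucket(16, $"id"))
  .createOrReplace()
```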
[spark] branch master updated: [SPARK-29030][SQL] Simplify lookupV2Relation
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ee94b5d [SPARK-29030][SQL] Simplify lookupV2Relation ee94b5d is described below commit ee94b5d7019f8ec181d42e953cb8b5190186fe30 Author: John Zhuge AuthorDate: Wed Sep 18 09:27:11 2019 -0700 [SPARK-29030][SQL] Simplify lookupV2Relation ## What changes were proposed in this pull request? Simplify the return type for `lookupV2Relation` which makes the 3 callers more straightforward. ## How was this patch tested? Existing unit tests. Closes #25735 from jzhuge/lookupv2relation. Authored-by: John Zhuge Signed-off-by: Burak Yavuz --- .../spark/sql/catalyst/analysis/Analyzer.scala | 87 -- .../sql/connector/catalog/CatalogV2Implicits.scala | 6 +- 2 files changed, 37 insertions(+), 56 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0a13a34..76e59fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.sql._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.catalyst.util.toPrettySQL -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, Table, TableCatalog, TableChange, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, Table, TableCatalog, TableChange, V1Table} import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf @@ -666,20 +666,13 @@ class Analyzer( object ResolveTables extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case u: UnresolvedRelation => -val v2TableOpt = lookupV2Relation(u.multipartIdentifier) match { - case scala.Left((_, _, tableOpt)) => tableOpt - case scala.Right(tableOpt) => tableOpt -} -v2TableOpt.map(DataSourceV2Relation.create).getOrElse(u) +lookupV2Relation(u.multipartIdentifier) + .getOrElse(u) case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved => -val v2TableOpt = lookupV2Relation(u.multipartIdentifier) match { - case scala.Left((_, _, tableOpt)) => tableOpt - case scala.Right(tableOpt) => tableOpt -} -v2TableOpt.map(DataSourceV2Relation.create).map { v2Relation => - i.copy(table = v2Relation) -}.getOrElse(i) +lookupV2Relation(u.multipartIdentifier) + .map(v2Relation => i.copy(table = v2Relation)) + .getOrElse(i) } } @@ -963,26 +956,13 @@ class Analyzer( private def resolveV2Alter( tableName: Seq[String], changes: Seq[TableChange]): Option[AlterTable] = { - lookupV2Relation(tableName) match { -case scala.Left((v2Catalog, ident, tableOpt)) => - Some(AlterTable( -v2Catalog.asTableCatalog, -ident, - tableOpt.map(DataSourceV2Relation.create).getOrElse(UnresolvedRelation(tableName)), -changes - )) -case scala.Right(tableOpt) => - tableOpt.map { table => -AlterTable( - sessionCatalog.asTableCatalog, - Identifier.of(tableName.init.toArray, tableName.last), - 
DataSourceV2Relation.create(table), - changes -) - } + lookupV2RelationAndCatalog(tableName).map { +case (relation, catalog, ident) => + AlterTable(catalog.asTableCatalog, ident, relation, changes) } } } + /** * Resolve DESCRIBE TABLE statements that use a DSv2 catalog. * @@ -2840,36 +2820,35 @@ class Analyzer( /** * Performs the lookup of DataSourceV2 Tables. The order of resolution is: - * 1. Check if this relation is a temporary table - * 2. Check if it has a catalog identifier. Here we try to load the table. If we find the table, - * we can return the table. The result returned by an explicit catalog will be returned on - * the Left projection of the Either. - * 3. Try resolving the relation using the V2SessionCatalog if that is defined. If the - * V2SessionCatalog returns a V1 table definitio
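The shape of the simplification, reduced to a self-contained sketch with stand-in types (the real return type changes from an Either of tuples to a plain `Option` of the relation):

```scala
case class Table(name: String) // stand-in for the v2 Table/relation types

// Before: an Either whose two projections both carried an Option, forcing
// every caller to pattern-match just to extract the table.
def lookupBefore(id: Seq[String]): Either[(String, Option[Table]), Option[Table]] =
  if (id.size > 1) Left((id.head, Some(Table(id.last)))) else Right(Some(Table(id.last)))

val before = lookupBefore(Seq("cat", "t")) match {
  case Left((_, tableOpt)) => tableOpt
  case Right(tableOpt)     => tableOpt
}

// After: a single Option, so the three call sites become one-liners.
def lookupAfter(id: Seq[String]): Option[Table] = Some(Table(id.last))

val after = lookupAfter(Seq("cat", "t")).getOrElse(Table("unresolved"))
```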
[spark] branch master updated: [SPARK-28628][SQL] Implement SupportsNamespaces in V2SessionCatalog
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5ea134c [SPARK-28628][SQL] Implement SupportsNamespaces in V2SessionCatalog 5ea134c is described below commit 5ea134c3546aa0512a85cc2970d38f5e0345edde Author: Ryan Blue AuthorDate: Tue Sep 3 13:13:27 2019 -0700 [SPARK-28628][SQL] Implement SupportsNamespaces in V2SessionCatalog ## What changes were proposed in this pull request? This adds namespace support to V2SessionCatalog. ## How was this patch tested? WIP: will add tests for v2 session catalog namespace methods. Closes #25363 from rdblue/SPARK-28628-support-namespaces-in-v2-session-catalog. Authored-by: Ryan Blue Signed-off-by: Burak Yavuz --- .../datasources/v2/V2SessionCatalog.scala | 136 +++- .../datasources/v2/V2SessionCatalogSuite.scala | 347 +++-- 2 files changed, 451 insertions(+), 32 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 6dcebe295..6f8cf47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -17,18 +17,20 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.net.URI import java.util import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TableChange} -import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform} +import org.apache.spark.sql.catalog.v2.{Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange} +import org.apache.spark.sql.catalog.v2.NamespaceChange.{RemoveProperty, SetProperty} +import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform} import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog} +import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog} import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.internal.SessionState import org.apache.spark.sql.sources.v2.Table @@ -39,11 +41,16 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A [[TableCatalog]] that translates calls to the v1 SessionCatalog. 
*/ -class V2SessionCatalog(sessionState: SessionState) extends TableCatalog { +class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with SupportsNamespaces { + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ + import V2SessionCatalog._ + def this() = { this(SparkSession.active.sessionState) } + override val defaultNamespace: Array[String] = Array("default") + private lazy val catalog: SessionCatalog = sessionState.catalog private var _name: String = _ @@ -87,7 +94,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog { val (partitionColumns, maybeBucketSpec) = V2SessionCatalog.convertTransforms(partitions) val provider = properties.getOrDefault("provider", sessionState.conf.defaultDataSourceName) val tableProperties = properties.asScala -val location = Option(properties.get("location")) +val location = Option(properties.get(LOCATION_TABLE_PROP)) val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap) .copy(locationUri = location.map(CatalogUtils.stringToURI)) val tableType = if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED @@ -102,7 +109,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog { bucketSpec = maybeBucketSpec, properties = tableProperties.toMap, tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions, - comment = Option(properties.get("comment"))) + comment = Option(properties.get(COMMENT_TABLE_PROP))) try { catalog.createTable(tableDesc, ig
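A hedged sketch of exercising the namespace support through the session catalog, in the spirit of the new V2SessionCatalogSuite coverage (namespaces here map onto v1 databases; the namespace name is arbitrary):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog

val spark = SparkSession.builder().master("local[2]").getOrCreate()

// The no-arg constructor picks up SparkSession.active.sessionState.
val catalog = new V2SessionCatalog()

catalog.createNamespace(Array("reporting"), Map("comment" -> "demo db").asJava)
assert(catalog.namespaceExists(Array("reporting")))
println(catalog.loadNamespaceMetadata(Array("reporting")))
catalog.dropNamespace(Array("reporting"))
```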
[spark] branch master updated: [SPARK-28612][SQL] Add DataFrameWriterV2 API
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3821d75 [SPARK-28612][SQL] Add DataFrameWriterV2 API 3821d75 is described below commit 3821d75b836afae55a2a92c14b379bf4ec8a5362 Author: Ryan Blue AuthorDate: Sat Aug 31 21:28:20 2019 -0700 [SPARK-28612][SQL] Add DataFrameWriterV2 API ## What changes were proposed in this pull request? This adds a new write API as proposed in the [SPIP to standardize logical plans](https://issues.apache.org/jira/browse/SPARK-23521). This new API: * Uses clear verbs to execute writes, like `append`, `overwrite`, `create`, and `replace` that correspond to the new logical plans. * Only creates v2 logical plans so the behavior is always consistent. * Does not allow table configuration options for operations that cannot change table configuration. For example, `partitionedBy` can only be called when the writer executes `create` or `replace`. Here are a few example uses of the new API: ```scala df.writeTo("catalog.db.table").append() df.writeTo("catalog.db.table").overwrite($"date" === "2019-06-01") df.writeTo("catalog.db.table").overwritePartitions() df.writeTo("catalog.db.table").asParquet.create() df.writeTo("catalog.db.table").partitionedBy(days($"ts")).createOrReplace() df.writeTo("catalog.db.table").using("abc").replace() ``` ## How was this patch tested? Added `DataFrameWriterV2Suite` that tests the new write API. Existing tests for v2 plans. Closes #25354 from rdblue/SPARK-28612-add-data-frame-writer-v2. Authored-by: Ryan Blue Signed-off-by: Burak Yavuz --- .../catalyst/expressions/PartitionTransforms.scala | 77 .../spark/sql/catalyst/analysis/Analyzer.scala | 6 +- .../plans/logical/basicLogicalOperators.scala | 47 +- .../datasources/v2/DataSourceV2Implicits.scala | 9 + .../apache/spark/sql/connector/InMemoryTable.scala | 5 +- .../org/apache/spark/sql/DataFrameWriter.scala | 11 +- .../org/apache/spark/sql/DataFrameWriterV2.scala | 365 +++ .../main/scala/org/apache/spark/sql/Dataset.scala | 28 ++ .../datasources/v2/DataSourceV2Strategy.scala | 20 +- .../datasources/v2/V2WriteSupportCheck.scala | 6 +- .../scala/org/apache/spark/sql/functions.scala | 64 +++ .../sql/sources/v2/DataFrameWriterV2Suite.scala| 508 + 12 files changed, 1110 insertions(+), 36 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala new file mode 100644 index 000..e48fd8a --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.types.{DataType, IntegerType} + +/** + * Base class for expressions that are converted to v2 partition transforms. + * + * Subclasses represent abstract transform functions with concrete implementations that are + * determined by data source implementations. Because the concrete implementation is not known, + * these expressions are [[Unevaluable]]. + * + * These expressions are used to pass transformations from the DataFrame API: + * + * {{{ + * df.writeTo("catalog.db.table").partitionedBy($"category", days($"timestamp")).create() + * }}} + */ +abstract class PartitionTransformExpression extends Expression with Unevaluable { + override def nullable: Boolean = true +} + +/** + * Expression for the v2 partition transform years. + */ +case class Years(child: Expression) extends PartitionTransformExpression { + override def dataType: DataType = IntegerType + overrid
[spark] branch master updated: [SPARK-28635][SQL][FOLLOWUP] CatalogManager should reflect the changes of default catalog
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 97b046f [SPARK-28635][SQL][FOLLOWUP] CatalogManager should reflect the changes of default catalog 97b046f is described below commit 97b046f06fba6a5c555310ac41f46f5fdfbdc5d4 Author: Wenchen Fan AuthorDate: Wed Aug 21 12:23:42 2019 -0700 [SPARK-28635][SQL][FOLLOWUP] CatalogManager should reflect the changes of default catalog ### What changes were proposed in this pull request? The current namespace/catalog should be set to None at the beginning, so that we can read the new configs when reporting currennt namespace/catalog later. ### Why are the changes needed? Fix a bug in CatalogManager, to reflect the change of default catalog config when reporting current catalog. ### Does this PR introduce any user-facing change? No. The current namespace/catalog stuff is still internal right now. ### How was this patch tested? a new test suite Closes #25521 from cloud-fan/fix. Authored-by: Wenchen Fan Signed-off-by: Burak Yavuz --- .../spark/sql/catalog/v2/CatalogManager.scala | 23 +++--- .../sql/catalyst/catalog/CatalogManagerSuite.scala | 94 ++ 2 files changed, 106 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala index c91a73a..d5a6a61 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala @@ -62,36 +62,37 @@ class CatalogManager(conf: SQLConf) extends Logging { case _ => Array.empty[String] } - private var _currentNamespace = { -// The builtin catalog use "default" as the default database. -defaultCatalog.map(getDefaultNamespace).getOrElse(Array("default")) - } + private var _currentNamespace: Option[Array[String]] = None def currentNamespace: Array[String] = synchronized { -_currentNamespace +_currentNamespace.getOrElse { + currentCatalog.map { catalogName => +getDefaultNamespace(catalog(catalogName)) + }.getOrElse(Array("default")) // The builtin catalog use "default" as the default database. +} } def setCurrentNamespace(namespace: Array[String]): Unit = synchronized { -_currentNamespace = namespace +_currentNamespace = Some(namespace) } - private var _currentCatalog = conf.defaultV2Catalog + private var _currentCatalog: Option[String] = None // Returns the name of current catalog. None means the current catalog is the builtin catalog. def currentCatalog: Option[String] = synchronized { -_currentCatalog +_currentCatalog.orElse(conf.defaultV2Catalog) } def setCurrentCatalog(catalogName: String): Unit = synchronized { _currentCatalog = Some(catalogName) -_currentNamespace = getDefaultNamespace(catalog(catalogName)) +_currentNamespace = None } // Clear all the registered catalogs. Only used in tests. 
private[sql] def reset(): Unit = synchronized { catalogs.clear() -_currentNamespace = defaultCatalog.map(getDefaultNamespace).getOrElse(Array("default")) -_currentCatalog = conf.defaultV2Catalog +_currentNamespace = None +_currentCatalog = None } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogManagerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogManagerSuite.scala new file mode 100644 index 000..f7f1901 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogManagerSuite.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.catalog + +import java.util + +import org.apache
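The fix is clearest as a before/after on the conf; a hedged sketch against the internal API, assuming the default-catalog key behind `conf.defaultV2Catalog` is `spark.sql.default.catalog` (CatalogManager is internal, so this mirrors the new test suite rather than a public usage pattern):

```scala
import org.apache.spark.sql.catalog.v2.CatalogManager
import org.apache.spark.sql.internal.SQLConf

val conf = new SQLConf
val manager = new CatalogManager(conf)

// No default configured yet: the builtin catalog (None) is current.
assert(manager.currentCatalog.isEmpty)

// Because _currentCatalog now starts as None and falls back to the conf on
// each read, a later change to the default is reflected immediately.
conf.setConfString("spark.sql.default.catalog", "dummy") // assumed key
assert(manager.currentCatalog.contains("dummy"))
```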
[spark] branch master updated: [SPARK-28565][SQL] DataFrameWriter saveAsTable support for V2 catalogs
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5368eaa [SPARK-28565][SQL] DataFrameWriter saveAsTable support for V2 catalogs 5368eaa is described below commit 5368eaa2fc33f6d50b482bd1e90e0437b1887cd2 Author: Burak Yavuz AuthorDate: Thu Aug 8 22:30:00 2019 -0700 [SPARK-28565][SQL] DataFrameWriter saveAsTable support for V2 catalogs ## What changes were proposed in this pull request? Adds support for V2 catalogs and the V2SessionCatalog for V2 tables for saveAsTable. If the table can resolve through the V2SessionCatalog, we use SaveMode for datasource v1 for backwards compatibility to select the code path we're going to hit. Depending on the SaveMode: - SaveMode.Append: a) If table exists: Use AppendData.byName b) If table doesn't exist, use CTAS (ignoreIfExists = false) - SaveMode.Overwrite: Use RTAS (orCreate = true) - SaveMode.Ignore: Use CTAS (ignoreIfExists = true) - SaveMode.ErrorIfExists: Use CTAS (ignoreIfExists = false) ## How was this patch tested? Unit tests in DataSourceV2DataFrameSuite Closes #25330 from brkyvz/saveAsTable. Lead-authored-by: Burak Yavuz Co-authored-by: Burak Yavuz Signed-off-by: Burak Yavuz --- .../org/apache/spark/sql/DataFrameWriter.scala | 81 -- .../sources/v2/DataSourceV2DataFrameSuite.scala| 64 - 2 files changed, 138 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index ae82670..af7ddd7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -22,22 +22,22 @@ import java.util.{Locale, Properties, UUID} import scala.collection.JavaConverters._ import org.apache.spark.annotation.Stable -import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier} +import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog} +import org.apache.spark.sql.catalog.v2.expressions._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation} +import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation} import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister} -import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.TableCapability._ -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{IntegerType, StructType} import 
org.apache.spark.sql.util.CaseInsensitiveStringMap /** @@ -360,6 +360,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { */ def insertInto(tableName: String): Unit = { import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier} +import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ assertNotBucketed("insertInto") @@ -374,8 +375,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { df.sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) match { case CatalogObjectIdentifier(Some(catalog), ident) => insertInto(catalog, ident) + // TODO(SPARK-28667): Support the V2SessionCatalog case AsTableIdentifier(tableIdentifier) => insertInto(tableIdentifier) + case other => +throw new AnalysisException( + s"Couldn't find a catalog to handle the identifier ${other.quoted}.") } } @@ -485,7 +490,71 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * @since 1.4.0 */ def saveAsTable(tableName: String): Unit = { - saveAsTable(df.sparkSession.sessionState.sqlParser.parseTableIdent
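A hedged sketch of the mode mapping described above (assumes a V2 catalog registered under the name `catalog`; each call resolves the identifier through it):

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = Seq((1L, "a"), (2L, "b")).toDF("id", "data")

// Append: AppendData.byName if the table exists, otherwise CTAS.
df.write.mode(SaveMode.Append).saveAsTable("catalog.db.tbl")

// Overwrite: ReplaceTableAsSelect with orCreate = true.
df.write.mode(SaveMode.Overwrite).saveAsTable("catalog.db.tbl")

// Ignore: CTAS with ignoreIfExists = true, a no-op when the table exists.
df.write.mode(SaveMode.Ignore).saveAsTable("catalog.db.tbl")
```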
[spark] branch master updated: [SPARK-28331][SQL] Catalogs.load() should be able to load built-in catalogs
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c88df2c [SPARK-28331][SQL] Catalogs.load() should be able to load built-in catalogs c88df2c is described below commit c88df2ccf670db62aed6565c9dbdb58d5d5cca3f Author: Gengliang Wang AuthorDate: Wed Aug 7 16:14:34 2019 -0700 [SPARK-28331][SQL] Catalogs.load() should be able to load built-in catalogs ## What changes were proposed in this pull request? In `Catalogs.load`, the `pluginClassName` in the following code ``` String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null); ``` is always null for built-in catalogs, e.g. there is a SQLConf entry `spark.sql.catalog.session`. This is because of https://github.com/apache/spark/pull/18852: SQLConf.conf.getConfString(key, null) always returns null. ## How was this patch tested? Applied the code changes of https://github.com/apache/spark/pull/24768 and tried loading the session catalog. Closes #25094 from gengliangwang/fixCatalogLoad. Authored-by: Gengliang Wang Signed-off-by: Burak Yavuz --- .../src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java| 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java index 7511d94..f471a4e 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java @@ -26,6 +26,7 @@ import org.apache.spark.util.Utils; import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.Map; +import java.util.NoSuchElementException; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -50,8 +51,10 @@ public class Catalogs { */ public static CatalogPlugin load(String name, SQLConf conf) throws CatalogNotFoundException, SparkException { -String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null); -if (pluginClassName == null) { +String pluginClassName; +try { + pluginClassName = conf.getConfString("spark.sql.catalog." + name); +} catch (NoSuchElementException e){ throw new CatalogNotFoundException(String.format( "Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name)); }
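For context, the conf key that `load()` resolves is `spark.sql.catalog.<name>`; a hedged sketch of registering a plugin under it (the class name is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

// Catalogs.load("mycat", conf) looks up "spark.sql.catalog.mycat" and
// instantiates the named CatalogPlugin; with this fix the lookup also finds
// entries registered in SQLConf itself, such as the built-in session catalog.
val spark = SparkSession.builder()
  .config("spark.sql.catalog.mycat", "com.example.MyCatalogPlugin") // placeholder
  .getOrCreate()
```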
[spark] branch master updated: [SPARK-27661][SQL] Add SupportsNamespaces API
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0345f11 [SPARK-27661][SQL] Add SupportsNamespaces API 0345f11 is described below commit 0345f1174d6019374ed5451140e01c224508bc0e Author: Ryan Blue AuthorDate: Sun Aug 4 21:29:40 2019 -0700 [SPARK-27661][SQL] Add SupportsNamespaces API ## What changes were proposed in this pull request? This adds an interface for catalog plugins that exposes namespace operations: * `listNamespaces` * `namespaceExists` * `loadNamespaceMetadata` * `createNamespace` * `alterNamespace` * `dropNamespace` ## How was this patch tested? API only. Existing tests for regressions. Closes #24560 from rdblue/SPARK-27661-add-catalog-namespace-api. Authored-by: Ryan Blue Signed-off-by: Burak Yavuz --- .../spark/sql/catalog/v2/NamespaceChange.java | 97 ++ .../spark/sql/catalog/v2/SupportsNamespaces.java | 145 +++ .../spark/sql/catalog/v2/utils/CatalogV2Util.scala | 33 +++- .../spark/sql/catalog/v2/TableCatalogSuite.scala | 204 - .../spark/sql/catalog/v2/TestTableCatalog.scala| 70 ++- 5 files changed, 543 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java new file mode 100644 index 000..6f5895b --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +/** + * NamespaceChange subclasses represent requested changes to a namespace. These are passed to + * {@link SupportsNamespaces#alterNamespace}. For example, + * + * import NamespaceChange._ + * val catalog = Catalogs.load(name) + * catalog.alterNamespace(ident, + * setProperty("prop", "value"), + * removeProperty("other_prop") + * ) + * + */ +public interface NamespaceChange { + /** + * Create a NamespaceChange for setting a namespace property. + * + * If the property already exists, it will be replaced with the new value. + * + * @param property the property name + * @param value the new property value + * @return a NamespaceChange for the addition + */ + static NamespaceChange setProperty(String property, String value) { +return new SetProperty(property, value); + } + + /** + * Create a NamespaceChange for removing a namespace property. + * + * If the property does not exist, the change will succeed. 
+ * + * @param property the property name + * @return a NamespaceChange for the addition + */ + static NamespaceChange removeProperty(String property) { +return new RemoveProperty(property); + } + + /** + * A NamespaceChange to set a namespace property. + * + * If the property already exists, it must be replaced with the new value. + */ + final class SetProperty implements NamespaceChange { +private final String property; +private final String value; + +private SetProperty(String property, String value) { + this.property = property; + this.value = value; +} + +public String property() { + return property; +} + +public String value() { + return value; +} + } + + /** + * A NamespaceChange to remove a namespace property. + * + * If the property does not exist, the change should succeed. + */ + final class RemoveProperty implements NamespaceChange { +private final String property; + +private RemoveProperty(String property) { + this.property = property; +} + +public String property() { + return property; +} + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java new file mode
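A hedged walk through the new interface against any implementing catalog (method names as listed above; the namespace names are arbitrary):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalog.v2.{NamespaceChange, SupportsNamespaces}

def namespaceDemo(catalog: SupportsNamespaces): Unit = {
  val ns = Array("prod", "reporting")

  catalog.createNamespace(ns, Map("owner" -> "etl").asJava)
  assert(catalog.namespaceExists(ns))

  // listNamespaces(parent) returns the namespaces one level below parent.
  catalog.listNamespaces(Array("prod")).foreach(n => println(n.mkString(".")))

  catalog.alterNamespace(ns,
    NamespaceChange.setProperty("retention", "30d"),
    NamespaceChange.removeProperty("owner"))

  catalog.dropNamespace(ns)
}
```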
[spark] branch master updated: [SPARK-27661][SQL] Add SupportsNamespaces API
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0f89a5d [SPARK-27661][SQL] Add SupportsNamespaces API 0f89a5d is described below commit 0f89a5dcca86c9d5eff9376855115e6d8838254c Author: Ryan Blue AuthorDate: Sun Aug 4 21:23:42 2019 -0700 [SPARK-27661][SQL] Add SupportsNamespaces API ## What changes were proposed in this pull request? This adds an interface for catalog plugins that exposes namespace operations: * `listNamespaces` * `namespaceExists` * `loadNamespaceMetadata` * `createNamespace` * `alterNamespace` * `dropNamespace` ## How was this patch tested? API only. Existing tests for regressions. Closes #24560 from rdblue/SPARK-27661-add-catalog-namespace-api. Authored-by: Ryan Blue Signed-off-by: Burak Yavuz --- .../spark/sql/catalog/v2/NamespaceChange.java | 97 ++ .../spark/sql/catalog/v2/SupportsNamespaces.java | 145 +++ .../spark/sql/catalog/v2/utils/CatalogV2Util.scala | 33 +++- .../spark/sql/catalog/v2/TableCatalogSuite.scala | 204 - .../spark/sql/catalog/v2/TestTableCatalog.scala| 70 ++- 5 files changed, 543 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java new file mode 100644 index 000..6f5895b --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +/** + * NamespaceChange subclasses represent requested changes to a namespace. These are passed to + * {@link SupportsNamespaces#alterNamespace}. For example, + * + * import NamespaceChange._ + * val catalog = Catalogs.load(name) + * catalog.alterNamespace(ident, + * setProperty("prop", "value"), + * removeProperty("other_prop") + * ) + * + */ +public interface NamespaceChange { + /** + * Create a NamespaceChange for setting a namespace property. + * + * If the property already exists, it will be replaced with the new value. + * + * @param property the property name + * @param value the new property value + * @return a NamespaceChange for the addition + */ + static NamespaceChange setProperty(String property, String value) { +return new SetProperty(property, value); + } + + /** + * Create a NamespaceChange for removing a namespace property. + * + * If the property does not exist, the change will succeed. 
+ * + * @param property the property name + * @return a NamespaceChange for the addition + */ + static NamespaceChange removeProperty(String property) { +return new RemoveProperty(property); + } + + /** + * A NamespaceChange to set a namespace property. + * + * If the property already exists, it must be replaced with the new value. + */ + final class SetProperty implements NamespaceChange { +private final String property; +private final String value; + +private SetProperty(String property, String value) { + this.property = property; + this.value = value; +} + +public String property() { + return property; +} + +public String value() { + return value; +} + } + + /** + * A NamespaceChange to remove a namespace property. + * + * If the property does not exist, the change should succeed. + */ + final class RemoveProperty implements NamespaceChange { +private final String property; + +private RemoveProperty(String property) { + this.property = property; +} + +public String property() { + return property; +} + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java new file mode
[spark] branch master updated: [SPARK-27845][SQL] DataSourceV2: InsertTable
This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 443904a [SPARK-27845][SQL] DataSourceV2: InsertTable 443904a is described below commit 443904a14044ff32421e577dc26d0d53112ceaba Author: Ryan Blue AuthorDate: Thu Jul 25 15:05:51 2019 -0700 [SPARK-27845][SQL] DataSourceV2: InsertTable ## What changes were proposed in this pull request? Support multiple catalogs in the following InsertTable use cases:

- INSERT INTO [TABLE] catalog.db.tbl
- INSERT OVERWRITE TABLE catalog.db.tbl

Support matrix:

Overwrite|Partitioned Table|Partition Clause|Partition Overwrite Mode|Action
-|-|-|-|-
false|*|*|*|AppendData
true|no|(empty)|*|OverwriteByExpression(true)
true|yes|p1,p2 or p1 or p2 or (empty)|STATIC|OverwriteByExpression(true)
true|yes|p1,p2 or p1 or p2 or (empty)|DYNAMIC|OverwritePartitionsDynamic
true|yes|p1=23,p2=3|*|OverwriteByExpression(p1=23 and p2=3)
true|yes|p1=23,p2 or p1=23|STATIC|OverwriteByExpression(p1=23)
true|yes|p1=23,p2 or p1=23|DYNAMIC|OverwritePartitionsDynamic

Notes:
- Assume the partitioned table has 2 partitions: p1 and p2.
- `STATIC` is the default Partition Overwrite Mode for data source tables.
- DSv2 tables currently do not support `IfPartitionNotExists`.

## How was this patch tested? New tests. All existing catalyst and sql/core tests. Closes #24832 from jzhuge/SPARK-27845-pr. Lead-authored-by: Ryan Blue Co-authored-by: John Zhuge Signed-off-by: Burak Yavuz --- .../apache/spark/sql/catalyst/parser/SqlBase.g4| 4 +- .../spark/sql/catalyst/analysis/Analyzer.scala | 137 +- .../apache/spark/sql/catalyst/dsl/package.scala| 13 +- .../spark/sql/catalyst/parser/AstBuilder.scala | 36 ++- .../plans/logical/sql/InsertIntoStatement.scala| 50 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 113 +++- .../sql/catalyst/parser/PlanParserSuite.scala | 19 +- .../datasources/DataSourceResolution.scala | 1 + .../sql/sources/v2/DataSourceV2SQLSuite.scala | 301 - .../sql/sources/v2/TestInMemoryTableCatalog.scala | 137 +++--- .../org/apache/spark/sql/hive/InsertSuite.scala| 3 +- 11 files changed, 738 insertions(+), 76 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 0a142c2..517ef9d 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -294,8 +294,8 @@ query ; insertInto -: INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)? #insertOverwriteTable -| INSERT INTO TABLE? tableIdentifier partitionSpec? #insertIntoTable +: INSERT OVERWRITE TABLE? multipartIdentifier (partitionSpec (IF NOT EXISTS)?)? #insertOverwriteTable +| INSERT INTO TABLE? multipartIdentifier partitionSpec? (IF NOT EXISTS)? #insertIntoTable | INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat?#insertOverwriteHiveDir | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=tablePropertyList)?
#insertOverwriteDir ; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e55cdfe..021fb26 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -25,6 +25,8 @@ import scala.util.Random import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog, TableChange} +import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform} +import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util.loadTable import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.OuterScopes @@ -34,12 +36,14 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.objects._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.logical.sql
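A hedged SQL sketch of the new multi-catalog insert paths (`testcat.db.tbl` and `src` are placeholders; the plan chosen for each case follows the support matrix above):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Append: always AppendData, partitioned or not.
spark.sql("INSERT INTO testcat.db.tbl VALUES (1, 'a'), (2, 'b')")

// Static overwrite of one partition: OverwriteByExpression(p1 = 23).
spark.sql("SET spark.sql.sources.partitionOverwriteMode = STATIC")
spark.sql("INSERT OVERWRITE TABLE testcat.db.tbl PARTITION (p1 = 23) SELECT * FROM src")

// Dynamic mode without a full partition spec: OverwritePartitionsDynamic.
spark.sql("SET spark.sql.sources.partitionOverwriteMode = DYNAMIC")
spark.sql("INSERT OVERWRITE TABLE testcat.db.tbl SELECT * FROM src")
```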
spark git commit: [SPARK-25472][SS] Don't have legitimate stops of streams cause stream exceptions
Repository: spark Updated Branches: refs/heads/master 4d114fc9a -> 77e52448e [SPARK-25472][SS] Don't have legitimate stops of streams cause stream exceptions ## What changes were proposed in this pull request? Legitimate stops of streams may actually cause an exception to be captured by stream execution, because the job throws a SparkException regarding job cancellation during a stop. This PR makes the stop more graceful by swallowing this cancellation error. ## How was this patch tested? This is pretty hard to test. The existing tests should make sure that we're not swallowing other specific SparkExceptions. I've also run the `KafkaSourceStressForDontFailOnDataLossSuite` 100 times, and it didn't fail, whereas it used to be flaky. Closes #22478 from brkyvz/SPARK-25472. Authored-by: Burak Yavuz Signed-off-by: Burak Yavuz Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77e52448 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77e52448 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77e52448 Branch: refs/heads/master Commit: 77e52448e7f94aadfa852cc67084415de6ecfa7c Parents: 4d114fc Author: Burak Yavuz Authored: Thu Sep 20 15:46:33 2018 -0700 Committer: Burak Yavuz Committed: Thu Sep 20 15:46:33 2018 -0700 -- .../execution/streaming/StreamExecution.scala | 22 +++- .../continuous/ContinuousExecution.scala| 4 ++-- .../WriteToContinuousDataSourceExec.scala | 2 +- 3 files changed, 20 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/77e52448/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index f6c60c1..631a6eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -30,6 +30,7 @@ import scala.util.control.NonFatal import com.google.common.util.concurrent.UncheckedExecutionException import org.apache.hadoop.fs.Path +import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -282,7 +283,7 @@ abstract class StreamExecution( // `stop()` is already called. Let `finally` finish the cleanup.
} } catch { - case e if isInterruptedByStop(e) => + case e if isInterruptedByStop(e, sparkSession.sparkContext) => // interrupted by stop() updateStatusMessage("Stopped") case e: IOException if e.getMessage != null @@ -354,9 +355,9 @@ abstract class StreamExecution( } } - private def isInterruptedByStop(e: Throwable): Boolean = { + private def isInterruptedByStop(e: Throwable, sc: SparkContext): Boolean = { if (state.get == TERMINATED) { - StreamExecution.isInterruptionException(e) + StreamExecution.isInterruptionException(e, sc) } else { false } @@ -531,7 +532,7 @@ object StreamExecution { val QUERY_ID_KEY = "sql.streaming.queryId" val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing" - def isInterruptionException(e: Throwable): Boolean = e match { + def isInterruptionException(e: Throwable, sc: SparkContext): Boolean = e match { // InterruptedIOException - thrown when an I/O operation is interrupted // ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException => @@ -546,7 +547,18 @@ object StreamExecution { // ExecutionException, such as BiFunction.apply case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException) if e2.getCause != null => - isInterruptionException(e2.getCause) + isInterruptionException(e2.getCause, sc) +case se: SparkException => + val jobGroup = sc.getLocalProperty("spark.jobGroup.id") + if (jobGroup == null) return false + val errorMsg = se.getMessage + if (errorMsg.contains("cancelled") && errorMsg.contains(jobGroup) && se.getCause == null) { +true + } else if (se.getCause != null) { +isInterruptionException(se.getCause, sc) + } else { +false + } case _ => false } http://git-wip-us.apache.org/repos/asf/spark/bl
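The heart of the change is a recursive cause-chain check that treats a job-group cancellation as a legitimate stop; a simplified standalone sketch of that predicate (not the exact Spark code):

```scala
// Benign if any cause in the chain is an interrupt, or a cancellation whose
// message names this stream's job group (as a job-group cancel message does).
def isBenignStop(e: Throwable, jobGroup: String): Boolean = e match {
  case null => false
  case _: InterruptedException => true
  case other =>
    val msg = Option(other.getMessage).getOrElse("")
    (msg.contains("cancelled") && msg.contains(jobGroup)) ||
      isBenignStop(other.getCause, jobGroup)
}
```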
spark git commit: [SPARK-24525][SS] Provide an option to limit number of rows in a MemorySink
Repository: spark Updated Branches: refs/heads/master 90da7dc24 -> e4fee395e [SPARK-24525][SS] Provide an option to limit number of rows in a MemorySink ## What changes were proposed in this pull request? Provide an option to limit number of rows in a MemorySink. Currently, MemorySink and MemorySinkV2 have unbounded size, meaning that if they're used on big data, they can OOM the stream. This change adds a maxMemorySinkRows option to limit how many rows MemorySink and MemorySinkV2 can hold. By default, they are still unbounded. ## How was this patch tested? Added new unit tests. Author: Mukul Murthy Closes #21559 from mukulmurthy/SPARK-24525. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4fee395 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4fee395 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4fee395 Branch: refs/heads/master Commit: e4fee395ecd93ad4579d9afbf0861f82a303e563 Parents: 90da7dc Author: Mukul Murthy Authored: Fri Jun 15 13:56:48 2018 -0700 Committer: Burak Yavuz Committed: Fri Jun 15 13:56:48 2018 -0700 -- .../spark/sql/execution/streaming/memory.scala | 70 +++-- .../execution/streaming/sources/memoryV2.scala | 44 --- .../spark/sql/streaming/DataStreamWriter.scala | 4 +- .../execution/streaming/MemorySinkSuite.scala | 62 ++- .../execution/streaming/MemorySinkV2Suite.scala | 80 +++- .../apache/spark/sql/streaming/StreamTest.scala | 4 +- 6 files changed, 239 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e4fee395/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index b137f98..7fa13c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ +import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsScanUnsafeRow} import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2} import org.apache.spark.sql.streaming.OutputMode @@ -221,19 +222,60 @@ class MemoryStreamInputPartition(records: Array[UnsafeRow]) } /** A common trait for MemorySinks with methods used for testing */ -trait MemorySinkBase extends BaseStreamingSink { +trait MemorySinkBase extends BaseStreamingSink with Logging { def allData: Seq[Row] def latestBatchData: Seq[Row] def dataSinceBatch(sinceBatchId: Long): Seq[Row] def latestBatchId: Option[Long] + + /** + * Truncates the given rows to return at most maxRows rows. + * @param rows The data that may need to be truncated. + * @param batchLimit Number of rows to keep in this batch; the rest will be truncated + * @param sinkLimit Total number of rows kept in this sink, for logging purposes. + * @param batchId The ID of the batch that sent these rows, for logging purposes. + * @return Truncated rows. 
+ */ + protected def truncateRowsIfNeeded( + rows: Array[Row], + batchLimit: Int, + sinkLimit: Int, + batchId: Long): Array[Row] = { +if (rows.length > batchLimit && batchLimit >= 0) { + logWarning(s"Truncating batch $batchId to $batchLimit rows because of sink limit $sinkLimit") + rows.take(batchLimit) +} else { + rows +} + } +} + +/** + * Companion object to MemorySinkBase. + */ +object MemorySinkBase { + val MAX_MEMORY_SINK_ROWS = "maxRows" + val MAX_MEMORY_SINK_ROWS_DEFAULT = -1 + + /** + * Gets the max number of rows a MemorySink should store. This number is based on the memory + * sink row limit option if it is set. If not, we use a large value so that data truncates + * rather than causing out of memory errors. + * @param options Options for writing from which we get the max rows option + * @return The maximum number of rows a memorySink should store. + */ + def getMemorySinkCapacity(options: DataSourceOptions): Int = { +val maxRows = options.getInt(MAX_MEMORY_SINK_ROWS, MAX_MEMORY_SINK_ROWS_DEFAULT) +if (maxRows >= 0) maxRows else Int.MaxValue - 10 + } } /** * A sink that
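As a usage illustration (not part of the patch): assuming a SparkSession `spark` and a streaming DataFrame `events` already exist, the cap could be applied through the writer options; the key `maxRows` is the constant defined in the `MemorySinkBase` companion object above.

```scala
// Minimal sketch, assuming `spark` and a streaming DataFrame `events` exist.
val query = events.writeStream
  .format("memory")
  .queryName("capped_results")   // results are exposed as a temp view
  .option("maxRows", "100")      // key from MemorySinkBase.MAX_MEMORY_SINK_ROWS
  .outputMode("append")
  .start()

// Batches beyond the cap are truncated (with a warning) instead of buffered:
spark.sql("SELECT COUNT(*) FROM capped_results").show()
```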
spark git commit: [SPARK-20168][DSTREAM] Add changes to use kinesis fetches from specific timestamp
Repository: spark Updated Branches: refs/heads/master be03d3ad7 -> 0e6833006 [SPARK-20168][DSTREAM] Add changes to use kinesis fetches from specific timestamp ## What changes were proposed in this pull request? The Kinesis client can resume from a specified timestamp while creating a stream. We should have an option to pass a timestamp in the config to allow Kinesis to resume from the given timestamp. The patch introduces a new `KinesisInitialPositionInStream` that takes the `InitialPositionInStream` with the `timestamp` information that can be used to resume Kinesis fetches from the provided timestamp. ## How was this patch tested? Unit tests. cc: budde brkyvz Author: Yash Sharma <ysha...@atlassian.com> Closes #18029 from yssharma/ysharma/kcl_resume. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e683300 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e683300 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e683300 Branch: refs/heads/master Commit: 0e6833006d28df426eb132bb8fc82917b8e2aedd Parents: be03d3a Author: Yash Sharma <ysha...@atlassian.com> Authored: Tue Dec 26 09:50:39 2017 +0200 Committer: Burak Yavuz <brk...@gmail.com> Committed: Tue Dec 26 09:50:39 2017 +0200 -- .../kinesis/KinesisInitialPositions.java| 91 .../streaming/KinesisWordCountASL.scala | 5 +- .../streaming/kinesis/KinesisInputDStream.scala | 31 +-- .../streaming/kinesis/KinesisReceiver.scala | 45 ++ .../spark/streaming/kinesis/KinesisUtils.scala | 15 ++-- .../JavaKinesisInputDStreamBuilderSuite.java| 47 -- .../KinesisInputDStreamBuilderSuite.scala | 68 +-- .../streaming/kinesis/KinesisStreamSuite.scala | 11 +-- 8 files changed, 264 insertions(+), 49 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java -- diff --git a/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java b/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java new file mode 100644 index 000..206e1e4 --- /dev/null +++ b/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis; + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; + +import java.io.Serializable; +import java.util.Date; + +/** + * A java wrapper for exposing [[InitialPositionInStream]] + * to the corresponding Kinesis readers.
+ */ +interface KinesisInitialPosition { +InitialPositionInStream getPosition(); +} + +public class KinesisInitialPositions { +public static class Latest implements KinesisInitialPosition, Serializable { +public Latest() {} + +@Override +public InitialPositionInStream getPosition() { +return InitialPositionInStream.LATEST; +} +} + +public static class TrimHorizon implements KinesisInitialPosition, Serializable { +public TrimHorizon() {} + +@Override +public InitialPositionInStream getPosition() { +return InitialPositionInStream.TRIM_HORIZON; +} +} + +public static class AtTimestamp implements KinesisInitialPosition, Serializable { +private Date timestamp; + +public AtTimestamp(Date timestamp) { +this.timestamp = timestamp; +} + +@Override +public InitialPositionInStream getPosition() { +return InitialPositionInStream.AT_TIMESTAMP; +} + +public Date getTimestamp() { +return timestamp; +} +} + + +/** + * Returns instance of [[KinesisInitialPosition]] based
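For orientation, a sketch of how the new `AtTimestamp` position plugs into the Kinesis DStream builder; the stream name, endpoint, and one-hour lookback are illustrative, and `ssc` is an assumed, already-created StreamingContext.

```scala
import java.util.Date

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

// Resume fetches from a point in time instead of LATEST / TRIM_HORIZON.
val oneHourAgo = new Date(System.currentTimeMillis() - 3600 * 1000L)

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)                  // assumed existing StreamingContext
  .streamName("my-kinesis-stream")        // hypothetical stream
  .endpointUrl("https://kinesis.us-east-1.amazonaws.com")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.AtTimestamp(oneHourAgo))
  .checkpointAppName("my-app")
  .checkpointInterval(Seconds(10))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()
```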
spark git commit: [SPARK-21977] SinglePartition optimizations break certain Streaming Stateful Aggregation requirements
Repository: spark Updated Branches: refs/heads/master c6ff59a23 -> 280ff523f [SPARK-21977] SinglePartition optimizations break certain Streaming Stateful Aggregation requirements ## What changes were proposed in this pull request? This is a bit hard to explain as there are several issues here; I'll try my best. Here are the requirements: 1. A StructuredStreaming Source that can generate empty RDDs with 0 partitions 2. A StructuredStreaming query that uses the above source, performs a stateful aggregation (mapGroupsWithState, groupBy.count, ...), and coalesces to 1 partition The crux of the problem is that when a dataset has a `coalesce(1)` call, it receives a `SinglePartition` partitioning scheme. This scheme satisfies most required distributions used for aggregations such as HashAggregateExec. This causes a world of problems: Symptom 1. If the input RDD has 0 partitions, the whole lineage will receive 0 partitions, nothing will be executed, and the state store will not create any delta files. When this happens, the next trigger fails, because the StateStore fails to load the delta file for the previous trigger. Symptom 2. Let's say that there was data. Then in this case, if you stop your stream, replace `coalesce(1)` with `coalesce(2)`, and then restart your stream, your stream will fail, because `spark.sql.shuffle.partitions - 1` of the StateStores will fail to find their delta files. To fix the issues above, we must check that the partitioning of the child of a `StatefulOperator` satisfies: If the grouping expressions are empty: a) AllTuples distribution b) a single physical partition If the grouping expressions are non-empty: a) Clustered distribution b) `spark.sql.shuffle.partitions` # of partitions whether or not `coalesce(1)` exists in the plan, and whether or not the input RDD for the trigger has any data. Once you fix the above problem by adding an Exchange to the plan, you come across the following bug: If you call `coalesce(1).groupBy().count()` on a Streaming DataFrame, and if you have a trigger with no data, `StateStoreRestoreExec` doesn't return the prior state. However, for this specific aggregation, `HashAggregateExec` after the restore returns a (0, 0) row, since we're performing a count, and there is no data. Then this data gets stored in `StateStoreSaveExec`, causing the previous counts to be overwritten and lost. ## How was this patch tested? Regression tests Author: Burak Yavuz <brk...@gmail.com> Closes #19196 from brkyvz/sa-0.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/280ff523 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/280ff523 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/280ff523 Branch: refs/heads/master Commit: 280ff523f4079dd9541efc95e6efcb69f9374d22 Parents: c6ff59a Author: Burak Yavuz <brk...@gmail.com> Authored: Wed Sep 20 00:01:21 2017 -0700 Committer: Burak Yavuz <brk...@gmail.com> Committed: Wed Sep 20 00:01:21 2017 -0700 -- .../streaming/IncrementalExecution.scala| 34 +++- .../execution/streaming/StreamExecution.scala | 1 + .../execution/streaming/statefulOperators.scala | 37 +++- .../EnsureStatefulOpPartitioningSuite.scala | 132 + .../apache/spark/sql/streaming/StreamTest.scala | 16 +- .../streaming/StreamingAggregationSuite.scala | 196 ++- 6 files changed, 395 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/280ff523/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 19d9598..027222e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -21,11 +21,13 @@ import java.util.UUID import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.internal.Logging -import org.apache.spark.sql.{SparkSession, Strategy} +import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy} import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode} +import org.apache.spark.sql.execution.exc
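A minimal repro sketch of the failing shape described above — a stateful aggregation downstream of `coalesce(1)` — assuming a SparkSession `spark`:

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.OutputMode

implicit val sqlCtx = spark.sqlContext
import spark.implicits._

val input = MemoryStream[Int]

// coalesce(1) yields SinglePartition, which (before this patch) satisfied the
// aggregation's required distribution and pinned all state to one partition.
val counts = input.toDS()
  .coalesce(1)
  .groupBy()
  .count()

val query = counts.writeStream
  .outputMode(OutputMode.Complete())
  .format("memory")
  .queryName("agg_counts")
  .start()
```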
spark git commit: [SPARK-21463] Allow userSpecifiedSchema to override partition inference performed by MetadataLogFileIndex
Repository: spark Updated Branches: refs/heads/master 8cd9cdf17 -> 2c9d5ef1f [SPARK-21463] Allow userSpecifiedSchema to override partition inference performed by MetadataLogFileIndex ## What changes were proposed in this pull request? When using the MetadataLogFileIndex to read back a table, we don't respect the user-provided schema as the source of the proper column types. This can lead to issues such as strings that look like dates being coerced to DateType, or longs being narrowed to IntegerType just because no value in the data happens to require a long. ## How was this patch tested? Unit tests and manual tests Author: Burak Yavuz <brk...@gmail.com> Closes #18676 from brkyvz/stream-partitioning. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c9d5ef1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c9d5ef1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c9d5ef1 Branch: refs/heads/master Commit: 2c9d5ef1f0c30713dafbf8ef0eb69d5520f7dcaf Parents: 8cd9cdf Author: Burak Yavuz <brk...@gmail.com> Authored: Wed Jul 19 15:56:26 2017 -0700 Committer: Burak Yavuz <brk...@gmail.com> Committed: Wed Jul 19 15:56:26 2017 -0700 -- .../sql/execution/datasources/DataSource.scala | 33 +++- .../execution/streaming/FileStreamSource.scala | 2 +- .../streaming/MetadataLogFileIndex.scala| 11 +-- .../ParquetPartitionDiscoverySuite.scala| 33 4 files changed, 69 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2c9d5ef1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index d36a04f..cbe8ce4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -97,6 +97,24 @@ case class DataSource( } /** + * In the read path, only managed tables by Hive provide the partition columns properly when + * initializing this class. All other file based data sources will try to infer the partitioning, + * and then cast the inferred types to user specified dataTypes if the partition columns exist + * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or + * inconsistent data types as reported in SPARK-21463. + * @param fileIndex A FileIndex that will perform partition inference + * @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema` + */ + private def combineInferredAndUserSpecifiedPartitionSchema(fileIndex: FileIndex): StructType = { +val resolved = fileIndex.partitionSchema.map { partitionField => + // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred + userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse( +partitionField) +} +StructType(resolved) + } + + /** * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer * it. In the read path, only managed tables by Hive provide the partition columns properly when * initializing this class.
All other file based data sources will try to infer the partitioning, @@ -139,12 +157,7 @@ case class DataSource( val partitionSchema = if (partitionColumns.isEmpty) { // Try to infer partitioning, because no DataSource in the read path provides the partitioning // columns properly unless it is a Hive DataSource - val resolved = tempFileIndex.partitionSchema.map { partitionField => -// SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred -userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse( - partitionField) - } - StructType(resolved) + combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex) } else { // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred // partitioning @@ -336,7 +349,13 @@ case class DataSource( caseInsensitiveOptions.get("path").toSeq ++ paths, sparkSession.sessionState.newHadoopConf()) => val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) -val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath) +val tempFileCatalog = new Metadata
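An illustrative sketch of the user-facing effect (the path and column names are hypothetical): reading back a streaming query's output with an explicit schema now keeps the declared partition column types instead of the inferred ones.

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("id", LongType),
  // declare the partition column as a string so inference cannot narrow it
  StructField("event_date", StringType)
))

val df = spark.read
  .schema(schema)
  .parquet("/output/of/a/streaming/query") // placeholder path with a _spark_metadata log
```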
spark git commit: [DSTREAM][DOC] Add documentation for kinesis retry configurations
Repository: spark Updated Branches: refs/heads/branch-2.2 8b0cb3a7b -> 556ad019f [DSTREAM][DOC] Add documentation for kinesis retry configurations ## What changes were proposed in this pull request? The changes were merged as part of - https://github.com/apache/spark/pull/17467. The documentation was missed somewhere in the review iterations. Adding the documentation where it belongs. ## How was this patch tested? Docs. Not tested. cc budde , brkyvz Author: Yash Sharma <ysha...@atlassian.com> Closes #18028 from yssharma/ysharma/kinesis_retry_docs. (cherry picked from commit 92580bd0eae5dbf739573093cca1b12fd0c14049) Signed-off-by: Burak Yavuz <brk...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/556ad019 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/556ad019 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/556ad019 Branch: refs/heads/branch-2.2 Commit: 556ad019fa49deb40ba8da3aa6067484ab3d6331 Parents: 8b0cb3a Author: Yash Sharma <ysha...@atlassian.com> Authored: Thu May 18 11:24:33 2017 -0700 Committer: Burak Yavuz <brk...@gmail.com> Committed: Thu May 18 11:24:44 2017 -0700 -- docs/streaming-kinesis-integration.md | 4 1 file changed, 4 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/556ad019/docs/streaming-kinesis-integration.md -- diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md index 6be0b54..9709bd3 100644 --- a/docs/streaming-kinesis-integration.md +++ b/docs/streaming-kinesis-integration.md @@ -216,3 +216,7 @@ de-aggregate records during consumption. - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`). This is configurable. - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored). - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency. + + Kinesis retry configuration + - `spark.streaming.kinesis.retry.waitTime` : Wait time between Kinesis retries as a duration string. When reading from Amazon Kinesis, users may hit `ProvisionedThroughputExceededException`'s, when consuming faster than 5 transactions/second or, exceeding the maximum read rate of 2 MB/second. This configuration can be tweaked to increase the sleep between fetches when a fetch fails to reduce these exceptions. Default is "100ms". + - `spark.streaming.kinesis.retry.maxAttempts` : Max number of retries for Kinesis fetches. This config can also be used to tackle the Kinesis `ProvisionedThroughputExceededException`'s in scenarios mentioned above. It can be increased to have more number of retries for Kinesis reads. Default is 3.
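For illustration, the two settings could be applied on the SparkConf before creating the StreamingContext; the values below are arbitrary examples, not recommendations.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kinesis-consumer")
  .set("spark.streaming.kinesis.retry.waitTime", "500ms")  // default "100ms"
  .set("spark.streaming.kinesis.retry.maxAttempts", "10")  // default 3

val ssc = new StreamingContext(conf, Seconds(1))
```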
spark git commit: [DSTREAM][DOC] Add documentation for kinesis retry configurations
Repository: spark Updated Branches: refs/heads/master 8fb3d5c6d -> 92580bd0e [DSTREAM][DOC] Add documentation for kinesis retry configurations ## What changes were proposed in this pull request? The changes were merged as part of - https://github.com/apache/spark/pull/17467. The documentation was missed somewhere in the review iterations. Adding the documentation where it belongs. ## How was this patch tested? Docs. Not tested. cc budde , brkyvz Author: Yash Sharma <ysha...@atlassian.com> Closes #18028 from yssharma/ysharma/kinesis_retry_docs. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92580bd0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92580bd0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92580bd0 Branch: refs/heads/master Commit: 92580bd0eae5dbf739573093cca1b12fd0c14049 Parents: 8fb3d5c Author: Yash Sharma <ysha...@atlassian.com> Authored: Thu May 18 11:24:33 2017 -0700 Committer: Burak Yavuz <brk...@gmail.com> Committed: Thu May 18 11:24:33 2017 -0700 -- docs/streaming-kinesis-integration.md | 4 1 file changed, 4 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/92580bd0/docs/streaming-kinesis-integration.md -- diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md index 6be0b54..9709bd3 100644 --- a/docs/streaming-kinesis-integration.md +++ b/docs/streaming-kinesis-integration.md @@ -216,3 +216,7 @@ de-aggregate records during consumption. - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`). This is configurable. - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored). - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency. + + Kinesis retry configuration + - `spark.streaming.kinesis.retry.waitTime` : Wait time between Kinesis retries as a duration string. When reading from Amazon Kinesis, users may hit `ProvisionedThroughputExceededException`'s, when consuming faster than 5 transactions/second or, exceeding the maximum read rate of 2 MB/second. This configuration can be tweaked to increase the sleep between fetches when a fetch fails to reduce these exceptions. Default is "100ms". + - `spark.streaming.kinesis.retry.maxAttempts` : Max number of retries for Kinesis fetches. This config can also be used to tackle the Kinesis `ProvisionedThroughputExceededException`'s in scenarios mentioned above. It can be increased to have more number of retries for Kinesis reads. Default is 3.
spark git commit: [SPARK-20140][DSTREAM] Remove hardcoded kinesis retry wait and max retries
Repository: spark Updated Branches: refs/heads/branch-2.2 75e5ea294 -> 7076ab40f [SPARK-20140][DSTREAM] Remove hardcoded kinesis retry wait and max retries ## What changes were proposed in this pull request? This pull request proposes to remove the hardcoded values for Amazon Kinesis - MIN_RETRY_WAIT_TIME_MS, MAX_RETRIES. This change is critical for Kinesis checkpoint recovery when the Kinesis-backed RDD is huge. The following happens in a typical Kinesis recovery: - Kinesis throttles a large number of requests while recovering - retries in case of throttling are not able to recover due to the small wait period - Kinesis throttles per second, so the wait period should be configurable for recovery The patch picks the Spark Kinesis configs from: - spark.streaming.kinesis.retry.wait.time - spark.streaming.kinesis.retry.max.attempts Jira: https://issues.apache.org/jira/browse/SPARK-20140 ## How was this patch tested? Modified the KinesisBackedBlockRDDSuite.scala to run Kinesis tests with the modified configurations. Wasn't able to test the patch with actual throttling. Author: Yash Sharma Closes #17467 from yssharma/ysharma/spark-kinesis-retries. (cherry picked from commit 38f4e8692ce3b6cbcfe0c1aff9b5e662f7a308b7) Signed-off-by: Burak Yavuz Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7076ab40 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7076ab40 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7076ab40 Branch: refs/heads/branch-2.2 Commit: 7076ab40f86fe606cd9b813dad506e921501383e Parents: 75e5ea2 Author: Yash Sharma Authored: Tue May 16 15:08:05 2017 -0700 Committer: Burak Yavuz Committed: Tue May 16 15:08:46 2017 -0700 -- .../kinesis/KinesisBackedBlockRDD.scala | 33 - .../streaming/kinesis/KinesisInputDStream.scala | 6 +- .../kinesis/KinesisReadConfigurations.scala | 78 .../streaming/kinesis/KinesisStreamSuite.scala | 49 +++- 4 files changed, 143 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7076ab40/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala -- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index f31ebf1..88b2942 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.util.control.NonFatal -import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain} +import com.amazonaws.auth.AWSCredentials import com.amazonaws.services.kinesis.AmazonKinesisClient import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord import com.amazonaws.services.kinesis.model._ @@ -81,9 +81,9 @@ class KinesisBackedBlockRDD[T: ClassTag]( @transient private val _blockIds: Array[BlockId], @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges], @transient private val isBlockIdValid: Array[Boolean] = Array.empty, -val retryTimeoutMs: Int = 1, val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _, -val kinesisCreds: SparkAWSCredentials = DefaultCredentials +val kinesisCreds: SparkAWSCredentials = DefaultCredentials, +val kinesisReadConfigs:
KinesisReadConfigurations = KinesisReadConfigurations() ) extends BlockRDD[T](sc, _blockIds) { require(_blockIds.length == arrayOfseqNumberRanges.length, @@ -112,7 +112,7 @@ class KinesisBackedBlockRDD[T: ClassTag]( val credentials = kinesisCreds.provider.getCredentials partition.seqNumberRanges.ranges.iterator.flatMap { range => new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName, - range, retryTimeoutMs).map(messageHandler) + range, kinesisReadConfigs).map(messageHandler) } } if (partition.isBlockIdValid) { @@ -135,7 +135,7 @@ class KinesisSequenceRangeIterator( endpointUrl: String, regionId: String, range: SequenceNumberRange, -retryTimeoutMs: Int) extends NextIterator[Record] with Logging { +kinesisReadConfigs: KinesisReadConfigurations) extends NextIterator[Record] with Logging { private val client = new AmazonKinesisClient(credentials) private val streamName = range.streamName @@ -251,21 +251,19 @@ class
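The excerpt above ends at a helper that retries Kinesis API requests with exponential backoff. As a rough sketch of that general pattern (illustrative only, not the actual Spark implementation):

```scala
import scala.annotation.tailrec
import scala.util.control.NonFatal

// Retry `body` up to `attemptsLeft` times, doubling the wait between tries;
// the last failure propagates once attempts are exhausted.
@tailrec
def retryWithBackoff[T](attemptsLeft: Int, waitTimeMs: Long)(body: => T): T = {
  try body catch {
    case NonFatal(e) if attemptsLeft > 1 =>
      Thread.sleep(waitTimeMs)
      retryWithBackoff(attemptsLeft - 1, waitTimeMs * 2)(body)
  }
}

// e.g. retryWithBackoff(attemptsLeft = 3, waitTimeMs = 100) { fetchRecords() }
// where fetchRecords() is a hypothetical call that may be throttled.
```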
spark git commit: [SPARK-20140][DSTREAM] Remove hardcoded kinesis retry wait and max retries
Repository: spark Updated Branches: refs/heads/master 6f62e9d9b -> 38f4e8692 [SPARK-20140][DSTREAM] Remove hardcoded kinesis retry wait and max retries ## What changes were proposed in this pull request? This pull request proposes to remove the hardcoded values for Amazon Kinesis - MIN_RETRY_WAIT_TIME_MS, MAX_RETRIES. This change is critical for Kinesis checkpoint recovery when the Kinesis-backed RDD is huge. The following happens in a typical Kinesis recovery: - Kinesis throttles a large number of requests while recovering - retries in case of throttling are not able to recover due to the small wait period - Kinesis throttles per second, so the wait period should be configurable for recovery The patch picks the Spark Kinesis configs from: - spark.streaming.kinesis.retry.wait.time - spark.streaming.kinesis.retry.max.attempts Jira: https://issues.apache.org/jira/browse/SPARK-20140 ## How was this patch tested? Modified the KinesisBackedBlockRDDSuite.scala to run Kinesis tests with the modified configurations. Wasn't able to test the patch with actual throttling. Author: Yash Sharma Closes #17467 from yssharma/ysharma/spark-kinesis-retries. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38f4e869 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38f4e869 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38f4e869 Branch: refs/heads/master Commit: 38f4e8692ce3b6cbcfe0c1aff9b5e662f7a308b7 Parents: 6f62e9d Author: Yash Sharma Authored: Tue May 16 15:08:05 2017 -0700 Committer: Burak Yavuz Committed: Tue May 16 15:08:05 2017 -0700 -- .../kinesis/KinesisBackedBlockRDD.scala | 33 - .../streaming/kinesis/KinesisInputDStream.scala | 6 +- .../kinesis/KinesisReadConfigurations.scala | 78 .../streaming/kinesis/KinesisStreamSuite.scala | 49 +++- 4 files changed, 143 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/38f4e869/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala -- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index f31ebf1..88b2942 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.util.control.NonFatal -import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain} +import com.amazonaws.auth.AWSCredentials import com.amazonaws.services.kinesis.AmazonKinesisClient import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord import com.amazonaws.services.kinesis.model._ @@ -81,9 +81,9 @@ class KinesisBackedBlockRDD[T: ClassTag]( @transient private val _blockIds: Array[BlockId], @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges], @transient private val isBlockIdValid: Array[Boolean] = Array.empty, -val retryTimeoutMs: Int = 1, val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _, -val kinesisCreds: SparkAWSCredentials = DefaultCredentials +val kinesisCreds: SparkAWSCredentials = DefaultCredentials, +val kinesisReadConfigs: KinesisReadConfigurations = KinesisReadConfigurations() ) extends BlockRDD[T](sc, _blockIds) { require(_blockIds.length ==
arrayOfseqNumberRanges.length, @@ -112,7 +112,7 @@ class KinesisBackedBlockRDD[T: ClassTag]( val credentials = kinesisCreds.provider.getCredentials partition.seqNumberRanges.ranges.iterator.flatMap { range => new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName, - range, retryTimeoutMs).map(messageHandler) + range, kinesisReadConfigs).map(messageHandler) } } if (partition.isBlockIdValid) { @@ -135,7 +135,7 @@ class KinesisSequenceRangeIterator( endpointUrl: String, regionId: String, range: SequenceNumberRange, -retryTimeoutMs: Int) extends NextIterator[Record] with Logging { +kinesisReadConfigs: KinesisReadConfigurations) extends NextIterator[Record] with Logging { private val client = new AmazonKinesisClient(credentials) private val streamName = range.streamName @@ -251,21 +251,19 @@ class KinesisSequenceRangeIterator( /** Helper method to retry Kinesis API request with exponential backoff and timeouts */
spark git commit: [SPARK-20441][SPARK-20432][SS] Within the same streaming query, one StreamingRelation should only be transformed to one StreamingExecutionRelation
Repository: spark Updated Branches: refs/heads/branch-2.2 b5947f5c3 -> b1a732fea [SPARK-20441][SPARK-20432][SS] Within the same streaming query, one StreamingRelation should only be transformed to one StreamingExecutionRelation ## What changes were proposed in this pull request? Within the same streaming query, when one `StreamingRelation` is referred to multiple times – e.g. `df.union(df)` – we should transform it only to one `StreamingExecutionRelation`, instead of two or more different `StreamingExecutionRelation`s (each of which would have a separate set of source, source logs, ...). ## How was this patch tested? Added two test cases, each of which would fail without this patch. Author: Liwei Lin Closes #17735 from lw-lin/SPARK-20441. (cherry picked from commit 27f543b15f2f493f6f8373e46b4c9564b0a1bf81) Signed-off-by: Burak Yavuz Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1a732fe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1a732fe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1a732fe Branch: refs/heads/branch-2.2 Commit: b1a732fead32a37afcb7cf7a35facc49a449b8e2 Parents: b5947f5 Author: Liwei Lin Authored: Wed May 3 08:55:02 2017 -0700 Committer: Burak Yavuz Committed: Wed May 3 08:55:17 2017 -0700 -- .../execution/streaming/StreamExecution.scala | 20 .../spark/sql/streaming/StreamSuite.scala | 48 2 files changed, 60 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b1a732fe/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index affc201..b6ddf74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -23,6 +23,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.locks.ReentrantLock +import scala.collection.mutable.{Map => MutableMap} import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal @@ -148,15 +149,18 @@ class StreamExecution( "logicalPlan must be initialized in StreamExecutionThread " + s"but the current thread was ${Thread.currentThread}") var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]() val _logicalPlan = analyzedPlan.transform { - case StreamingRelation(dataSource, _, output) => -// Materialize source to avoid creating it in every batch -val metadataPath = s"$checkpointRoot/sources/$nextSourceId" -val source = dataSource.createSource(metadataPath) -nextSourceId += 1 -// We still need to use the previous `output` instead of `source.schema` as attributes in -// "df.logicalPlan" has already used attributes of the previous `output`.
-StreamingExecutionRelation(source, output) + case streamingRelation@StreamingRelation(dataSource, _, output) => +toExecutionRelationMap.getOrElseUpdate(streamingRelation, { + // Materialize source to avoid creating it in every batch + val metadataPath = s"$checkpointRoot/sources/$nextSourceId" + val source = dataSource.createSource(metadataPath) + nextSourceId += 1 + // We still need to use the previous `output` instead of `source.schema` as attributes in + // "df.logicalPlan" has already used attributes of the previous `output`. + StreamingExecutionRelation(source, output) +}) } sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source } uniqueSources = sources.distinct http://git-wip-us.apache.org/repos/asf/spark/blob/b1a732fe/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 01ea62a..1fc0629 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -71,6 +71,27 @@ class StreamSuite extends StreamTest { CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4,
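A sketch of the self-union case this fixes (assuming a SparkSession `spark`): with the patch, both branches of the union resolve to the same StreamingExecutionRelation, so the source and its metadata log are created only once.

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream

implicit val sqlCtx = spark.sqlContext
import spark.implicits._

val input = MemoryStream[Int]
val df = input.toDF()

// One StreamingRelation referenced twice; previously this materialized two
// independent sources with separate offset logs.
val query = df.union(df).writeStream
  .format("memory")
  .queryName("self_union")
  .start()
```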
spark git commit: [SPARK-20441][SPARK-20432][SS] Within the same streaming query, one StreamingRelation should only be transformed to one StreamingExecutionRelation
Repository: spark Updated Branches: refs/heads/master 7f96f2d7f -> 27f543b15 [SPARK-20441][SPARK-20432][SS] Within the same streaming query, one StreamingRelation should only be transformed to one StreamingExecutionRelation ## What changes were proposed in this pull request? Within the same streaming query, when one `StreamingRelation` is referred to multiple times – e.g. `df.union(df)` – we should transform it only to one `StreamingExecutionRelation`, instead of two or more different `StreamingExecutionRelation`s (each of which would have a separate set of source, source logs, ...). ## How was this patch tested? Added two test cases, each of which would fail without this patch. Author: Liwei Lin Closes #17735 from lw-lin/SPARK-20441. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27f543b1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27f543b1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27f543b1 Branch: refs/heads/master Commit: 27f543b15f2f493f6f8373e46b4c9564b0a1bf81 Parents: 7f96f2d Author: Liwei Lin Authored: Wed May 3 08:55:02 2017 -0700 Committer: Burak Yavuz Committed: Wed May 3 08:55:02 2017 -0700 -- .../execution/streaming/StreamExecution.scala | 20 .../spark/sql/streaming/StreamSuite.scala | 48 2 files changed, 60 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/27f543b1/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index affc201..b6ddf74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -23,6 +23,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.locks.ReentrantLock +import scala.collection.mutable.{Map => MutableMap} import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal @@ -148,15 +149,18 @@ class StreamExecution( "logicalPlan must be initialized in StreamExecutionThread " + s"but the current thread was ${Thread.currentThread}") var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]() val _logicalPlan = analyzedPlan.transform { - case StreamingRelation(dataSource, _, output) => -// Materialize source to avoid creating it in every batch -val metadataPath = s"$checkpointRoot/sources/$nextSourceId" -val source = dataSource.createSource(metadataPath) -nextSourceId += 1 -// We still need to use the previous `output` instead of `source.schema` as attributes in -// "df.logicalPlan" has already used attributes of the previous `output`.
+ StreamingExecutionRelation(source, output) +}) } sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source } uniqueSources = sources.distinct http://git-wip-us.apache.org/repos/asf/spark/blob/27f543b1/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 01ea62a..1fc0629 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -71,6 +71,27 @@ class StreamSuite extends StreamTest { CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four"))) } + test("SPARK-20432: union one stream with itself") { +val df =
spark git commit: [SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans
Repository: spark Updated Branches: refs/heads/branch-2.2 ea5b11446 -> ec712d751 [SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans ## What changes were proposed in this pull request? We didn't enforce analyzed plans in Spark 2.1 when writing out to Kafka. ## How was this patch tested? New unit test. Author: Bill Chambers Closes #17804 from anabranch/SPARK-20496-2. (cherry picked from commit 733b81b835f952ab96723c749461d6afc0c71974) Signed-off-by: Burak Yavuz Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec712d75 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec712d75 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec712d75 Branch: refs/heads/branch-2.2 Commit: ec712d7510579dc4c8da859c86b5236d3ee767be Parents: ea5b114 Author: Bill Chambers Authored: Fri Apr 28 10:18:31 2017 -0700 Committer: Burak Yavuz Committed: Fri Apr 28 10:18:50 2017 -0700 -- .../org/apache/spark/sql/kafka010/KafkaWriter.scala | 4 ++-- .../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 16 2 files changed, 18 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ec712d75/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala index a637d52..61936e3 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala @@ -47,7 +47,7 @@ private[kafka010] object KafkaWriter extends Logging { queryExecution: QueryExecution, kafkaParameters: ju.Map[String, Object], topic: Option[String] = None): Unit = { -val schema = queryExecution.logical.output +val schema = queryExecution.analyzed.output schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse( if (topic == None) { throw new AnalysisException(s"topic option required when no " + @@ -84,7 +84,7 @@ private[kafka010] object KafkaWriter extends Logging { queryExecution: QueryExecution, kafkaParameters: ju.Map[String, Object], topic: Option[String] = None): Unit = { -val schema = queryExecution.logical.output +val schema = queryExecution.analyzed.output validateQuery(queryExecution, kafkaParameters, topic) SQLExecution.withNewExecutionId(sparkSession, queryExecution) { queryExecution.toRdd.foreachPartition { iter => http://git-wip-us.apache.org/repos/asf/spark/blob/ec712d75/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index 4bd052d..2ab336c 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection} import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{BinaryType, DataType} @@ -108,6 +109,21 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { s"save mode overwrite not allowed for kafka")) } + test("SPARK-20496: batch - enforce analyzed plans") { +val inputEvents = + spark.range(1, 1000) +.select(to_json(struct("*")) as 'value) + +val topic = newTopic() +testUtils.createTopic(topic) +// used to throw UnresolvedException +inputEvents.write + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("topic", topic) + .save() + } + test("streaming - write to kafka with topic field") { val input = MemoryStream[String] val topic = newTopic()
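The gist of the one-line fix, sketched against a hypothetical DataFrame `df`: before analysis the plan's attribute references may be unresolved, so schema checks must run against the analyzed plan.

```scala
// Inspecting the logical (pre-analysis) plan can hit unresolved attributes
// and throw UnresolvedException; the analyzed plan is safe to inspect.
val preAnalysis  = df.queryExecution.logical.output   // may be unresolved
val postAnalysis = df.queryExecution.analyzed.output  // resolved by the analyzer
```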
spark git commit: [SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans
Repository: spark Updated Branches: refs/heads/branch-2.1 6696ad0e8 -> 5131b0a96 [SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans ## What changes were proposed in this pull request? We didn't enforce analyzed plans in Spark 2.1 when writing out to Kafka. ## How was this patch tested? New unit test. Author: Bill Chambers Closes #17804 from anabranch/SPARK-20496-2. (cherry picked from commit 733b81b835f952ab96723c749461d6afc0c71974) Signed-off-by: Burak Yavuz Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5131b0a9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5131b0a9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5131b0a9 Branch: refs/heads/branch-2.1 Commit: 5131b0a963699b6910949ae2361fa740dafdb678 Parents: 6696ad0 Author: Bill Chambers Authored: Fri Apr 28 10:18:31 2017 -0700 Committer: Burak Yavuz Committed: Fri Apr 28 10:19:14 2017 -0700 -- .../org/apache/spark/sql/kafka010/KafkaWriter.scala | 4 ++-- .../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 16 2 files changed, 18 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5131b0a9/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala index a637d52..61936e3 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala @@ -47,7 +47,7 @@ private[kafka010] object KafkaWriter extends Logging { queryExecution: QueryExecution, kafkaParameters: ju.Map[String, Object], topic: Option[String] = None): Unit = { -val schema = queryExecution.logical.output +val schema = queryExecution.analyzed.output schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse( if (topic == None) { throw new AnalysisException(s"topic option required when no " + @@ -84,7 +84,7 @@ private[kafka010] object KafkaWriter extends Logging { queryExecution: QueryExecution, kafkaParameters: ju.Map[String, Object], topic: Option[String] = None): Unit = { -val schema = queryExecution.logical.output +val schema = queryExecution.analyzed.output validateQuery(queryExecution, kafkaParameters, topic) SQLExecution.withNewExecutionId(sparkSession, queryExecution) { queryExecution.toRdd.foreachPartition { iter => http://git-wip-us.apache.org/repos/asf/spark/blob/5131b0a9/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index 4905356..1e7f4f2 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection} import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{BinaryType, DataType} @@ -107,6 +108,21 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { s"save mode overwrite not allowed for kafka")) } + test("SPARK-20496: batch - enforce analyzed plans") { +val inputEvents = + spark.range(1, 1000) +.select(to_json(struct("*")) as 'value) + +val topic = newTopic() +testUtils.createTopic(topic) +// used to throw UnresolvedException +inputEvents.write + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("topic", topic) + .save() + } + test("streaming - write to kafka with topic field") { val input = MemoryStream[String] val topic = newTopic()
spark git commit: [SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans
Repository: spark Updated Branches: refs/heads/master 8c911adac -> 733b81b83 [SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans ## What changes were proposed in this pull request? We didn't enforce analyzed plans in Spark 2.1 when writing out to Kafka. ## How was this patch tested? New unit test. Author: Bill Chambers Closes #17804 from anabranch/SPARK-20496-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/733b81b8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/733b81b8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/733b81b8 Branch: refs/heads/master Commit: 733b81b835f952ab96723c749461d6afc0c71974 Parents: 8c911ad Author: Bill Chambers Authored: Fri Apr 28 10:18:31 2017 -0700 Committer: Burak Yavuz Committed: Fri Apr 28 10:18:31 2017 -0700 -- .../org/apache/spark/sql/kafka010/KafkaWriter.scala | 4 ++-- .../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 16 2 files changed, 18 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/733b81b8/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala index a637d52..61936e3 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala @@ -47,7 +47,7 @@ private[kafka010] object KafkaWriter extends Logging { queryExecution: QueryExecution, kafkaParameters: ju.Map[String, Object], topic: Option[String] = None): Unit = { -val schema = queryExecution.logical.output +val schema = queryExecution.analyzed.output schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse( if (topic == None) { throw new AnalysisException(s"topic option required when no " + @@ -84,7 +84,7 @@ private[kafka010] object KafkaWriter extends Logging { queryExecution: QueryExecution, kafkaParameters: ju.Map[String, Object], topic: Option[String] = None): Unit = { -val schema = queryExecution.logical.output +val schema = queryExecution.analyzed.output validateQuery(queryExecution, kafkaParameters, topic) SQLExecution.withNewExecutionId(sparkSession, queryExecution) { queryExecution.toRdd.foreachPartition { iter => http://git-wip-us.apache.org/repos/asf/spark/blob/733b81b8/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index 4bd052d..2ab336c 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection} import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{BinaryType, DataType} @@ -108,6 +109,21 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { s"save mode overwrite not allowed for kafka")) } + test("SPARK-20496: batch - enforce analyzed plans") { +val inputEvents = + spark.range(1, 1000) +.select(to_json(struct("*")) as 'value) + +val topic = newTopic() +testUtils.createTopic(topic) +// used to throw UnresolvedException +inputEvents.write + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("topic", topic) + .save() + } + test("streaming - write to kafka with topic field") { val input = MemoryStream[String] val topic = newTopic()
spark git commit: [SPARK-19911][STREAMING] Add builder interface for Kinesis DStreams
Repository: spark Updated Branches: refs/heads/master 9299d071f -> 707e50183 [SPARK-19911][STREAMING] Add builder interface for Kinesis DStreams ## What changes were proposed in this pull request? - Add new KinesisDStream.scala containing KinesisDStream.Builder class - Add KinesisDStreamBuilderSuite test suite - Make KinesisInputDStream ctor args package private for testing - Add JavaKinesisDStreamBuilderSuite test suite - Add args to KinesisInputDStream and KinesisReceiver for optional service-specific auth (Kinesis, DynamoDB and CloudWatch) ## How was this patch tested? Added ```KinesisDStreamBuilderSuite``` to verify the builder class works as expected Author: Adam Budde Closes #17250 from budde/KinesisStreamBuilder. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/707e5018 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/707e5018 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/707e5018 Branch: refs/heads/master Commit: 707e501832fa7adde0a884c528a7352983d83520 Parents: 9299d07 Author: Adam Budde Authored: Fri Mar 24 12:40:29 2017 -0700 Committer: Burak Yavuz Committed: Fri Mar 24 12:40:29 2017 -0700 -- .../kinesis/KinesisBackedBlockRDD.scala | 6 +- .../streaming/kinesis/KinesisInputDStream.scala | 259 ++- .../streaming/kinesis/KinesisReceiver.scala | 20 +- .../spark/streaming/kinesis/KinesisUtils.scala | 43 +-- .../SerializableCredentialsProvider.scala | 85 -- .../streaming/kinesis/SparkAWSCredentials.scala | 182 + .../JavaKinesisInputDStreamBuilderSuite.java| 63 + .../KinesisInputDStreamBuilderSuite.scala | 115 .../kinesis/KinesisReceiverSuite.scala | 23 -- .../streaming/kinesis/KinesisStreamSuite.scala | 2 +- .../SparkAWSCredentialsBuilderSuite.scala | 100 +++ 11 files changed, 749 insertions(+), 149 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/707e5018/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala -- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 0f1790b..f31ebf1 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -82,8 +82,8 @@ class KinesisBackedBlockRDD[T: ClassTag]( @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges], @transient private val isBlockIdValid: Array[Boolean] = Array.empty, val retryTimeoutMs: Int = 1, -val messageHandler: Record => T = KinesisUtils.defaultMessageHandler _, -val kinesisCredsProvider: SerializableCredentialsProvider = DefaultCredentialsProvider +val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _, +val kinesisCreds: SparkAWSCredentials = DefaultCredentials ) extends BlockRDD[T](sc, _blockIds) { require(_blockIds.length == arrayOfseqNumberRanges.length, @@ -109,7 +109,7 @@ class KinesisBackedBlockRDD[T: ClassTag]( } def getBlockFromKinesis(): Iterator[T] = { - val credentials = kinesisCredsProvider.provider.getCredentials + val credentials = kinesisCreds.provider.getCredentials partition.seqNumberRanges.ranges.iterator.flatMap { range => new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName, range, retryTimeoutMs).map(messageHandler)
http://git-wip-us.apache.org/repos/asf/spark/blob/707e5018/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala -- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala index fbc6b99..8970ad2 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala @@ -22,24 +22,28 @@ import scala.reflect.ClassTag import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import com.amazonaws.services.kinesis.model.Record +import org.apache.spark.annotation.InterfaceStability import org.apache.spark.rdd.RDD import org.apache.spark.storage.{BlockId, StorageLevel} import
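A sketch of the service-specific credentials support this builder adds; the names follow the patch (`SparkAWSCredentials`, `kinesisCreds`), but treat the exact builder methods as an assumption, and the key values are placeholders. `ssc` is an assumed existing StreamingContext.

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, SparkAWSCredentials}

val kinesisCreds = SparkAWSCredentials.builder
  .basicCredentials("ACCESS_KEY_ID", "SECRET_KEY")   // placeholder keys
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)              // assumed existing StreamingContext
  .streamName("events")               // hypothetical stream
  .endpointUrl("https://kinesis.us-east-1.amazonaws.com")
  .regionName("us-east-1")
  .checkpointAppName("events-app")
  .kinesisCredentials(kinesisCreds)   // DynamoDB/CloudWatch creds can be set separately
  .build()
```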
spark git commit: Fix compilation of the Scala 2.10 master branch
Repository: spark Updated Branches: refs/heads/master c79118070 -> 93581fbc1 Fix compilation of the Scala 2.10 master branch ## What changes were proposed in this pull request? Fixes break caused by: https://github.com/apache/spark/commit/746a558de2136f91f8fe77c6e51256017aa50913 ## How was this patch tested? Compiled with `build/sbt -Dscala2.10 sql/compile` locally Author: Burak Yavuz <brk...@gmail.com> Closes #17403 from brkyvz/onceTrigger2.10. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93581fbc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93581fbc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93581fbc Branch: refs/heads/master Commit: 93581fbc18c01595918c565f6737aaa666116114 Parents: c791180 Author: Burak Yavuz <brk...@gmail.com> Authored: Thu Mar 23 17:57:31 2017 -0700 Committer: Burak Yavuz <brk...@gmail.com> Committed: Thu Mar 23 17:57:31 2017 -0700 -- .../spark/sql/streaming/ProcessingTime.scala| 20 ++-- .../org/apache/spark/sql/streaming/Trigger.java | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/93581fbc/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala index bdad8e4..9ba1fc0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala @@ -51,7 +51,7 @@ import org.apache.spark.unsafe.types.CalendarInterval */ @Experimental @InterfaceStability.Evolving -@deprecated("use Trigger.ProcessingTimeTrigger(intervalMs)", "2.2.0") +@deprecated("use Trigger.ProcessingTime(intervalMs)", "2.2.0") case class ProcessingTime(intervalMs: Long) extends Trigger { require(intervalMs >= 0, "the interval of trigger should not be negative") } @@ -64,7 +64,7 @@ case class ProcessingTime(intervalMs: Long) extends Trigger { */ @Experimental @InterfaceStability.Evolving -@deprecated("use Trigger.ProcessingTimeTrigger(intervalMs)", "2.2.0") +@deprecated("use Trigger.ProcessingTime(intervalMs)", "2.2.0") object ProcessingTime { /** @@ -76,9 +76,9 @@ object ProcessingTime { * }}} * * @since 2.0.0 - * @deprecated use Trigger.ProcessingTimeTrigger(interval) + * @deprecated use Trigger.ProcessingTime(interval) */ - @deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0") + @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0") def apply(interval: String): ProcessingTime = { if (StringUtils.isBlank(interval)) { throw new IllegalArgumentException( @@ -108,9 +108,9 @@ object ProcessingTime { * }}} * * @since 2.0.0 - * @deprecated use Trigger.ProcessingTimeTrigger(interval) + * @deprecated use Trigger.ProcessingTime(interval) */ - @deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0") + @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0") def apply(interval: Duration): ProcessingTime = { new ProcessingTime(interval.toMillis) } @@ -124,9 +124,9 @@ object ProcessingTime { * }}} * * @since 2.0.0 - * @deprecated use Trigger.ProcessingTimeTrigger(interval) + * @deprecated use Trigger.ProcessingTime(interval) */ - @deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0") + @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0") def create(interval: String): ProcessingTime = { apply(interval) } @@ -141,9 +141,9 @@ 
object ProcessingTime { * }}} * * @since 2.0.0 - * @deprecated use Trigger.ProcessingTimeTrigger(interval) + * @deprecated use Trigger.ProcessingTime(interval, unit) */ - @deprecated("use Trigger.ProcessingTimeTrigger(interval, unit)", "2.2.0") + @deprecated("use Trigger.ProcessingTime(interval, unit)", "2.2.0") def create(interval: Long, unit: TimeUnit): ProcessingTime = { new ProcessingTime(unit.toMillis(interval)) } http://git-wip-us.apache.org/repos/asf/spark/blob/93581fbc/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java b/sql/core/src/main/scala/org/apache/spark/sql/streaming/
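For anyone migrating off the deprecated class, a minimal sketch of the replacement call, assuming an existing streaming DataFrame `df`:

```scala
import org.apache.spark.sql.streaming.Trigger

// `df` is assumed to be an existing streaming DataFrame.
val query = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))  // replaces the deprecated ProcessingTime("10 seconds")
  .start()
```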
spark git commit: [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource
Repository: spark Updated Branches: refs/heads/master 455129020 -> a3648b5d4 [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource ## What changes were proposed in this pull request? **The Problem** There is a file stream source option called maxFileAge which limits how old the files can be, relative to the latest file that has been seen. This is used to limit the files that need to be remembered as "processed". Files older than the latest processed files are ignored. This value is 7 days by default. This causes a problem when both latestFirst = true and maxFilesPerTrigger > total files to be processed. Here is what happens in all combinations 1) latestFirst = false - Since files are processed in order, there won't be any unprocessed file older than the latest processed file. All files will be processed. 2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge thresholding mechanism takes one batch to initialize. If maxFilesPerTrigger is not set, then all old files get processed in the first batch, and so no file is left behind. 3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch processes the latest X files. That sets the threshold to latest file - maxFileAge, so files older than this threshold will never be considered for processing. The bug is with case 3. **The Solution** Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set. ## How was this patch tested? Regression test in `FileStreamSourceSuite` Author: Burak Yavuz <brk...@gmail.com> Closes #17153 from brkyvz/maxFileAge. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3648b5d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3648b5d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3648b5d Branch: refs/heads/master Commit: a3648b5d4f99ff9461d02f53e9ec71787a3abf51 Parents: 4551290 Author: Burak Yavuz <brk...@gmail.com> Authored: Wed Mar 8 14:35:07 2017 -0800 Committer: Burak Yavuz <brk...@gmail.com> Committed: Wed Mar 8 14:35:07 2017 -0800 -- .../execution/streaming/FileStreamOptions.scala | 5 +- .../execution/streaming/FileStreamSource.scala | 14 +++- .../sql/streaming/FileStreamSourceSuite.scala | 82 3 files changed, 63 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a3648b5d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index 2f802d7..e7ba901 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -38,7 +38,10 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging } /** - * Maximum age of a file that can be found in this directory, before it is deleted. + * Maximum age of a file that can be found in this directory, before it is ignored. For the + * first batch all files will be considered valid. If `latestFirst` is set to `true` and + * `maxFilesPerTrigger` is set, then this parameter will be ignored, because old files that are + * valid, and should be processed, may be ignored. Please refer to SPARK-19813 for details.
* * The max age is specified with respect to the timestamp of the latest file, and not the * timestamp of the current system. That this means if the last file has timestamp 1000, and the http://git-wip-us.apache.org/repos/asf/spark/blob/a3648b5d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 6a7263c..0f09b0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -66,23 +66,29 @@ class FileStreamSource( private val fileSortOrder = if (sourceOptions.latestFirst) { logWarning( -"""'latestFirst' is true. New files will be processed first. - |It may affect the watermark value""".stripMargin) +"""'latestFirst' is true. New files will be
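To make the affected combination concrete, a small sketch of a file stream that hits SPARK-19813 before this patch; `spark` is an assumed SparkSession and the path is a placeholder:

```scala
// With both latestFirst and maxFilesPerTrigger set, maxFileAge is now ignored,
// so files older than (latest file timestamp - maxFileAge) are no longer
// silently skipped after the first batch.
val df = spark.readStream
  .format("text")
  .option("latestFirst", "true")
  .option("maxFilesPerTrigger", "20")
  .load("/data/incoming")   // placeholder input path
```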
spark git commit: [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource
Repository: spark Updated Branches: refs/heads/branch-2.1 320eff14b -> f6c1ad2eb [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource ## What changes were proposed in this pull request? **The Problem** There is a file stream source option called maxFileAge which limits how old the files can be, relative to the latest file that has been seen. This is used to limit the files that need to be remembered as "processed". Files older than the latest processed files are ignored. This value is 7 days by default. This causes a problem when both latestFirst = true and maxFilesPerTrigger > total files to be processed. Here is what happens in all combinations 1) latestFirst = false - Since files are processed in order, there won't be any unprocessed file older than the latest processed file. All files will be processed. 2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge thresholding mechanism takes one batch to initialize. If maxFilesPerTrigger is not set, then all old files get processed in the first batch, and so no file is left behind. 3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch processes the latest X files. That sets the threshold to latest file - maxFileAge, so files older than this threshold will never be considered for processing. The bug is with case 3. **The Solution** Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set. ## How was this patch tested? Regression test in `FileStreamSourceSuite` Author: Burak Yavuz <brk...@gmail.com> Closes #17153 from brkyvz/maxFileAge. (cherry picked from commit a3648b5d4f99ff9461d02f53e9ec71787a3abf51) Signed-off-by: Burak Yavuz <brk...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6c1ad2e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6c1ad2e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6c1ad2e Branch: refs/heads/branch-2.1 Commit: f6c1ad2eb6d0706899aabbdd39e558b3488e2ef3 Parents: 320eff1 Author: Burak Yavuz <brk...@gmail.com> Authored: Wed Mar 8 14:35:07 2017 -0800 Committer: Burak Yavuz <brk...@gmail.com> Committed: Wed Mar 8 14:35:22 2017 -0800 -- .../execution/streaming/FileStreamOptions.scala | 5 +- .../execution/streaming/FileStreamSource.scala | 14 +++- .../sql/streaming/FileStreamSourceSuite.scala | 82 3 files changed, 63 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f6c1ad2e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index 25ebe17..fe64838 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -38,7 +38,10 @@ class FileStreamOptions(parameters: CaseInsensitiveMap) extends Logging { } /** - * Maximum age of a file that can be found in this directory, before it is deleted. + * Maximum age of a file that can be found in this directory, before it is ignored. For the + * first batch all files will be considered valid. If `latestFirst` is set to `true` and + * `maxFilesPerTrigger` is set, then this parameter will be ignored, because old files that are + * valid, and should be processed, may be ignored.
Please refer to SPARK-19813 for details. * * The max age is specified with respect to the timestamp of the latest file, and not the * timestamp of the current system. That this means if the last file has timestamp 1000, and the http://git-wip-us.apache.org/repos/asf/spark/blob/f6c1ad2e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 39c0b49..0f0b6f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -64,23 +64,29 @@ class FileStreamSource( private val fileSortOrder = if (sourceOptions.latestFirst) { logWarning( -"""'latestFirst' is true. New files will be processed first. -
spark git commit: [SPARK-19304][STREAMING][KINESIS] fix kinesis slow checkpoint recovery
Repository: spark Updated Branches: refs/heads/master 339b53a13 -> 46a64d1e0 [SPARK-19304][STREAMING][KINESIS] fix kinesis slow checkpoint recovery ## What changes were proposed in this pull request? Added a limit to the getRecords api call in KinesisBackedBlockRDD. This helps reduce the amount of data returned by the kinesis api call, making the recovery considerably faster. As we are storing the `fromSeqNum` & `toSeqNum` in checkpoint metadata, we can also store the number of records, which can later be used for the api call. ## How was this patch tested? The patch was manually tested. Apologies for any silly mistakes; opening first pull request. Author: Gaurav Closes #16842 from Gauravshah/kinesis_checkpoint_recovery_fix_2_1_0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46a64d1e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46a64d1e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46a64d1e Branch: refs/heads/master Commit: 46a64d1e0ae12c31e848f377a84fb28e3efb3699 Parents: 339b53a Author: Gaurav Authored: Mon Mar 6 10:41:49 2017 -0800 Committer: Burak Yavuz Committed: Mon Mar 6 10:41:49 2017 -0800 -- .../kinesis/KinesisBackedBlockRDD.scala | 25 +++- .../streaming/kinesis/KinesisReceiver.scala | 3 ++- .../kinesis/KinesisBackedBlockRDDSuite.scala| 4 ++-- .../streaming/kinesis/KinesisStreamSuite.scala | 4 ++-- 4 files changed, 25 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/46a64d1e/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala -- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 23c4d99..0f1790b 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -36,7 +36,11 @@ import org.apache.spark.util.NextIterator /** Class representing a range of Kinesis sequence numbers. Both sequence numbers are inclusive.
*/ private[kinesis] case class SequenceNumberRange( -streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String) +streamName: String, +shardId: String, +fromSeqNumber: String, +toSeqNumber: String, +recordCount: Int) /** Class representing an array of Kinesis sequence number ranges */ private[kinesis] @@ -136,6 +140,8 @@ class KinesisSequenceRangeIterator( private val client = new AmazonKinesisClient(credentials) private val streamName = range.streamName private val shardId = range.shardId + // AWS limits to maximum of 10k records per get call + private val maxGetRecordsLimit = 10000 private var toSeqNumberReceived = false private var lastSeqNumber: String = null @@ -153,12 +159,14 @@ class KinesisSequenceRangeIterator( // If the internal iterator has not been initialized, // then fetch records from starting sequence number -internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber) +internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber, + range.recordCount) } else if (!internalIterator.hasNext) { // If the internal iterator does not have any more records, // then fetch more records after the last consumed sequence number -internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber) +internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber, + range.recordCount) } if (!internalIterator.hasNext) { @@ -191,9 +199,12 @@ class KinesisSequenceRangeIterator( /** * Get records starting from or after the given sequence number. */ - private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Iterator[Record] = { + private def getRecords( + iteratorType: ShardIteratorType, + seqNum: String, + recordCount: Int): Iterator[Record] = { val shardIterator = getKinesisIterator(iteratorType, seqNum) -val result = getRecordsAndNextKinesisIterator(shardIterator) +val result = getRecordsAndNextKinesisIterator(shardIterator, recordCount) result._1 } @@ -202,10 +213,12 @@ class KinesisSequenceRangeIterator( * to get records from Kinesis), and get the next shard iterator for next consumption. */ private def
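The excerpt above ends inside `getRecordsAndNextKinesisIterator`, where the limit is applied to the AWS call; a minimal sketch of the idea, with a hypothetical `fetchRecords` helper standing in for the patched method:

```scala
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.{GetRecordsRequest, GetRecordsResult}

// Ask Kinesis for at most the number of records recorded in the checkpointed
// range, never exceeding the AWS per-call maximum of 10000 records.
// `client`, `shardIterator` and `recordCount` mirror the values used in the RDD code above.
def fetchRecords(
    client: AmazonKinesisClient,
    shardIterator: String,
    recordCount: Int): GetRecordsResult = {
  val maxGetRecordsLimit = 10000
  val request = new GetRecordsRequest()
  request.setShardIterator(shardIterator)
  request.setLimit(math.min(recordCount, maxGetRecordsLimit))
  client.getRecords(request)
}
```

Capping the request at the known record count is what speeds up recovery: the call no longer returns a full default-sized batch just to replay a small checkpointed range.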
spark git commit: [SPARK-19595][SQL] Support json array in from_json
Repository: spark Updated Branches: refs/heads/master 80d5338b3 -> 369a148e5 [SPARK-19595][SQL] Support json array in from_json ## What changes were proposed in this pull request? This PR proposes two changes. **Do not allow json arrays with multiple elements and return null in `from_json` with `StructType` as the schema.** Currently, it only reads the single row when the input is a json array. So, the code below:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = StructType(StructField("a", IntegerType) :: Nil)
Seq(("""[{"a": 1}, {"a": 2}]""")).toDF("struct").select(from_json(col("struct"), schema)).show()
```

prints

```
+--------------------+
|jsontostruct(struct)|
+--------------------+
|                 [1]|
+--------------------+
```

This PR simply suggests to print this as `null` if the schema is `StructType` and the input is a json array with multiple elements:

```
+--------------------+
|jsontostruct(struct)|
+--------------------+
|                null|
+--------------------+
```

**Support json arrays in `from_json` with `ArrayType` as the schema.**

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
Seq(("""[{"a": 1}, {"a": 2}]""")).toDF("array").select(from_json(col("array"), schema)).show()
```

prints

```
+-------------------+
|jsontostruct(array)|
+-------------------+
|         [[1], [2]]|
+-------------------+
```

## How was this patch tested? Unit test in `JsonExpressionsSuite`, `JsonFunctionsSuite`, Python doctests and manual test. Author: hyukjinkwon Closes #16929 from HyukjinKwon/disallow-array. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/369a148e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/369a148e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/369a148e Branch: refs/heads/master Commit: 369a148e591bb16ec7da54867610b207602cd698 Parents: 80d5338 Author: hyukjinkwon Authored: Sun Mar 5 14:35:06 2017 -0800 Committer: Burak Yavuz Committed: Sun Mar 5 14:35:06 2017 -0800 -- python/pyspark/sql/functions.py | 11 +++- .../catalyst/expressions/jsonExpressions.scala | 57 --- .../expressions/JsonExpressionsSuite.scala | 58 +++- .../scala/org/apache/spark/sql/functions.scala | 52 -- .../apache/spark/sql/JsonFunctionsSuite.scala | 25 - 5 files changed, 186 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/369a148e/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 426a4a8..376b86e 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1773,11 +1773,11 @@ def json_tuple(col, *fields): @since(2.1) def from_json(col, schema, options={}): """ -Parses a column containing a JSON string into a [[StructType]] with the -specified schema. Returns `null`, in the case of an unparseable string. +Parses a column containing a JSON string into a [[StructType]] or [[ArrayType]] +with the specified schema. Returns `null`, in the case of an unparseable string. :param col: string column in json format -:param schema: a StructType to use when parsing the json column +:param schema: a StructType or ArrayType to use when parsing the json column :param options: options to control parsing.
accepts the same options as the json datasource >>> from pyspark.sql.types import * @@ -1786,6 +1786,11 @@ def from_json(col, schema, options={}): >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(from_json(df.value, schema).alias("json")).collect() [Row(json=Row(a=1))] +>>> data = [(1, '''[{"a": 1}]''')] +>>> schema = ArrayType(StructType([StructField("a", IntegerType())])) +>>> df = spark.createDataFrame(data, ("key", "value")) +>>> df.select(from_json(df.value, schema).alias("json")).collect() +[Row(json=[Row(a=1)])] """ sc = SparkContext._active_spark_context http://git-wip-us.apache.org/repos/asf/spark/blob/369a148e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 1e690a4..dbff62e 100644 ---
spark git commit: [SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without errors
Repository: spark Updated Branches: refs/heads/branch-2.1 ef4fb7ebc -> c5a7cb022 [SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without errors ## What changes were proposed in this pull request? When a query uses a temp checkpoint dir, it's better to delete it if it's stopped without errors. ## How was this patch tested? New unit tests. Author: Shixiong Zhu Closes #16880 from zsxwing/delete-temp-checkpoint. (cherry picked from commit 3dbff9be06c2007fdb2ad4a1e113f3bc7fc06529) Signed-off-by: Burak Yavuz Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5a7cb02 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5a7cb02 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5a7cb02 Branch: refs/heads/branch-2.1 Commit: c5a7cb0225ed4ed0d1ede5da0593b258c5dfd79f Parents: ef4fb7e Author: Shixiong Zhu Authored: Mon Feb 13 11:54:54 2017 -0800 Committer: Burak Yavuz Committed: Mon Feb 13 11:55:11 2017 -0800 -- .../execution/streaming/StreamExecution.scala | 24 -- .../sql/streaming/StreamingQueryManager.scala | 6 - .../test/DataStreamReaderWriterSuite.scala | 26 3 files changed, 53 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c5a7cb02/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index a35950e..a8ec73e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import java.io.IOException import java.util.UUID import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.locks.ReentrantLock @@ -41,16 +42,20 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created * and the results are committed transactionally to the given [[Sink]]. + * + * @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without + * errors */ class StreamExecution( override val sparkSession: SparkSession, override val name: String, -checkpointRoot: String, +val checkpointRoot: String, analyzedPlan: LogicalPlan, val sink: Sink, val trigger: Trigger, val triggerClock: Clock, -val outputMode: OutputMode) +val outputMode: OutputMode, +deleteCheckpointOnStop: Boolean) extends StreamingQuery with ProgressReporter with Logging { import org.apache.spark.sql.streaming.StreamingQueryListener._ @@ -213,6 +218,7 @@ class StreamExecution( * has been posted to all the listeners. */ def start(): Unit = { +logInfo(s"Starting $prettyIdString.
Use $checkpointRoot to store the query checkpoint.") microBatchThread.setDaemon(true) microBatchThread.start() startLatch.await() // Wait until thread started and QueryStart event has been posted @@ -323,6 +329,20 @@ class StreamExecution( sparkSession.streams.notifyQueryTermination(StreamExecution.this) postEvent( new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString))) + +// Delete the temp checkpoint only when the query didn't fail +if (deleteCheckpointOnStop && exception.isEmpty) { + val checkpointPath = new Path(checkpointRoot) + try { +val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) +fs.delete(checkpointPath, true) + } catch { +case NonFatal(e) => + // Deleting temp checkpoint folder is best effort, don't throw non fatal exceptions + // when we cannot delete them. + logWarning(s"Cannot delete $checkpointPath", e) + } +} } finally { terminationLatch.countDown() } http://git-wip-us.apache.org/repos/asf/spark/blob/c5a7cb02/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala -- diff --git
spark git commit: [SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without errors
Repository: spark Updated Branches: refs/heads/master 0417ce878 -> 3dbff9be0 [SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without errors ## What changes were proposed in this pull request? When a query uses a temp checkpoint dir, it's better to delete it if it's stopped without errors. ## How was this patch tested? New unit tests. Author: Shixiong Zhu Closes #16880 from zsxwing/delete-temp-checkpoint. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3dbff9be Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3dbff9be Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3dbff9be Branch: refs/heads/master Commit: 3dbff9be06c2007fdb2ad4a1e113f3bc7fc06529 Parents: 0417ce8 Author: Shixiong Zhu Authored: Mon Feb 13 11:54:54 2017 -0800 Committer: Burak Yavuz Committed: Mon Feb 13 11:54:54 2017 -0800 -- .../execution/streaming/StreamExecution.scala | 24 -- .../sql/streaming/StreamingQueryManager.scala | 6 - .../test/DataStreamReaderWriterSuite.scala | 26 3 files changed, 53 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3dbff9be/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index ea37194..3149ef0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import java.io.IOException import java.util.UUID import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.locks.ReentrantLock @@ -41,16 +42,20 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created * and the results are committed transactionally to the given [[Sink]]. + * + * @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without + * errors */ class StreamExecution( override val sparkSession: SparkSession, override val name: String, -checkpointRoot: String, +val checkpointRoot: String, analyzedPlan: LogicalPlan, val sink: Sink, val trigger: Trigger, val triggerClock: Clock, -val outputMode: OutputMode) +val outputMode: OutputMode, +deleteCheckpointOnStop: Boolean) extends StreamingQuery with ProgressReporter with Logging { import org.apache.spark.sql.streaming.StreamingQueryListener._ @@ -213,6 +218,7 @@ class StreamExecution( * has been posted to all the listeners. */ def start(): Unit = { +logInfo(s"Starting $prettyIdString.
Use $checkpointRoot to store the query checkpoint.") microBatchThread.setDaemon(true) microBatchThread.start() startLatch.await() // Wait until thread started and QueryStart event has been posted @@ -323,6 +329,20 @@ class StreamExecution( sparkSession.streams.notifyQueryTermination(StreamExecution.this) postEvent( new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString))) + +// Delete the temp checkpoint only when the query didn't fail +if (deleteCheckpointOnStop && exception.isEmpty) { + val checkpointPath = new Path(checkpointRoot) + try { +val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) +fs.delete(checkpointPath, true) + } catch { +case NonFatal(e) => + // Deleting temp checkpoint folder is best effort, don't throw non fatal exceptions + // when we cannot delete them. + logWarning(s"Cannot delete $checkpointPath", e) + } +} } finally { terminationLatch.countDown() } http://git-wip-us.apache.org/repos/asf/spark/blob/3dbff9be/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index
spark git commit: [SPARK-18218][ML][MLLIB] Reduce shuffled data size of BlockMatrix multiplication and solve potential OOM and low parallelism usage problem By split middle dimension in matrix multiplication
Repository: spark Updated Branches: refs/heads/master 9f523d319 -> 1191fe267 [SPARK-18218][ML][MLLIB] Reduce shuffled data size of BlockMatrix multiplication and solve potential OOM and low parallelism usage problem By split middle dimension in matrix multiplication ## What changes were proposed in this pull request? ### The problem in current block matrix multiplication As described in JIRA https://issues.apache.org/jira/browse/SPARK-18218, block matrix multiplication in Spark may cause problems. Suppose we have an `M*N` dimensions matrix A multiplied by an `N*P` dimensions matrix B; when N is much larger than M and P, the following problems may occur: - when the middle dimension N is too large, it will cause reducer OOM. - even if OOM does not occur, it will still leave parallelism too low. - when N is much larger than M and P, and matrix A and B have many partitions, it may create too many partitions on the M and P dimensions, which leads to a much larger shuffled data size. (I will explain this in detail in the following.) ### Key point of my improvement In this PR, I introduce a `numMidDimSplits` parameter and improve the algorithm to resolve this problem. In order to understand the improvement in this PR, first let me give a simple case to explain how the current multiplication works and what causes the problems above: suppose we have block matrix A, containing 200 blocks (`2 numRowBlocks * 100 numColBlocks`), blocks arranged in 2 rows, 100 cols:

```
A00 A01 A02 ... A0,99
A10 A11 A12 ... A1,99
```

and we have block matrix B, also containing 200 blocks (`100 numRowBlocks * 2 numColBlocks`), blocks arranged in 100 rows, 2 cols:

```
B00    B01
B10    B11
B20    B21
...
B99,0  B99,1
```

Suppose all blocks in the two matrices are dense for now. Now we call A.multiply(B); suppose the generated `resultPartitioner` contains 2 rowPartitions and 2 colPartitions (there can't be more partitions because the result matrix only contains `2 * 2` blocks). The current algorithm contains two shuffle steps: **step-1** Step-1 will generate 4 reducers. I tag them as reducer-00, reducer-01, reducer-10, reducer-11, and shuffle data as follows:

```
A00 A01 A02 ... A0,99  B00 B10 B20 ... B99,0    shuffled into reducer-00
A00 A01 A02 ... A0,99  B01 B11 B21 ... B99,1    shuffled into reducer-01
A10 A11 A12 ... A1,99  B00 B10 B20 ... B99,0    shuffled into reducer-10
A10 A11 A12 ... A1,99  B01 B11 B21 ... B99,1    shuffled into reducer-11
```

The shuffling above is a `cogroup` transform; note that each reducer contains **only one group**. **step-2** Step-2 will do an `aggregateByKey` transform on the result of step-1, will also generate 4 reducers, and generate the final result RDD, containing 4 partitions, each partition containing one block. The main problems are in step-1. Now we have only 4 reducers, but matrix A and B have 400 blocks in total; obviously the reducer number is too small. And we can see that each reducer contains only one group (the group concept in the `cogroup` transform), and each group contains 200 blocks. This is terrible because we know that `cogroup` will load each group into memory when computing. It does not scale at the algorithm level. Suppose matrix A has 10000 column blocks or more instead of 100? Then each reducer will load 20000 blocks into memory. It will easily cause reducer OOM. This PR tries to resolve the problem described above. When matrix A with dimension M * N multiplies matrix B with dimension N * P, the middle dimension N is the key point. If N is large, the current multiplication implementation works badly.
In this PR, I introduce a `numMidDimSplits` parameter, representing how many splits to cut on the middle dimension N. Still using the example described above, if we set `numMidDimSplits = 10`, we can now generate 40 reducers in **step-1**: each reducer-ij above will be split into 10 reducers: reducer-ij0, reducer-ij1, ... reducer-ij9, and each reducer will receive 20 blocks. Now the shuffle works as follows: **reducer-000 to reducer-009**

```
A0,0 A0,10 A0,20 ... A0,90  B0,0 B10,0 B20,0 ... B90,0    shuffled into reducer-000
A0,1 A0,11 A0,21 ... A0,91  B1,0 B11,0 B21,0 ... B91,0    shuffled into reducer-001
A0,2 A0,12 A0,22 ... A0,92  B2,0 B12,0 B22,0 ... B92,0    shuffled into reducer-002
...
A0,9 A0,19 A0,29 ... A0,99  B9,0 B19,0 B29,0 ... B99,0    shuffled into reducer-009
```

**reducer-010 to reducer-019**

```
A0,0 A0,10 A0,20 ... A0,90  B0,1 B10,1 B20,1 ... B90,1    shuffled into reducer-010
A0,1 A0,11 A0,21 ... A0,91  B1,1 B11,1 B21,1 ... B91,1    shuffled into reducer-011
A0,2 A0,12 A0,22 ... A0,92  B2,1 B12,1 B22,1 ... B92,1    shuffled into reducer-012
...
A0,9 A0,19 A0,29 ... A0,99  B9,1 B19,1 B29,1 ... B99,1    shuffled into reducer-019
```

**reducer-100 to reducer-109** and **reducer-110 to reducer-119** are similar to the above; I omit writing them out. ### API for this optimized algorithm I add a new API as follows:

```
def multiply(
    other: BlockMatrix,
    numMidDimSplits: Int): BlockMatrix
```
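A brief usage sketch of the new overload, assuming `matA` (M x N blocks) and `matB` (N x P blocks) are existing `BlockMatrix` instances:

```scala
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// Split the shared middle dimension N into 10 pieces: step-1 fans out to
// 10x more reducers and each cogroup holds 10x fewer blocks.
val product: BlockMatrix = matA.multiply(matB, 10)
```

Passing a split count of 1 should reproduce the original single-split behavior, so the existing `multiply(other)` call path is unaffected.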
spark git commit: [SPARK-18020][STREAMING][KINESIS] Checkpoint SHARD_END to finish reading closed shards
Repository: spark Updated Branches: refs/heads/master 233845126 -> 256a3a801 [SPARK-18020][STREAMING][KINESIS] Checkpoint SHARD_END to finish reading closed shards ## What changes were proposed in this pull request? This PR fixes an issue that occurs when resharding Kinesis streams: the resharding makes the KCL throw an exception because Spark does not checkpoint `SHARD_END` when finishing reading closed shards in `KinesisRecordProcessor#shutdown`. This bug eventually stops the stream from subscribing to new split (or merged) shards. ## How was this patch tested? Added a test in `KinesisStreamSuite` to check if it works well when splitting/merging shards. Author: Takeshi YAMAMURO Closes #16213 from maropu/SPARK-18020. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/256a3a80 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/256a3a80 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/256a3a80 Branch: refs/heads/master Commit: 256a3a801366ab9f705e50690114e49fdb49b38e Parents: 2338451 Author: Takeshi YAMAMURO Authored: Wed Jan 25 17:38:48 2017 -0800 Committer: Burak Yavuz Committed: Wed Jan 25 17:38:48 2017 -0800 -- .../streaming/kinesis/KinesisCheckpointer.scala | 15 - .../streaming/kinesis/KinesisTestUtils.scala| 30 - .../kinesis/KPLBasedKinesisTestUtils.scala | 3 +- .../kinesis/KinesisCheckpointerSuite.scala | 5 +- .../streaming/kinesis/KinesisStreamSuite.scala | 70 python/pyspark/streaming/tests.py | 2 +- 6 files changed, 117 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/256a3a80/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala -- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala index 3e697f3..c445c15 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala @@ -64,7 +64,20 @@ private[kinesis] class KinesisCheckpointer( def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { synchronized { checkpointers.remove(shardId) - checkpoint(shardId, checkpointer) +} +if (checkpointer != null) { + try { +// We must call `checkpoint()` with no parameter to finish reading shards.
+// See an URL below for details: +// https://forums.aws.amazon.com/thread.jspa?threadID=244218 +KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100) + } catch { +case NonFatal(e) => + logError(s"Exception: WorkerId $workerId encountered an exception while checkpointing" + +s"to finish reading a shard of $shardId.", e) + // Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor + throw e + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/256a3a80/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala -- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 0fe6625..f183ef0 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -40,11 +40,10 @@ import org.apache.spark.internal.Logging * * PLEASE KEEP THIS FILE UNDER src/main AS PYTHON TESTS NEED ACCESS TO THIS FILE! */ -private[kinesis] class KinesisTestUtils extends Logging { +private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Logging { val endpointUrl = KinesisTestUtils.endpointUrl val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName() - val streamShardCount = 2 private val createStreamTimeoutSeconds = 300 private val describeStreamPollTimeSeconds = 1 @@ -88,7 +87,7 @@ private[kinesis] class KinesisTestUtils extends Logging { logInfo(s"Creating stream ${_streamName}") val createStreamRequest = new CreateStreamRequest() createStreamRequest.setStreamName(_streamName) -createStreamRequest.setShardCount(2) +
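As background, a minimal sketch (not the Spark implementation) of the KCL shutdown contract this fix honors; `onShutdown` is an illustrative helper, and the imports assume KCL 1.x:

```scala
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason

// On TERMINATE (the shard was closed by a reshard), the no-arg checkpoint()
// records SHARD_END, which lets the KCL start consuming the child shards.
// On ZOMBIE (the lease moved to another worker), checkpointing must be skipped.
def onShutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason): Unit = {
  reason match {
    case ShutdownReason.TERMINATE => checkpointer.checkpoint()
    case _ => // do not checkpoint: another worker now owns the lease
  }
}
```

Skipping the `SHARD_END` checkpoint is exactly what made the KCL throw after a reshard, since the parent shard was never marked as fully consumed.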