[spark] branch master updated: [SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2

2020-11-18 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1df69f7  [SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2
1df69f7 is described below

commit 1df69f7e324aa799c05f6158e433371c5eeed8ce
Author: Ryan Blue 
AuthorDate: Wed Nov 18 14:07:51 2020 -0800

[SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2

### What changes were proposed in this pull request?

This adds support for metadata columns to DataSourceV2. If a source 
implements `SupportsMetadataColumns` it must also implement 
`SupportsPushDownRequiredColumns` to support projecting those columns.

The analyzer is updated to resolve metadata columns from 
`LogicalPlan.metadataOutput`, and this adds a rule that will add metadata 
columns to the output of `DataSourceV2Relation` if one is used.

### Why are the changes needed?

This is the solution discussed for exposing additional data in the Kafka 
source. It is also needed for a generic `MERGE INTO` plan.

### Does this PR introduce any user-facing change?

Yes. Users can project additional columns from sources that implement the 
new API. This also updates `DescribeTableExec` to show metadata columns.
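
As a rough illustration of the intended shape (not code from this patch): a
source-side sketch in Scala, where the column name `index` and the object name
are made up for this example.

```scala
import org.apache.spark.sql.connector.catalog.MetadataColumn
import org.apache.spark.sql.types.{DataType, LongType}

// A made-up metadata column exposing a row position within a source split.
object IndexColumn extends MetadataColumn {
  override def name(): String = "index"
  override def dataType(): DataType = LongType
  override def comment(): String = "Row position within the source split"
}

// A source would mix SupportsMetadataColumns into its Table implementation and
// return Array(IndexColumn) from metadataColumns(); users could then project the
// column explicitly, e.g. spark.sql("SELECT id, index FROM testcat.ns.my_table").
```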

### How was this patch tested?

Will include new unit tests.

Closes #28027 from rdblue/add-dsv2-metadata-columns.

Authored-by: Ryan Blue 
Signed-off-by: Burak Yavuz 
---
 .../sql/connector/catalog/MetadataColumn.java  | 58 +
 .../connector/catalog/SupportsMetadataColumns.java | 37 +++
 .../spark/sql/catalyst/analysis/Analyzer.scala | 24 +++
 .../sql/catalyst/plans/logical/LogicalPlan.scala   |  6 +-
 .../plans/logical/basicLogicalOperators.scala  |  6 ++
 .../datasources/v2/DataSourceV2Implicits.scala | 16 -
 .../datasources/v2/DataSourceV2Relation.scala  | 26 +++-
 .../apache/spark/sql/connector/InMemoryTable.scala | 74 ++
 .../datasources/v2/DescribeTableExec.scala | 16 -
 .../execution/datasources/v2/PushDownUtils.scala   | 12 ++--
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 43 +
 11 files changed, 296 insertions(+), 22 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java
new file mode 100644
index 000..8aefa28
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java
@@ -0,0 +1,58 @@
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * Interface for a metadata column.
+ * 
+ * A metadata column can expose additional metadata about a row. For example, 
rows from Kafka can
+ * use metadata columns to expose a message's topic, partition number, and 
offset.
+ * 
+ * A metadata column could also be the result of a transform applied to a 
value in the row. For
+ * example, a partition value produced by bucket(id, 16) could be exposed by a 
metadata column. In
+ * this case, {@link #transform()} should return a non-null {@link Transform} 
that produced the
+ * metadata column's values.
+ */
+@Evolving
+public interface MetadataColumn {
+  /**
+   * The name of this metadata column.
+   *
+   * @return a String name
+   */
+  String name();
+
+  /**
+   * The data type of values in this metadata column.
+   *
+   * @return a {@link DataType}
+   */
+  DataType dataType();
+
+  /**
+   * @return whether values produced by this metadata column may be null
+   */
+  default boolean isNullable() {
+return true;
+  }
+
+  /**
+   * Documentation for this metadata column, or null.
+   *
+   * @return a documentation String
+   */
+  default String comment() {
+return null;
+  }
+
+  /**
+   * The {@link Transform} used to produce this metadata column from data 
rows, or null.
+   *
+   * @return a {@link Transform} used to produce the column's values, or null 
if there isn't one
+   */
+  default Transform transform() {
+return null;
+  }
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java
new file mode 100644
index 000..fc31349
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java
@@ -0,0 +1,37 @@
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import

[spark] branch branch-3.0 updated: [SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 0 if empty batch is run

2020-04-08 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 2221d3e  [SPARK-29314][SS] Don't overwrite the metric "updated" of 
state operator to 0 if empty batch is run
2221d3e is described below

commit 2221d3e0183140a0a98f6de92f84d2d924aab703
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Wed Apr 8 16:59:39 2020 -0700

[SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 
0 if empty batch is run

### What changes were proposed in this pull request?

This patch fixes the behavior of ProgressReporter, which always overwrites
the "updated" value of a state operator to 0 if there is no new data. That
behavior is correct only when we copy the state progress from the "previously"
executed plan, meaning no batch has been run. (A nonzero value of "updated"
would be odd if no batch ran, so the reset was correct in that case.)

It used to be safe to assume that no data meant no batch, but SPARK-24156
allows a batch to run on empty data if Spark needs to handle the watermark.
After this patch, the value is only overwritten when both conditions are met:
1) no data and 2) no batch.
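
For reference, the metric in question can be observed from a query handle like
this (illustrative sketch; `query` is assumed to be an active StreamingQuery):

```scala
// "numRowsUpdated" per state operator from the most recent progress event.
val updatedPerOperator: Array[Long] =
  query.lastProgress.stateOperators.map(_.numRowsUpdated)
```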

### Why are the changes needed?

Currently Spark doesn't report correct metrics when an empty batch is run;
this patch fixes that.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Modified UT. Note that FlatMapGroupsWithState increases the value of
"updated" when state rows are removed.
Also manually tested via the query below (not a simple query to test with
spark-shell, as you'll hit closure issues in spark-shell while playing with
the state function):

> query

```
case class RunningCount(count: Long)

object TestFlatMapGroupsWithState {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.streaming.MemoryStream
    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}

    val ss = SparkSession
      .builder()
      .appName("TestFlatMapGroupsWithState")
      .getOrCreate()

    ss.conf.set("spark.sql.shuffle.partitions", "5")

    import ss.implicits._

    val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
      if (state.hasTimedOut) {
        // End users are not restricted to removing the state here - they can update the
        // state as well. For example, an event time session window would have a list of
        // sessions here and it cannot remove the entire state.
        state.update(RunningCount(-1))
        Iterator((key, "-1"))
      } else {
        val count = state.getOption.map(_.count).getOrElse(0L) + values.size
        state.update(RunningCount(count))
        state.setTimeoutDuration("1 seconds")
        Iterator((key, count.toString))
      }
    }

    implicit val sqlContext = ss.sqlContext
    val inputData = MemoryStream[String]

    val result = inputData
      .toDF()
      .as[String]
      .groupByKey { v => v }
      .flatMapGroupsWithState(OutputMode.Append(),
        GroupStateTimeout.ProcessingTimeTimeout())(stateFunc)

    val query = result
      .writeStream
      .format("memory")
      .option("queryName", "test")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("5 second"))
      .start()

    Thread.sleep(1000)

    var chIdx: Long = 0

    while (true) {
      (chIdx to chIdx + 4).map { idx => inputData.addData(idx.toString) }
      chIdx += 5
      // intentionally sleep much longer than the trigger to produce an "empty" batch
      Thread.sleep(10 * 1000)
    }
  }
}
```

> before the patch (batch 3 which was an "empty" batch)

```
{
   "id":"de945a5c-882b-4dae-aa58-cb8261cbaf9e",
   "runId":"f1eb6d0d-3cd5-48b2-a03b-5e989b6c151b",
   "name":"test",
   "timestamp":"2019-11-18T07:00:25.005Z",
   "batchId":3,
   "numInputRows":0,
   "inputRowsPerSecond":0.0,
   "processedRowsPerSecond":0.0,
   "durationMs":{
      "addBatch":1664,
      "getBatch":0,
      "latestOffset":0,
      "queryPlanning":29,
      "triggerExecution":1789,
      "walCommit":51
   },
   "stateOperators":[
      {
         "numRowsTotal":10,
         "numRowsUpdated":0,

[spark] branch master updated: [SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 0 if empty batch is run

2020-04-08 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ca2ba4f  [SPARK-29314][SS] Don't overwrite the metric "updated" of 
state operator to 0 if empty batch is run
ca2ba4f is described below

commit ca2ba4fe647cd60668410b68014a3991ad7fd5c9
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Wed Apr 8 16:59:39 2020 -0700

[SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 
0 if empty batch is run

### What changes were proposed in this pull request?

This patch fixes the behavior of ProgressReporter, which always overwrites
the "updated" value of a state operator to 0 if there is no new data. That
behavior is correct only when we copy the state progress from the "previously"
executed plan, meaning no batch has been run. (A nonzero value of "updated"
would be odd if no batch ran, so the reset was correct in that case.)

It used to be safe to assume that no data meant no batch, but SPARK-24156
allows a batch to run on empty data if Spark needs to handle the watermark.
After this patch, the value is only overwritten when both conditions are met:
1) no data and 2) no batch.

### Why are the changes needed?

Currently Spark doesn't report correct metrics when an empty batch is run;
this patch fixes that.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Modified UT. Note that FlatMapGroupsWithState increases the value of
"updated" when state rows are removed.
Also manually tested via the query below (not a simple query to test with
spark-shell, as you'll hit closure issues in spark-shell while playing with
the state function):

> query

```
case class RunningCount(count: Long)

object TestFlatMapGroupsWithState {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.streaming.MemoryStream
    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}

    val ss = SparkSession
      .builder()
      .appName("TestFlatMapGroupsWithState")
      .getOrCreate()

    ss.conf.set("spark.sql.shuffle.partitions", "5")

    import ss.implicits._

    val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
      if (state.hasTimedOut) {
        // End users are not restricted to removing the state here - they can update the
        // state as well. For example, an event time session window would have a list of
        // sessions here and it cannot remove the entire state.
        state.update(RunningCount(-1))
        Iterator((key, "-1"))
      } else {
        val count = state.getOption.map(_.count).getOrElse(0L) + values.size
        state.update(RunningCount(count))
        state.setTimeoutDuration("1 seconds")
        Iterator((key, count.toString))
      }
    }

    implicit val sqlContext = ss.sqlContext
    val inputData = MemoryStream[String]

    val result = inputData
      .toDF()
      .as[String]
      .groupByKey { v => v }
      .flatMapGroupsWithState(OutputMode.Append(),
        GroupStateTimeout.ProcessingTimeTimeout())(stateFunc)

    val query = result
      .writeStream
      .format("memory")
      .option("queryName", "test")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("5 second"))
      .start()

    Thread.sleep(1000)

    var chIdx: Long = 0

    while (true) {
      (chIdx to chIdx + 4).map { idx => inputData.addData(idx.toString) }
      chIdx += 5
      // intentionally sleep much longer than the trigger to produce an "empty" batch
      Thread.sleep(10 * 1000)
    }
  }
}
```

> before the patch (batch 3 which was an "empty" batch)

```
{
   "id":"de945a5c-882b-4dae-aa58-cb8261cbaf9e",
   "runId":"f1eb6d0d-3cd5-48b2-a03b-5e989b6c151b",
   "name":"test",
   "timestamp":"2019-11-18T07:00:25.005Z",
   "batchId":3,
   "numInputRows":0,
   "inputRowsPerSecond":0.0,
   "processedRowsPerSecond":0.0,
   "durationMs":{
      "addBatch":1664,
      "getBatch":0,
      "latestOffset":0,
      "queryPlanning":29,
      "triggerExecution":1789,
      "walCommit":51
   },
   "stateOperators":[
      {
         "numRowsTotal":10,
         "numRowsUpdated":0,

[spark] branch branch-3.0 updated: [SPARK-31278][SS] Fix StreamingQuery output rows metric

2020-04-07 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new a856eea  [SPARK-31278][SS] Fix StreamingQuery output rows metric
a856eea is described below

commit a856eea42949810f54c5f2f41b9c9abdd2da37c6
Author: Burak Yavuz 
AuthorDate: Tue Apr 7 17:17:47 2020 -0700

[SPARK-31278][SS] Fix StreamingQuery output rows metric

### What changes were proposed in this pull request?

In Structured Streaming, we provide progress updates every 10 seconds when 
a stream doesn't have any new data upstream. When providing this progress 
though, we zero out the input information but not the output information. This 
PR fixes that bug.
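
A minimal sketch of what the regression guards against (`query` is assumed to
be a started StreamingQuery):

```scala
// The sink's output-row counts should survive no-data progress updates.
val sinkOutputRows: Seq[Long] = query.recentProgress.map(_.sink.numOutputRows).toSeq
```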

### Why are the changes needed?

Fixes a bug around incorrect metrics

### Does this PR introduce any user-facing change?

Fixes a bug in the metrics

### How was this patch tested?

New regression test

Closes #28040 from brkyvz/sinkMetrics.

Lead-authored-by: Burak Yavuz 
Co-authored-by: Burak Yavuz 
Signed-off-by: Burak Yavuz 
(cherry picked from commit 8ab2a0c5f23a59c00a9b4191afd976af50d913ba)
Signed-off-by: Burak Yavuz 
---
 .../apache/spark/sql/kafka010/KafkaSinkSuite.scala |  2 +-
 .../execution/streaming/MicroBatchExecution.scala  |  3 +-
 .../sql/execution/streaming/ProgressReporter.scala | 32 ++
 .../sql/streaming/StreamingAggregationSuite.scala  | 71 ++
 .../streaming/StreamingDeduplicationSuite.scala|  3 +-
 .../StreamingQueryStatusAndProgressSuite.scala |  1 +
 6 files changed, 73 insertions(+), 39 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 5c8c5b1..4e808a5 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -314,7 +314,7 @@ class KafkaSinkMicroBatchStreamingSuite extends 
KafkaSinkStreamingSuiteBase {
 try {
   input.addData("1", "2", "3")
   verifyResult(writer) {
-assert(writer.lastProgress.sink.numOutputRows == 3L)
+assert(writer.recentProgress.exists(_.sink.numOutputRows == 3L))
   }
 } finally {
   writer.stop()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 45a2ce1..e022bfb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -226,7 +226,8 @@ class MicroBatchExecution(
   }
 }
 
-finishTrigger(currentBatchHasNewData)  // Must be outside 
reportTimeTaken so it is recorded
+// Must be outside reportTimeTaken so it is recorded
+finishTrigger(currentBatchHasNewData, isCurrentBatchConstructed)
 
 // Signal waiting threads. Note this must be after finishTrigger() to 
ensure all
 // activities (progress generation, etc.) have completed before 
signaling.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index feb151a..d1086cd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -85,8 +85,8 @@ trait ProgressReporter extends Logging {
   private val noDataProgressEventInterval =
 sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
 
-  // The timestamp we report an event that has no input data
-  private var lastNoDataProgressEventTime = Long.MinValue
+  // The timestamp we report an event that has not executed anything
+  private var lastNoExecutionProgressEventTime = Long.MinValue
 
   private val timestampFormat = new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
   timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
@@ -142,8 +142,15 @@ trait ProgressReporter extends Logging {
 logInfo(s"Streaming query made progress: $newProgress")
   }
 
-  /** Finalizes the query progress and adds it to list of recent status 
updates. */
-  protected def finishTrigger(hasNewData: Boolean): Unit = {
+  /**
+   * Finalizes the query progress and adds it to list of recent status updates.
+   *
+   * @param hasNewData Whether the sources of this str

[spark] branch master updated: [SPARK-31278][SS] Fix StreamingQuery output rows metric

2020-04-07 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8ab2a0c  [SPARK-31278][SS] Fix StreamingQuery output rows metric
8ab2a0c is described below

commit 8ab2a0c5f23a59c00a9b4191afd976af50d913ba
Author: Burak Yavuz 
AuthorDate: Tue Apr 7 17:17:47 2020 -0700

[SPARK-31278][SS] Fix StreamingQuery output rows metric

### What changes were proposed in this pull request?

In Structured Streaming, we provide progress updates every 10 seconds when 
a stream doesn't have any new data upstream. When providing this progress 
though, we zero out the input information but not the output information. This 
PR fixes that bug.

### Why are the changes needed?

Fixes a bug around incorrect metrics

### Does this PR introduce any user-facing change?

Fixes a bug in the metrics

### How was this patch tested?

New regression test

Closes #28040 from brkyvz/sinkMetrics.

Lead-authored-by: Burak Yavuz 
Co-authored-by: Burak Yavuz 
Signed-off-by: Burak Yavuz 
---
 .../apache/spark/sql/kafka010/KafkaSinkSuite.scala |  2 +-
 .../execution/streaming/MicroBatchExecution.scala  |  3 +-
 .../sql/execution/streaming/ProgressReporter.scala | 32 ++
 .../sql/streaming/StreamingAggregationSuite.scala  | 71 ++
 .../streaming/StreamingDeduplicationSuite.scala|  3 +-
 .../StreamingQueryStatusAndProgressSuite.scala |  1 +
 6 files changed, 73 insertions(+), 39 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 5c8c5b1..4e808a5 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -314,7 +314,7 @@ class KafkaSinkMicroBatchStreamingSuite extends 
KafkaSinkStreamingSuiteBase {
 try {
   input.addData("1", "2", "3")
   verifyResult(writer) {
-assert(writer.lastProgress.sink.numOutputRows == 3L)
+assert(writer.recentProgress.exists(_.sink.numOutputRows == 3L))
   }
 } finally {
   writer.stop()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 45a2ce1..e022bfb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -226,7 +226,8 @@ class MicroBatchExecution(
   }
 }
 
-finishTrigger(currentBatchHasNewData)  // Must be outside 
reportTimeTaken so it is recorded
+// Must be outside reportTimeTaken so it is recorded
+finishTrigger(currentBatchHasNewData, isCurrentBatchConstructed)
 
 // Signal waiting threads. Note this must be after finishTrigger() to 
ensure all
 // activities (progress generation, etc.) have completed before 
signaling.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index feb151a..d1086cd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -85,8 +85,8 @@ trait ProgressReporter extends Logging {
   private val noDataProgressEventInterval =
 sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
 
-  // The timestamp we report an event that has no input data
-  private var lastNoDataProgressEventTime = Long.MinValue
+  // The timestamp we report an event that has not executed anything
+  private var lastNoExecutionProgressEventTime = Long.MinValue
 
   private val timestampFormat = new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
   timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
@@ -142,8 +142,15 @@ trait ProgressReporter extends Logging {
 logInfo(s"Streaming query made progress: $newProgress")
   }
 
-  /** Finalizes the query progress and adds it to list of recent status 
updates. */
-  protected def finishTrigger(hasNewData: Boolean): Unit = {
+  /**
+   * Finalizes the query progress and adds it to list of recent status updates.
+   *
+   * @param hasNewData Whether the sources of this stream had new data for 
this trigger.
+   * @param hasExecuted Whether any batch was executed during this trigg

[spark] branch branch-3.0 updated: [SPARK-31178][SQL] Prevent V2 exec nodes from executing multiple times

2020-03-18 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new a97117f  [SPARK-31178][SQL] Prevent V2 exec nodes from executing 
multiple times
a97117f is described below

commit a97117f1294d3625d71809f2770523ad0e14ade0
Author: Burak Yavuz 
AuthorDate: Wed Mar 18 18:07:24 2020 -0700

[SPARK-31178][SQL] Prevent V2 exec nodes from executing multiple times

### What changes were proposed in this pull request?

This PR prevents the execution of V2 DataSource exec nodes multiple times 
when `collect()` is called on them. For V1 DataSources, commands would be 
executed as a RunnableCommand, which would cache the result as part of the 
`ExecutedCommandExec` node. We extend `V2CommandExec` for all the data writing 
commands so that they only get executed once as well.
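
A hedged repro sketch of the symptom (the catalog `testcat` and table name are
placeholders):

```scala
// Before this fix, collect() on the returned DataFrame could re-run the V2 write
// exec node and append the rows a second time.
val df = spark.sql("INSERT INTO testcat.ns.tbl VALUES (1), (2)")
df.collect()
```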

### Why are the changes needed?

Otherwise, a SQL command that inserts data or creates a table gets executed
multiple times when `collect()` is called on it.

### Does this PR introduce any user-facing change?

Fixes a bug

### How was this patch tested?

Unit tests

Closes #27941 from brkyvz/doubleInsert.

Authored-by: Burak Yavuz 
Signed-off-by: Burak Yavuz 
(cherry picked from commit 4237251861c79f3176de7cf5232f0388ec5d946e)
Signed-off-by: Burak Yavuz 
---
 .../datasources/v2/ShowNamespacesExec.scala|  4 +-
 .../execution/datasources/v2/ShowTablesExec.scala  |  4 +-
 .../datasources/v2/V1FallbackWriters.scala | 10 ++--
 .../execution/datasources/v2/V2CommandExec.scala   |  6 +-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 28 -
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 68 ++
 .../spark/sql/connector/InsertIntoTests.scala  | 17 ++
 7 files changed, 112 insertions(+), 25 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala
index fe3ab80..6f96848 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
GenericRowWithSchem
 import org.apache.spark.sql.catalyst.util.StringUtils
 import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces
+import org.apache.spark.sql.execution.LeafExecNode
 
 /**
  * Physical plan node for showing namespaces.
@@ -33,8 +34,7 @@ case class ShowNamespacesExec(
 output: Seq[Attribute],
 catalog: SupportsNamespaces,
 namespace: Seq[String],
-pattern: Option[String])
-extends V2CommandExec {
+pattern: Option[String]) extends V2CommandExec with LeafExecNode {
 
   override protected def run(): Seq[InternalRow] = {
 val namespaces = if (namespace.nonEmpty) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala
index 995b008..c740e0d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
GenericRowWithSchem
 import org.apache.spark.sql.catalyst.util.StringUtils
 import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
 import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.execution.LeafExecNode
 
 /**
  * Physical plan node for showing tables.
@@ -33,8 +34,7 @@ case class ShowTablesExec(
 output: Seq[Attribute],
 catalog: TableCatalog,
 namespace: Seq[String],
-pattern: Option[String])
-extends V2CommandExec {
+pattern: Option[String]) extends V2CommandExec with LeafExecNode {
   override protected def run(): Seq[InternalRow] = {
 val rows = new ArrayBuffer[InternalRow]()
 val encoder = RowEncoder(schema).resolveAndBind()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
index f973000..7502a87 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
@@ -41,7 +41,7 @@ case class AppendDataExecV1(
 writeOptions

[spark] branch master updated: [SPARK-31178][SQL] Prevent V2 exec nodes from executing multiple times

2020-03-18 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4237251  [SPARK-31178][SQL] Prevent V2 exec nodes from executing 
multiple times
4237251 is described below

commit 4237251861c79f3176de7cf5232f0388ec5d946e
Author: Burak Yavuz 
AuthorDate: Wed Mar 18 18:07:24 2020 -0700

[SPARK-31178][SQL] Prevent V2 exec nodes from executing multiple times

### What changes were proposed in this pull request?

This PR prevents the execution of V2 DataSource exec nodes multiple times 
when `collect()` is called on them. For V1 DataSources, commands would be 
executed as a RunnableCommand, which would cache the result as part of the 
`ExecutedCommandExec` node. We extend `V2CommandExec` for all the data writing 
commands so that they only get executed once as well.

### Why are the changes needed?

Otherwise, a SQL command that inserts data or creates a table gets executed
multiple times when `collect()` is called on it.

### Does this PR introduce any user-facing change?

Fixes a bug

### How was this patch tested?

Unit tests

Closes #27941 from brkyvz/doubleInsert.

Authored-by: Burak Yavuz 
Signed-off-by: Burak Yavuz 
---
 .../datasources/v2/ShowNamespacesExec.scala|  4 +-
 .../execution/datasources/v2/ShowTablesExec.scala  |  4 +-
 .../datasources/v2/V1FallbackWriters.scala | 10 ++--
 .../execution/datasources/v2/V2CommandExec.scala   |  6 +-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 28 -
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 68 ++
 .../spark/sql/connector/InsertIntoTests.scala  | 17 ++
 7 files changed, 112 insertions(+), 25 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala
index fe3ab80..6f96848 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
GenericRowWithSchem
 import org.apache.spark.sql.catalyst.util.StringUtils
 import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces
+import org.apache.spark.sql.execution.LeafExecNode
 
 /**
  * Physical plan node for showing namespaces.
@@ -33,8 +34,7 @@ case class ShowNamespacesExec(
 output: Seq[Attribute],
 catalog: SupportsNamespaces,
 namespace: Seq[String],
-pattern: Option[String])
-extends V2CommandExec {
+pattern: Option[String]) extends V2CommandExec with LeafExecNode {
 
   override protected def run(): Seq[InternalRow] = {
 val namespaces = if (namespace.nonEmpty) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala
index 995b008..c740e0d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
GenericRowWithSchem
 import org.apache.spark.sql.catalyst.util.StringUtils
 import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
 import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.execution.LeafExecNode
 
 /**
  * Physical plan node for showing tables.
@@ -33,8 +34,7 @@ case class ShowTablesExec(
 output: Seq[Attribute],
 catalog: TableCatalog,
 namespace: Seq[String],
-pattern: Option[String])
-extends V2CommandExec {
+pattern: Option[String]) extends V2CommandExec with LeafExecNode {
   override protected def run(): Seq[InternalRow] = {
 val rows = new ArrayBuffer[InternalRow]()
 val encoder = RowEncoder(schema).resolveAndBind()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
index f973000..7502a87 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
@@ -41,7 +41,7 @@ case class AppendDataExecV1(
 writeOptions: CaseInsensitiveStringMap,
 plan: LogicalPlan) extends V1FallbackWriters {
 
-  override protected def

[spark] branch master updated: [SPARK-30669][SS] Introduce AdmissionControl APIs for StructuredStreaming

2020-01-30 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1cd19ad  [SPARK-30669][SS] Introduce AdmissionControl APIs for 
StructuredStreaming
1cd19ad is described below

commit 1cd19ad92da960f18a6673bc3ce670ce633050e5
Author: Burak Yavuz 
AuthorDate: Thu Jan 30 22:02:48 2020 -0800

[SPARK-30669][SS] Introduce AdmissionControl APIs for StructuredStreaming

### What changes were proposed in this pull request?

We propose to add two new interfaces, `SupportsAdmissionControl` and
`ReadLimit`. A ReadLimit defines how much data should be read in the next
micro-batch. `SupportsAdmissionControl` specifies that a source can rate limit
its ingest into the system. The source can tell the system what the user
specified as a read limit, and the system can enforce this limit within each
micro-batch or impose its own limit if, for example, the Trigger is
Trigger.Once().

We then use this interface in FileStreamSource, KafkaSource, and 
KafkaMicroBatchStream.
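
A hedged sketch of how a custom micro-batch source might opt in (the class
name and limit are illustrative; the offset logic is left unimplemented):

```scala
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadLimit, SupportsAdmissionControl}

// The remaining MicroBatchStream methods are omitted, so the class stays abstract.
abstract class MyRateLimitedStream(maxRowsPerBatch: Long)
  extends SupportsAdmissionControl with MicroBatchStream {

  // Tell the engine how much data this source wants to admit per micro-batch.
  override def getDefaultReadLimit: ReadLimit = ReadLimit.maxRows(maxRowsPerBatch)

  // The engine passes the effective limit (e.g. all-available under Trigger.Once());
  // the source picks an end offset that respects it.
  override def latestOffset(startOffset: Offset, limit: ReadLimit): Offset =
    ??? // source-specific offset computation
}
```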

### Why are the changes needed?

Sources currently have no information around execution semantics such as 
whether the stream is being executed in Trigger.Once() mode. This interface 
will pass this information into the sources as part of planning. With a trigger 
like Trigger.Once(), the semantics are to process all the data available to the 
datasource in a single micro-batch. However, this semantic can be broken when 
data source options such as `maxOffsetsPerTrigger` (in the Kafka source) rate 
limit the amount of data [...]

### Does this PR introduce any user-facing change?

DataSource developers can extend this interface for their streaming sources 
to add admission control into their system and correctly support Trigger.Once().

### How was this patch tested?

Existing tests, as this API is mostly internal

Closes #27380 from brkyvz/rateLimit.

Lead-authored-by: Burak Yavuz 
Co-authored-by: Burak Yavuz 
Signed-off-by: Burak Yavuz 
---
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 25 ++
 .../apache/spark/sql/kafka010/KafkaSource.scala| 29 +++
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 22 +
 .../read/streaming/ReadAllAvailable.java}  | 28 +++
 .../sql/connector/read/streaming/ReadLimit.java}   | 25 ++
 .../sql/connector/read/streaming/ReadMaxFiles.java | 55 +
 .../sql/connector/read/streaming/ReadMaxRows.java  | 55 +
 .../read/streaming/SupportsAdmissionControl.java   | 56 ++
 .../sql/execution/streaming/FileStreamSource.scala | 25 --
 .../execution/streaming/MicroBatchExecution.scala  | 49 +--
 .../sql/execution/streaming/StreamExecution.scala  |  6 +--
 .../streaming/continuous/ContinuousExecution.scala |  4 +-
 .../sql/streaming/FileStreamSourceSuite.scala  | 56 ++
 13 files changed, 376 insertions(+), 59 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 844c963..6599e7e 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -27,8 +27,7 @@ import 
org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory}
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
-import 
org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, 
Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.UninterruptibleThread
@@ -55,7 +54,7 @@ private[kafka010] class KafkaMicroBatchStream(
 options: CaseInsensitiveStringMap,
 metadataPath: String,
 startingOffsets: KafkaOffsetRangeLimit,
-failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
+failOnDataLoss: Boolean) extends SupportsAdmissionControl with 
MicroBatchStream with Logging {
 
   private[kafka010] val pollTimeoutMs = options.getLong(
 KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
@@ -77,13 +76,23 @@ private[kafka010] class KafkaMicroBatchStream(
 KafkaSourceOffset(getOrCreateInitialPartitionOffsets

[spark] branch master updated: [SPARK-30669][SS] Introduce AdmissionControl APIs for StructuredStreaming

2020-01-30 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 940510c  [SPARK-30669][SS] Introduce AdmissionControl APIs for 
StructuredStreaming
940510c is described below

commit 940510cb1e43a4166d2fe7d7eb4ace8561d24c9b
Author: Burak Yavuz 
AuthorDate: Thu Jan 30 22:01:53 2020 -0800

[SPARK-30669][SS] Introduce AdmissionControl APIs for StructuredStreaming

### What changes were proposed in this pull request?

We propose to add two new interfaces, `SupportsAdmissionControl` and
`ReadLimit`. A ReadLimit defines how much data should be read in the next
micro-batch. `SupportsAdmissionControl` specifies that a source can rate limit
its ingest into the system. The source can tell the system what the user
specified as a read limit, and the system can enforce this limit within each
micro-batch or impose its own limit if, for example, the Trigger is
Trigger.Once().

We then use this interface in FileStreamSource, KafkaSource, and 
KafkaMicroBatchStream.

### Why are the changes needed?

Sources currently have no information around execution semantics such as 
whether the stream is being executed in Trigger.Once() mode. This interface 
will pass this information into the sources as part of planning. With a trigger 
like Trigger.Once(), the semantics are to process all the data available to the 
datasource in a single micro-batch. However, this semantic can be broken when 
data source options such as `maxOffsetsPerTrigger` (in the Kafka source) rate 
limit the amount of data [...]

### Does this PR introduce any user-facing change?

DataSource developers can extend this interface for their streaming sources 
to add admission control into their system and correctly support Trigger.Once().

### How was this patch tested?

Existing tests, as this API is mostly internal

Closes #27380 from brkyvz/rateLimit.

Lead-authored-by: Burak Yavuz 
Co-authored-by: Burak Yavuz 
Signed-off-by: Burak Yavuz 
---
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 25 ++
 .../apache/spark/sql/kafka010/KafkaSource.scala| 29 +++
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 22 +
 .../read/streaming/ReadAllAvailable.java}  | 28 +++
 .../sql/connector/read/streaming/ReadLimit.java}   | 25 ++
 .../sql/connector/read/streaming/ReadMaxFiles.java | 55 +
 .../sql/connector/read/streaming/ReadMaxRows.java  | 55 +
 .../read/streaming/SupportsAdmissionControl.java   | 56 ++
 .../sql/execution/streaming/FileStreamSource.scala | 25 --
 .../execution/streaming/MicroBatchExecution.scala  | 49 +--
 .../sql/execution/streaming/StreamExecution.scala  |  6 +--
 .../streaming/continuous/ContinuousExecution.scala |  4 +-
 .../sql/streaming/FileStreamSourceSuite.scala  | 56 ++
 13 files changed, 376 insertions(+), 59 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 844c963..6599e7e 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -27,8 +27,7 @@ import 
org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory}
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
-import 
org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, 
Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.UninterruptibleThread
@@ -55,7 +54,7 @@ private[kafka010] class KafkaMicroBatchStream(
 options: CaseInsensitiveStringMap,
 metadataPath: String,
 startingOffsets: KafkaOffsetRangeLimit,
-failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
+failOnDataLoss: Boolean) extends SupportsAdmissionControl with 
MicroBatchStream with Logging {
 
   private[kafka010] val pollTimeoutMs = options.getLong(
 KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
@@ -77,13 +76,23 @@ private[kafka010] class KafkaMicroBatchStream(
 KafkaSourceOffset(getOrCreateInitialPartitionOffsets

[spark] branch master updated: [SPARK-30314] Add identifier and catalog information to DataSourceV2Relation

2020-01-26 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d0800fc  [SPARK-30314] Add identifier and catalog information to 
DataSourceV2Relation
d0800fc is described below

commit d0800fc8e2e71a79bf0f72c3e4bc608ae34053e7
Author: Yuchen Huo 
AuthorDate: Sun Jan 26 12:59:24 2020 -0800

[SPARK-30314] Add identifier and catalog information to DataSourceV2Relation

### What changes were proposed in this pull request?

Add identifier and catalog information in DataSourceV2Relation so it would 
be possible to do richer checks in checkAnalysis step.

### Why are the changes needed?

In data source v2, table implementations are all customized, so we may not
be able to get the resolved identifier from the tables themselves. Therefore
we encode the table and catalog information in DSV2Relation so that no
external changes are needed to make sure this information is available.
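
A hedged sketch of the kind of check this enables, assuming the relation now
carries optional `catalog` and `identifier` fields:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Collect fully qualified names for V2 relations that carry catalog and identifier.
def qualifiedNames(plan: LogicalPlan): Seq[String] = plan.collect {
  case r: DataSourceV2Relation if r.catalog.isDefined && r.identifier.isDefined =>
    s"${r.catalog.get.name}.${r.identifier.get}"
}
```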

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Unit tests in the following suites:
CatalogManagerSuite.scala
CatalogV2UtilSuite.scala
SupportsCatalogOptionsSuite.scala
PlanResolutionSuite.scala

Closes #26957 from yuchenhuo/SPARK-30314.

Authored-by: Yuchen Huo 
Signed-off-by: Burak Yavuz 
---
 .../spark/sql/kafka010/KafkaRelationSuite.scala|  2 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala |  9 +--
 .../sql/connector/catalog/CatalogV2Util.scala  |  2 +-
 .../datasources/v2/DataSourceV2Relation.scala  | 21 --
 .../sql/connector/catalog/CatalogV2UtilSuite.scala | 40 
 .../org/apache/spark/sql/DataFrameReader.scala | 16 +++--
 .../org/apache/spark/sql/DataFrameWriter.scala | 22 +++
 .../org/apache/spark/sql/DataFrameWriterV2.scala   | 10 ++-
 .../apache/spark/sql/execution/CacheManager.scala  |  2 +-
 .../datasources/FallBackFileSourceV2.scala |  3 +-
 .../datasources/v2/DataSourceV2Strategy.scala  |  8 +--
 .../apache/spark/sql/DataFrameWriterV2Suite.scala  | 43 +
 .../connector/SupportsCatalogOptionsSuite.scala| 75 +-
 .../sql/connector/TableCapabilityCheckSuite.scala  | 32 +
 .../execution/command/PlanResolutionSuite.scala| 55 +++-
 .../parquet/ParquetPartitionDiscoverySuite.scala   |  2 +-
 .../spark/sql/streaming/FileStreamSinkSuite.scala  |  2 +-
 17 files changed, 290 insertions(+), 54 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index 063e2e2..2c022c1 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -624,7 +624,7 @@ class KafkaRelationSuiteV2 extends KafkaRelationSuiteBase {
 val topic = newTopic()
 val df = createDF(topic)
 assert(df.logicalPlan.collect {
-  case DataSourceV2Relation(_, _, _) => true
+  case _: DataSourceV2Relation => true
 }.nonEmpty)
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 45547bf..15ebf69 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -817,8 +817,8 @@ class Analyzer(
 
   case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) =>
 CatalogV2Util.loadRelation(u.catalog, u.tableName)
-.map(rel => alter.copy(table = rel))
-.getOrElse(alter)
+  .map(rel => alter.copy(table = rel))
+  .getOrElse(alter)
 
   case u: UnresolvedV2Relation =>
 CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u)
@@ -831,7 +831,8 @@ class Analyzer(
   expandRelationName(identifier) match {
 case NonSessionCatalogAndIdentifier(catalog, ident) =>
   CatalogV2Util.loadTable(catalog, ident) match {
-case Some(table) => Some(DataSourceV2Relation.create(table))
+case Some(table) =>
+  Some(DataSourceV2Relation.create(table, Some(catalog), 
Some(ident)))
 case None => None
   }
 case _ => None
@@ -921,7 +922,7 @@ class Analyzer(
 case v1Table: V1Table =>
   v1SessionCatalog.getRelation(v1Table.v1Table)
 case table =>
-  DataSourceV2Relation.create(table)
+  DataSourceV2Relation.create(table, Some(catalog), S

[spark] branch master updated: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider

2020-01-09 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f8d5957  [SPARK-29219][SQL] Introduce SupportsCatalogOptions for 
TableProvider
f8d5957 is described below

commit f8d59572b014e5254b0c574b26e101c2e4157bdd
Author: Burak Yavuz 
AuthorDate: Thu Jan 9 11:18:16 2020 -0800

[SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider

### What changes were proposed in this pull request?

This PR introduces `SupportsCatalogOptions` as an interface for 
`TableProvider`. Through `SupportsCatalogOptions`, V2 DataSources can implement 
the two methods `extractIdentifier` and `extractCatalog` to support the 
creation, and existence check of tables without requiring a formal TableCatalog 
implementation.
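
A hedged sketch of an implementation (the option keys "dbtable" and "catalog"
are made up for illustration; only the two extraction methods are shown):

```scala
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsCatalogOptions}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// The TableProvider methods are omitted, so this stays abstract.
abstract class MyCatalogAwareProvider extends SupportsCatalogOptions {

  // Derive a table identifier from the data source options passed by the user.
  override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier =
    Identifier.of(Array("default"), options.get("dbtable"))

  // Optionally route to a specific catalog; fall back to the session catalog name.
  override def extractCatalog(options: CaseInsensitiveStringMap): String =
    options.getOrDefault("catalog", "spark_catalog")
}
```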

We currently don't support all SaveModes for DataSourceV2 in
DataFrameWriter.save. The idea here is that eventually file-based tables
written with `DataFrameWriter.save(path)` will create a PathIdentifier whose
name is `path`, and the V2SessionCatalog will be able to perform FileSystem
checks at `path` to support the ErrorIfExists and Ignore SaveModes.

### Why are the changes needed?

To support all Save modes for V2 data sources with DataFrameWriter. Since 
we can now support table creation, we will be able to provide partitioning 
information when first creating the table as well.

### Does this PR introduce any user-facing change?

Introduces a new interface

### How was this patch tested?

Will add tests once the interface is vetted.

Closes #26913 from brkyvz/catalogOptions.

Lead-authored-by: Burak Yavuz 
Co-authored-by: Burak Yavuz 
Signed-off-by: Burak Yavuz 
---
 .../apache/spark/sql/kafka010/KafkaSinkSuite.scala |  13 +-
 .../connector/catalog/SupportsCatalogOptions.java  |  53 +
 .../sql/connector/catalog/CatalogV2Util.scala  |  11 ++
 .../org/apache/spark/sql/DataFrameReader.scala |  21 +-
 .../org/apache/spark/sql/DataFrameWriter.scala | 128 
 .../connector/SupportsCatalogOptionsSuite.scala| 219 +
 .../sql/connector/TestV2SessionCatalogBase.scala   |   5 +
 7 files changed, 406 insertions(+), 44 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index e2dcd62..5c8c5b1 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets.UTF_8
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.reflect.ClassTag
+import scala.util.Try
 
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner
@@ -500,7 +501,7 @@ abstract class KafkaSinkBatchSuiteBase extends 
KafkaSinkSuiteBase {
 TestUtils.assertExceptionMsg(ex, "null topic present in the data")
   }
 
-  protected def testUnsupportedSaveModes(msg: (SaveMode) => String): Unit = {
+  protected def testUnsupportedSaveModes(msg: (SaveMode) => Seq[String]): Unit 
= {
 val topic = newTopic()
 testUtils.createTopic(topic)
 val df = Seq[(String, String)](null.asInstanceOf[String] -> 
"1").toDF("topic", "value")
@@ -513,7 +514,10 @@ abstract class KafkaSinkBatchSuiteBase extends 
KafkaSinkSuiteBase {
   .mode(mode)
   .save()
   }
-  TestUtils.assertExceptionMsg(ex, msg(mode))
+  val errorChecks = msg(mode).map(m => 
Try(TestUtils.assertExceptionMsg(ex, m)))
+  if (!errorChecks.exists(_.isSuccess)) {
+fail("Error messages not found in exception trace")
+  }
 }
   }
 
@@ -541,7 +545,7 @@ class KafkaSinkBatchSuiteV1 extends KafkaSinkBatchSuiteBase 
{
   .set(SQLConf.USE_V1_SOURCE_LIST, "kafka")
 
   test("batch - unsupported save modes") {
-testUnsupportedSaveModes((mode) => s"Save mode ${mode.name} not allowed 
for Kafka")
+testUnsupportedSaveModes((mode) => s"Save mode ${mode.name} not allowed 
for Kafka" :: Nil)
   }
 }
 
@@ -552,7 +556,8 @@ class KafkaSinkBatchSuiteV2 extends KafkaSinkBatchSuiteBase 
{
   .set(SQLConf.USE_V1_SOURCE_LIST, "")
 
   test("batch - unsupported save modes") {
-testUnsupportedSaveModes((mode) => s"cannot be written with ${mode.name} 
mode")
+testUnsupportedSaveModes((mode) =>
+  Seq(s"cannot be written with ${mode.name} mode", "does not support 
truncate"))
   }
 
   test("generic - write

[spark] branch master updated: [SPARK-30143][SS] Add a timeout on stopping a streaming query

2019-12-13 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4c37a8a  [SPARK-30143][SS] Add a timeout on stopping a streaming query
4c37a8a is described below

commit 4c37a8a3f4a489b52f1919d2db84f6e32c6a05cd
Author: Burak Yavuz 
AuthorDate: Fri Dec 13 15:16:00 2019 -0800

[SPARK-30143][SS] Add a timeout on stopping a streaming query

### What changes were proposed in this pull request?

Add a timeout configuration for StreamingQuery.stop()

### Why are the changes needed?

The stop() method on a Streaming Query awaits the termination of the stream 
execution thread. However, the stream execution thread may block forever 
depending on the streaming source implementation (like in Kafka, which runs 
UninterruptibleThreads).

This can cause control flow applications to hang indefinitely as well. We'd
like to introduce a timeout for stopping the execution thread, so that the
control flow thread can decide what action to take if the timeout is hit.
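
A minimal usage sketch under the new conf (`spark` and `query` are assumed to
be an active SparkSession and StreamingQuery; the timeout value is illustrative):

```scala
import java.util.concurrent.TimeoutException

spark.conf.set("spark.sql.streaming.stopTimeout", "10s")
try {
  query.stop()  // waits up to 10s for the stream execution thread to terminate
} catch {
  case _: TimeoutException =>
    // the control flow application decides what to do, e.g. log, alert, or retry
}
```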

### Does this PR introduce any user-facing change?

By default, no. If the timeout configuration is set, then a 
TimeoutException will be thrown if a stream cannot be stopped within the given 
timeout.

### How was this patch tested?

Unit tests

Closes #26771 from brkyvz/stopTimeout.

Lead-authored-by: Burak Yavuz 
Co-authored-by: Burak Yavuz 
Signed-off-by: Burak Yavuz 
---
 .../org/apache/spark/sql/internal/SQLConf.scala|   7 ++
 .../execution/streaming/MicroBatchExecution.scala  |   3 +-
 .../sql/execution/streaming/StreamExecution.scala  |  26 +++-
 .../streaming/continuous/ContinuousExecution.scala |   3 +-
 .../spark/sql/streaming/DataStreamWriter.scala |  11 +-
 .../spark/sql/streaming/StreamingQuery.scala   |  12 +-
 .../sql/streaming/StreamingQueryManager.scala  |   3 +-
 .../streaming/JavaDataStreamReaderWriterSuite.java |   5 +-
 .../apache/spark/sql/streaming/StreamSuite.scala   |  35 +-
 .../sql/streaming/util/BlockOnStopSource.scala | 132 +
 10 files changed, 224 insertions(+), 13 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c54008c..91347cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1298,6 +1298,13 @@ object SQLConf {
   .timeConf(TimeUnit.MILLISECONDS)
   .createWithDefault(10L)
 
+  val STREAMING_STOP_TIMEOUT =
+buildConf("spark.sql.streaming.stopTimeout")
+  .doc("How long to wait for the streaming execution thread to stop when 
calling the " +
+"streaming query's stop() method in milliseconds. 0 or negative values 
wait indefinitely.")
+  .timeConf(TimeUnit.MILLISECONDS)
+  .createWithDefault(0L)
+
   val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL =
 buildConf("spark.sql.streaming.noDataProgressEventInterval")
   .internal()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 5fe1f92..872c367 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -150,8 +150,7 @@ class MicroBatchExecution(
 state.set(TERMINATED)
 if (queryExecutionThread.isAlive) {
   sparkSession.sparkContext.cancelJobGroup(runId.toString)
-  queryExecutionThread.interrupt()
-  queryExecutionThread.join()
+  interruptAndAwaitExecutionThreadTermination()
   // microBatchThread may spawn new jobs, so we need to cancel again to 
prevent a leak
   sparkSession.sparkContext.cancelJobGroup(runId.toString)
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index f470ad3..1cb3955 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import java.io.{InterruptedIOException, IOException, UncheckedIOException}
 import java.nio.channels.ClosedByInterruptException
 import java.util.UUID
-import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ExecutionExcepti

[spark] branch master updated: [SPARK-29568][SS] Stop existing running streams when a new stream is launched

2019-11-13 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 363af16  [SPARK-29568][SS] Stop existing running streams when a new 
stream is launched
363af16 is described below

commit 363af16c72abe19fc5cc5b5bdf9d8dc34975f2ba
Author: Burak Yavuz 
AuthorDate: Wed Nov 13 08:59:46 2019 -0800

[SPARK-29568][SS] Stop existing running streams when a new stream is 
launched

### What changes were proposed in this pull request?

This PR adds a SQL conf, `spark.sql.streaming.stopActiveRunOnRestart`. When 
this conf is `true` (the default), an already running stream will be stopped 
if a new copy is launched on the same checkpoint location.

### Why are the changes needed?

In multi-tenant environments with multiple SparkSessions, you can accidentally 
start multiple copies of the same stream (i.e. streams using the same checkpoint 
location). This causes every new instantiation of that stream to fail. However, 
sometimes you may want to turn off the old stream instead, as it may have turned 
into a zombie (you no longer have access to its query handle or SparkSession).

It would be nice to have a SQL flag that allows stopping the old stream in such 
zombie cases.

### Does this PR introduce any user-facing change?

Yes. Now by default, if you launch a new copy of an already running stream 
on a multi-tenant cluster, the existing stream will be stopped.
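
A minimal sketch of the restart behavior, assuming a local session, a `rate` source, a console sink, and an illustrative checkpoint path (none of these come from the commit):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("restart-demo").getOrCreate()
// Leave the new flag at its default (true): a second run of the same query
// stops the first run instead of failing.
spark.conf.set("spark.sql.streaming.stopActiveRunOnRestart", "true")

def startRun() = spark.readStream.format("rate").load()
  .writeStream.format("console")
  .option("checkpointLocation", "/tmp/ckpt/rate-demo")  // same checkpoint => same query id
  .start()

val firstRun = startRun()
val secondRun = startRun()  // the old run is stopped before the new one starts
println(s"first active: ${firstRun.isActive}, second active: ${secondRun.isActive}")
```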

### How was this patch tested?

Unit tests in StreamingQueryManagerSuite

Closes #26225 from brkyvz/stopStream.

Lead-authored-by: Burak Yavuz 
Co-authored-by: Burak Yavuz 
Signed-off-by: Burak Yavuz 
---
 .../org/apache/spark/sql/internal/SQLConf.scala|   9 ++
 .../apache/spark/sql/internal/SharedState.scala|  10 +-
 .../sql/streaming/StreamingQueryManager.scala  |  82 +
 .../sql/streaming/StreamingQueryManagerSuite.scala | 134 -
 .../spark/sql/streaming/StreamingQuerySuite.scala  |   8 +-
 5 files changed, 184 insertions(+), 59 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 98acace..759586a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1093,6 +1093,15 @@ object SQLConf {
   .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
   .createWithDefault(2)
 
+  val STREAMING_STOP_ACTIVE_RUN_ON_RESTART =
+    buildConf("spark.sql.streaming.stopActiveRunOnRestart")
+      .doc("Running multiple runs of the same streaming query concurrently is not supported. " +
+        "If we find a concurrent active run for a streaming query (in the same or different " +
+        "SparkSessions on the same cluster) and this flag is true, we will stop the old streaming " +
+        "query run to start the new one.")
+      .booleanConf
+      .createWithDefault(true)
+
   val STREAMING_JOIN_STATE_FORMAT_VERSION =
 buildConf("spark.sql.streaming.join.stateFormatVersion")
   .internal()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index d097f9f..b810bed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
 import java.net.URL
 import java.util.{Locale, UUID}
 import java.util.concurrent.ConcurrentHashMap
+import javax.annotation.concurrent.GuardedBy
 
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -32,9 +33,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.CacheManager
+import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, 
SQLAppStatusStore, SQLTab}
 import org.apache.spark.sql.internal.StaticSQLConf._
-import org.apache.spark.sql.streaming.StreamingQueryManager
+import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.util.Utils
 
@@ -112,11 +114,15 @@ private[sql] class SharedState(
*/
   val cacheManager: CacheManager = new CacheManager
 
+  /** A global lock for all streaming query lifecycle tracking and management. */
+  private[sql] val activeQueriesLock = new Object
+
   /**
* A map of active stre

[spark] branch master updated: [SPARK-29352][SQL][SS] Track active streaming queries in the SparkSession.sharedState

2019-10-23 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new cbe6ead  [SPARK-29352][SQL][SS] Track active streaming queries in the 
SparkSession.sharedState
cbe6ead is described below

commit cbe6eadc0c1d0384c1ee03f3a5b28cc583a60717
Author: Burak Yavuz 
AuthorDate: Wed Oct 23 10:56:19 2019 +0200

[SPARK-29352][SQL][SS] Track active streaming queries in the 
SparkSession.sharedState

### What changes were proposed in this pull request?

This moves the tracking of active queries from per-SparkSession state to the 
SparkSession's SharedState, for better safety in isolated SparkSession 
environments.

### Why are the changes needed?

We have checks to prevent restarting the same stream in the same SparkSession, 
but we can do better in multi-tenant environments by putting that state in the 
SharedState instead of the SessionState. This allows a more comprehensive check 
for multi-tenant clusters, as sketched below.
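
A hedged sketch of the cross-session check; the checkpoint path is illustrative and not from the commit:

```scala
val spark2 = spark.newSession()  // shares the SharedState with `spark`

val q = spark.readStream.format("rate").load()
  .writeStream.format("console")
  .option("checkpointLocation", "/tmp/ckpt/shared-demo")
  .start()

// Same checkpoint location => same query id. With the id tracked in the
// SharedState, the second start is rejected even from a different session.
try {
  spark2.readStream.format("rate").load()
    .writeStream.format("console")
    .option("checkpointLocation", "/tmp/ckpt/shared-demo")
    .start()
} catch {
  case e: IllegalStateException => println(s"Rejected as expected: ${e.getMessage}")
}
```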

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Added tests to StreamingQueryManagerSuite

Closes #26018 from brkyvz/sharedStreamingQueryManager.

Lead-authored-by: Burak Yavuz 
Co-authored-by: Burak Yavuz 
Signed-off-by: Burak Yavuz 
---
 .../apache/spark/sql/internal/SharedState.scala| 10 ++-
 .../sql/streaming/StreamingQueryManager.scala  | 22 +++---
 .../sql/streaming/StreamingQueryManagerSuite.scala | 80 +-
 3 files changed, 102 insertions(+), 10 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index f1a6481..d097f9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.internal
 
 import java.net.URL
-import java.util.Locale
+import java.util.{Locale, UUID}
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, 
SQLAppStatusStore, SQLTab}
 import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.sql.streaming.StreamingQueryManager
 import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.util.Utils
 
@@ -111,6 +113,12 @@ private[sql] class SharedState(
   val cacheManager: CacheManager = new CacheManager
 
   /**
+   * A map of active streaming queries to the session specific StreamingQueryManager that manages
+   * the lifecycle of that stream.
+   */
+  private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamingQueryManager]()
+
+  /**
* A status store to query SQL status/metrics of this Spark application, 
based on SQL-specific
* [[org.apache.spark.scheduler.SparkListenerEvent]]s.
*/
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 9abe38d..9b43a83 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -352,8 +352,10 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
 }
   }
 
-  // Make sure no other query with same id is active
-  if (activeQueries.values.exists(_.id == query.id)) {
+  // Make sure no other query with same id is active across all sessions
+      val activeOption =
+        Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, this))
+      if (activeOption.isDefined || activeQueries.values.exists(_.id == query.id)) {
        throw new IllegalStateException(
          s"Cannot start query with id ${query.id} as another query with same id is " +
            s"already active. Perhaps you are attempting to restart a query from checkpoint " +
@@ -370,9 +372,7 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
   query.streamingQuery.start()
 } catch {
   case e: Throwable =>
-activeQueriesLock.synchronized {
-  activeQueries -= query.id
-}
+unregisterTerminatedStream(query.id)
 throw e
 }
 query
@@ -380,9 +380,7 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
 
   /** Notify (by the StreamingQuery) that the qu

[spark] branch master updated: [SPARK-28612][SQL] Add DataFrameWriterV2 API

2019-09-19 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2c775f4  [SPARK-28612][SQL] Add DataFrameWriterV2 API
2c775f4 is described below

commit 2c775f418f5fae4dbf3adfbb5ea99cd030918d41
Author: Ryan Blue 
AuthorDate: Thu Sep 19 13:32:09 2019 -0700

[SPARK-28612][SQL] Add DataFrameWriterV2 API

## What changes were proposed in this pull request?

This adds a new write API as proposed in the [SPIP to standardize logical 
plans](https://issues.apache.org/jira/browse/SPARK-23521). This new API:

* Uses clear verbs to execute writes, like `append`, `overwrite`, `create`, 
and `replace` that correspond to the new logical plans.
* Only creates v2 logical plans so the behavior is always consistent.
* Does not allow table configuration options for operations that cannot 
change table configuration. For example, `partitionedBy` can only be called 
when the writer executes `create` or `replace`.

Here are a few example uses of the new API:

```scala
df.writeTo("catalog.db.table").append()
df.writeTo("catalog.db.table").overwrite($"date" === "2019-06-01")
df.writeTo("catalog.db.table").overwritePartitions()
df.writeTo("catalog.db.table").asParquet.create()
df.writeTo("catalog.db.table").partitionedBy(days($"ts")).createOrReplace()
df.writeTo("catalog.db.table").using("abc").replace()
```

## How was this patch tested?

Added `DataFrameWriterV2Suite` that tests the new write API. Existing tests 
for v2 plans.

Closes #25681 from rdblue/SPARK-28612-add-data-frame-writer-v2.

Authored-by: Ryan Blue 
Signed-off-by: Burak Yavuz 
---
 .../catalyst/expressions/PartitionTransforms.scala |  77 
 .../spark/sql/catalyst/analysis/Analyzer.scala |   6 +-
 .../plans/logical/basicLogicalOperators.scala  |  47 +-
 .../datasources/v2/DataSourceV2Implicits.scala |   9 +
 .../apache/spark/sql/connector/InMemoryTable.scala |   5 +-
 .../org/apache/spark/sql/DataFrameWriter.scala |  11 +-
 .../org/apache/spark/sql/DataFrameWriterV2.scala   | 367 +++
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  28 ++
 .../datasources/v2/DataSourceV2Strategy.scala  |  17 +-
 .../datasources/v2/TableCapabilityCheck.scala  |   6 +-
 .../scala/org/apache/spark/sql/functions.scala |  58 +++
 .../spark/sql/JavaDataFrameWriterV2Suite.java  | 112 +
 .../apache/spark/sql/DataFrameWriterV2Suite.scala  | 507 +
 13 files changed, 1217 insertions(+), 33 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala
new file mode 100644
index 000..e48fd8a
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+/**
+ * Base class for expressions that are converted to v2 partition transforms.
+ *
+ * Subclasses represent abstract transform functions with concrete 
implementations that are
+ * determined by data source implementations. Because the concrete 
implementation is not known,
+ * these expressions are [[Unevaluable]].
+ *
+ * These expressions are used to pass transformations from the DataFrame API:
+ *
+ * {{{
+ *   df.writeTo("catalog.db.table").partitionedBy($"category", 
days($"timestamp")).create()
+ * }}}
+ */
+abstract class PartitionTransformExpression extends Expression with 
Unevaluable {
+  override def nullable: Boolean = true
+}
+
+/**
+ * Expression for the v2 partition transform years.
+ */
+case class Years(child: Expression) extends PartitionTransformExpression {

[spark] branch master updated: [SPARK-29030][SQL] Simplify lookupV2Relation

2019-09-18 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ee94b5d  [SPARK-29030][SQL] Simplify lookupV2Relation
ee94b5d is described below

commit ee94b5d7019f8ec181d42e953cb8b5190186fe30
Author: John Zhuge 
AuthorDate: Wed Sep 18 09:27:11 2019 -0700

[SPARK-29030][SQL] Simplify lookupV2Relation

## What changes were proposed in this pull request?

Simplify the return type for `lookupV2Relation` which makes the 3 callers 
more straightforward.

## How was this patch tested?

Existing unit tests.

Closes #25735 from jzhuge/lookupv2relation.

Authored-by: John Zhuge 
Signed-off-by: Burak Yavuz 
---
 .../spark/sql/catalyst/analysis/Analyzer.scala | 87 --
 .../sql/connector/catalog/CatalogV2Implicits.scala |  6 +-
 2 files changed, 37 insertions(+), 56 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0a13a34..76e59fa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.sql._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.catalyst.util.toPrettySQL
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, 
Identifier, LookupCatalog, Table, TableCatalog, TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, 
CatalogV2Util, Identifier, LookupCatalog, Table, TableCatalog, TableChange, 
V1Table}
 import org.apache.spark.sql.connector.expressions.{FieldReference, 
IdentityTransform, Transform}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
@@ -666,20 +666,13 @@ class Analyzer(
   object ResolveTables extends Rule[LogicalPlan] {
 def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
   case u: UnresolvedRelation =>
-val v2TableOpt = lookupV2Relation(u.multipartIdentifier) match {
-  case scala.Left((_, _, tableOpt)) => tableOpt
-  case scala.Right(tableOpt) => tableOpt
-}
-v2TableOpt.map(DataSourceV2Relation.create).getOrElse(u)
+lookupV2Relation(u.multipartIdentifier)
+  .getOrElse(u)
 
   case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if 
i.query.resolved =>
-val v2TableOpt = lookupV2Relation(u.multipartIdentifier) match {
-  case scala.Left((_, _, tableOpt)) => tableOpt
-  case scala.Right(tableOpt) => tableOpt
-}
-v2TableOpt.map(DataSourceV2Relation.create).map { v2Relation =>
-  i.copy(table = v2Relation)
-}.getOrElse(i)
+lookupV2Relation(u.multipartIdentifier)
+  .map(v2Relation => i.copy(table = v2Relation))
+  .getOrElse(i)
 }
   }
 
@@ -963,26 +956,13 @@ class Analyzer(
 private def resolveV2Alter(
 tableName: Seq[String],
 changes: Seq[TableChange]): Option[AlterTable] = {
-  lookupV2Relation(tableName) match {
-case scala.Left((v2Catalog, ident, tableOpt)) =>
-  Some(AlterTable(
-v2Catalog.asTableCatalog,
-ident,
-
tableOpt.map(DataSourceV2Relation.create).getOrElse(UnresolvedRelation(tableName)),
-changes
-  ))
-case scala.Right(tableOpt) =>
-  tableOpt.map { table =>
-AlterTable(
-  sessionCatalog.asTableCatalog,
-  Identifier.of(tableName.init.toArray, tableName.last),
-  DataSourceV2Relation.create(table),
-  changes
-)
-  }
+  lookupV2RelationAndCatalog(tableName).map {
+case (relation, catalog, ident) =>
+  AlterTable(catalog.asTableCatalog, ident, relation, changes)
   }
 }
   }
+
   /**
* Resolve DESCRIBE TABLE statements that use a DSv2 catalog.
*
@@ -2840,36 +2820,35 @@ class Analyzer(
 
   /**
* Performs the lookup of DataSourceV2 Tables. The order of resolution is:
-   *   1. Check if this relation is a temporary table
-   *   2. Check if it has a catalog identifier. Here we try to load the table. 
If we find the table,
-   *  we can return the table. The result returned by an explicit catalog 
will be returned on
-   *  the Left projection of the Either.
-   *   3. Try resolving the relation using the V2SessionCatalog if that is 
defined. If the
-   *  V2SessionCatalog returns a V1 table definitio

[spark] branch master updated: [SPARK-28628][SQL] Implement SupportsNamespaces in V2SessionCatalog

2019-09-03 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5ea134c  [SPARK-28628][SQL] Implement SupportsNamespaces in 
V2SessionCatalog
5ea134c is described below

commit 5ea134c3546aa0512a85cc2970d38f5e0345edde
Author: Ryan Blue 
AuthorDate: Tue Sep 3 13:13:27 2019 -0700

[SPARK-28628][SQL] Implement SupportsNamespaces in V2SessionCatalog

## What changes were proposed in this pull request?

This adds namespace support to V2SessionCatalog.

## How was this patch tested?

WIP: will add tests for v2 session catalog namespace methods.

Closes #25363 from 
rdblue/SPARK-28628-support-namespaces-in-v2-session-catalog.

Authored-by: Ryan Blue 
Signed-off-by: Burak Yavuz 
---
 .../datasources/v2/V2SessionCatalog.scala  | 136 +++-
 .../datasources/v2/V2SessionCatalogSuite.scala | 347 +++--
 2 files changed, 451 insertions(+), 32 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 6dcebe295..6f8cf47 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -17,18 +17,20 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.net.URI
 import java.util
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TableChange}
-import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, 
FieldReference, IdentityTransform, LogicalExpressions, Transform}
+import org.apache.spark.sql.catalog.v2.{Identifier, NamespaceChange, 
SupportsNamespaces, TableCatalog, TableChange}
+import org.apache.spark.sql.catalog.v2.NamespaceChange.{RemoveProperty, 
SetProperty}
+import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, 
FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, 
NoSuchTableException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, 
CatalogTableType, CatalogUtils, SessionCatalog}
+import 
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, 
NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, 
CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.sources.v2.Table
@@ -39,11 +41,16 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 /**
  * A [[TableCatalog]] that translates calls to the v1 SessionCatalog.
  */
-class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
+class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with 
SupportsNamespaces {
+  import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+  import V2SessionCatalog._
+
   def this() = {
 this(SparkSession.active.sessionState)
   }
 
+  override val defaultNamespace: Array[String] = Array("default")
+
   private lazy val catalog: SessionCatalog = sessionState.catalog
 
   private var _name: String = _
@@ -87,7 +94,7 @@ class V2SessionCatalog(sessionState: SessionState) extends 
TableCatalog {
 val (partitionColumns, maybeBucketSpec) = 
V2SessionCatalog.convertTransforms(partitions)
 val provider = properties.getOrDefault("provider", 
sessionState.conf.defaultDataSourceName)
 val tableProperties = properties.asScala
-val location = Option(properties.get("location"))
+val location = Option(properties.get(LOCATION_TABLE_PROP))
 val storage = 
DataSource.buildStorageFormatFromOptions(tableProperties.toMap)
 .copy(locationUri = location.map(CatalogUtils.stringToURI))
 val tableType = if (location.isDefined) CatalogTableType.EXTERNAL else 
CatalogTableType.MANAGED
@@ -102,7 +109,7 @@ class V2SessionCatalog(sessionState: SessionState) extends 
TableCatalog {
   bucketSpec = maybeBucketSpec,
   properties = tableProperties.toMap,
   tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions,
-  comment = Option(properties.get("comment")))
+  comment = Option(properties.get(COMMENT_TABLE_PROP)))
 
 try {
   catalog.createTable(tableDesc, ig

[spark] branch master updated: [SPARK-28612][SQL] Add DataFrameWriterV2 API

2019-08-31 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3821d75  [SPARK-28612][SQL] Add DataFrameWriterV2 API
3821d75 is described below

commit 3821d75b836afae55a2a92c14b379bf4ec8a5362
Author: Ryan Blue 
AuthorDate: Sat Aug 31 21:28:20 2019 -0700

[SPARK-28612][SQL] Add DataFrameWriterV2 API

## What changes were proposed in this pull request?

This adds a new write API as proposed in the [SPIP to standardize logical 
plans](https://issues.apache.org/jira/browse/SPARK-23521). This new API:

* Uses clear verbs to execute writes, like `append`, `overwrite`, `create`, 
and `replace` that correspond to the new logical plans.
* Only creates v2 logical plans so the behavior is always consistent.
* Does not allow table configuration options for operations that cannot 
change table configuration. For example, `partitionedBy` can only be called 
when the writer executes `create` or `replace`.

Here are a few example uses of the new API:

```scala
df.writeTo("catalog.db.table").append()
df.writeTo("catalog.db.table").overwrite($"date" === "2019-06-01")
df.writeTo("catalog.db.table").overwritePartitions()
df.writeTo("catalog.db.table").asParquet.create()
df.writeTo("catalog.db.table").partitionedBy(days($"ts")).createOrReplace()
df.writeTo("catalog.db.table").using("abc").replace()
```

## How was this patch tested?

Added `DataFrameWriterV2Suite` that tests the new write API. Existing tests 
for v2 plans.

Closes #25354 from rdblue/SPARK-28612-add-data-frame-writer-v2.

Authored-by: Ryan Blue 
Signed-off-by: Burak Yavuz 
---
 .../catalyst/expressions/PartitionTransforms.scala |  77 
 .../spark/sql/catalyst/analysis/Analyzer.scala |   6 +-
 .../plans/logical/basicLogicalOperators.scala  |  47 +-
 .../datasources/v2/DataSourceV2Implicits.scala |   9 +
 .../apache/spark/sql/connector/InMemoryTable.scala |   5 +-
 .../org/apache/spark/sql/DataFrameWriter.scala |  11 +-
 .../org/apache/spark/sql/DataFrameWriterV2.scala   | 365 +++
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  28 ++
 .../datasources/v2/DataSourceV2Strategy.scala  |  20 +-
 .../datasources/v2/V2WriteSupportCheck.scala   |   6 +-
 .../scala/org/apache/spark/sql/functions.scala |  64 +++
 .../sql/sources/v2/DataFrameWriterV2Suite.scala| 508 +
 12 files changed, 1110 insertions(+), 36 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala
new file mode 100644
index 000..e48fd8a
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+/**
+ * Base class for expressions that are converted to v2 partition transforms.
+ *
+ * Subclasses represent abstract transform functions with concrete 
implementations that are
+ * determined by data source implementations. Because the concrete 
implementation is not known,
+ * these expressions are [[Unevaluable]].
+ *
+ * These expressions are used to pass transformations from the DataFrame API:
+ *
+ * {{{
+ *   df.writeTo("catalog.db.table").partitionedBy($"category", 
days($"timestamp")).create()
+ * }}}
+ */
+abstract class PartitionTransformExpression extends Expression with 
Unevaluable {
+  override def nullable: Boolean = true
+}
+
+/**
+ * Expression for the v2 partition transform years.
+ */
+case class Years(child: Expression) extends PartitionTransformExpression {
+  override def dataType: DataType = IntegerType
+  overrid

[spark] branch master updated: [SPARK-28635][SQL][FOLLOWUP] CatalogManager should reflect the changes of default catalog

2019-08-21 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 97b046f  [SPARK-28635][SQL][FOLLOWUP] CatalogManager should reflect 
the changes of default catalog
97b046f is described below

commit 97b046f06fba6a5c555310ac41f46f5fdfbdc5d4
Author: Wenchen Fan 
AuthorDate: Wed Aug 21 12:23:42 2019 -0700

[SPARK-28635][SQL][FOLLOWUP] CatalogManager should reflect the changes of 
default catalog



### What changes were proposed in this pull request?

The current namespace/catalog should be set to None at the beginning, so 
that we can read the new configs when reporting the current namespace/catalog 
later.

### Why are the changes needed?

Fix a bug in CatalogManager so that changes to the default catalog config are 
reflected when reporting the current catalog.

### Does this PR introduce any user-facing change?

No. The current namespace/catalog stuff is still internal right now.

### How was this patch tested?

a new test suite

Closes #25521 from cloud-fan/fix.

Authored-by: Wenchen Fan 
Signed-off-by: Burak Yavuz 
---
 .../spark/sql/catalog/v2/CatalogManager.scala  | 23 +++---
 .../sql/catalyst/catalog/CatalogManagerSuite.scala | 94 ++
 2 files changed, 106 insertions(+), 11 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala
index c91a73a..d5a6a61 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala
@@ -62,36 +62,37 @@ class CatalogManager(conf: SQLConf) extends Logging {
 case _ => Array.empty[String]
   }
 
-  private var _currentNamespace = {
-// The builtin catalog use "default" as the default database.
-defaultCatalog.map(getDefaultNamespace).getOrElse(Array("default"))
-  }
+  private var _currentNamespace: Option[Array[String]] = None
 
   def currentNamespace: Array[String] = synchronized {
-_currentNamespace
+_currentNamespace.getOrElse {
+  currentCatalog.map { catalogName =>
+getDefaultNamespace(catalog(catalogName))
+  }.getOrElse(Array("default")) // The builtin catalog use "default" as 
the default database.
+}
   }
 
   def setCurrentNamespace(namespace: Array[String]): Unit = synchronized {
-_currentNamespace = namespace
+_currentNamespace = Some(namespace)
   }
 
-  private var _currentCatalog = conf.defaultV2Catalog
+  private var _currentCatalog: Option[String] = None
 
   // Returns the name of current catalog. None means the current catalog is 
the builtin catalog.
   def currentCatalog: Option[String] = synchronized {
-_currentCatalog
+_currentCatalog.orElse(conf.defaultV2Catalog)
   }
 
   def setCurrentCatalog(catalogName: String): Unit = synchronized {
 _currentCatalog = Some(catalogName)
-_currentNamespace = getDefaultNamespace(catalog(catalogName))
+_currentNamespace = None
   }
 
   // Clear all the registered catalogs. Only used in tests.
   private[sql] def reset(): Unit = synchronized {
 catalogs.clear()
-_currentNamespace = 
defaultCatalog.map(getDefaultNamespace).getOrElse(Array("default"))
-_currentCatalog = conf.defaultV2Catalog
+_currentNamespace = None
+_currentCatalog = None
   }
 }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogManagerSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogManagerSuite.scala
new file mode 100644
index 000..f7f1901
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogManagerSuite.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import java.util
+
+import org.apache

[spark] branch master updated: [SPARK-28565][SQL] DataFrameWriter saveAsTable support for V2 catalogs

2019-08-08 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5368eaa  [SPARK-28565][SQL] DataFrameWriter saveAsTable support for V2 
catalogs
5368eaa is described below

commit 5368eaa2fc33f6d50b482bd1e90e0437b1887cd2
Author: Burak Yavuz 
AuthorDate: Thu Aug 8 22:30:00 2019 -0700

[SPARK-28565][SQL] DataFrameWriter saveAsTable support for V2 catalogs

## What changes were proposed in this pull request?

Adds saveAsTable support for V2 catalogs and, via the V2SessionCatalog, for V2 
tables in the session catalog. If the table resolves through the 
V2SessionCatalog, we use the DataSource V1 SaveMode to select the code path, 
for backwards compatibility.

Depending on the SaveMode:
 - SaveMode.Append:
 a) If table exists: Use AppendData.byName
 b) If table doesn't exist, use CTAS (ignoreIfExists = false)
 - SaveMode.Overwrite: Use RTAS (orCreate = true)
 - SaveMode.Ignore: Use CTAS (ignoreIfExists = true)
 - SaveMode.ErrorIfExists: Use CTAS (ignoreIfExists = false)
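
A short sketch of the mapping above, assuming a configured v2 catalog named `testcat` and an illustrative table name (neither comes from the commit):

```scala
import org.apache.spark.sql.SaveMode

val df = spark.range(10).toDF("value")

// Table exists: AppendData.byName; table missing: CTAS (ignoreIfExists = false)
df.write.mode(SaveMode.Append).saveAsTable("testcat.db.events")

// Always RTAS (orCreate = true)
df.write.mode(SaveMode.Overwrite).saveAsTable("testcat.db.events")

// CTAS (ignoreIfExists = true): no-op if the table already exists
df.write.mode(SaveMode.Ignore).saveAsTable("testcat.db.events")

// CTAS (ignoreIfExists = false): fails if the table already exists
df.write.mode(SaveMode.ErrorIfExists).saveAsTable("testcat.db.events")
```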

## How was this patch tested?

Unit tests in DataSourceV2DataFrameSuite

Closes #25330 from brkyvz/saveAsTable.

Lead-authored-by: Burak Yavuz 
Co-authored-by: Burak Yavuz 
Signed-off-by: Burak Yavuz 
---
 .../org/apache/spark/sql/DataFrameWriter.scala | 81 --
 .../sources/v2/DataSourceV2DataFrameSuite.scala| 64 -
 2 files changed, 138 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ae82670..af7ddd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -22,22 +22,22 @@ import java.util.{Locale, Properties, UUID}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Stable
-import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier}
+import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, 
TableCatalog}
+import org.apache.spark.sql.catalog.v2.expressions._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
NoSuchTableException, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, 
InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, 
CreateTableAsSelect, InsertIntoTable, LogicalPlan, OverwriteByExpression, 
OverwritePartitionsDynamic, ReplaceTableAsSelect}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, 
DataSourceUtils, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
-import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.TableCapability._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{IntegerType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
@@ -360,6 +360,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
*/
   def insertInto(tableName: String): Unit = {
 import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, 
CatalogObjectIdentifier}
+import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
 
 assertNotBucketed("insertInto")
 
@@ -374,8 +375,12 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 df.sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) 
match {
   case CatalogObjectIdentifier(Some(catalog), ident) =>
 insertInto(catalog, ident)
+  // TODO(SPARK-28667): Support the V2SessionCatalog
   case AsTableIdentifier(tableIdentifier) =>
 insertInto(tableIdentifier)
+  case other =>
+throw new AnalysisException(
+  s"Couldn't find a catalog to handle the identifier ${other.quoted}.")
 }
   }
 
@@ -485,7 +490,71 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
* @since 1.4.0
*/
   def saveAsTable(tableName: String): Unit = {
-
saveAsTable(df.sparkSession.sessionState.sqlParser.parseTableIdent

[spark] branch master updated: [SPARK-28331][SQL] Catalogs.load() should be able to load built-in catalogs

2019-08-07 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c88df2c  [SPARK-28331][SQL] Catalogs.load() should be able to load 
built-in catalogs
c88df2c is described below

commit c88df2ccf670db62aed6565c9dbdb58d5d5cca3f
Author: Gengliang Wang 
AuthorDate: Wed Aug 7 16:14:34 2019 -0700

[SPARK-28331][SQL] Catalogs.load() should be able to load built-in catalogs

## What changes were proposed in this pull request?

In `Catalogs.load`, the `pluginClassName` in the following code
```
String pluginClassName = conf.getConfString("spark.sql.catalog." + name, 
null);
```
is always null for built-in catalogs, e.g. those registered as SQLConf entries 
such as `spark.sql.catalog.session`.

This is because of https://github.com/apache/spark/pull/18852: 
SQLConf.conf.getConfString(key, null) always returns null.
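
For context, a small sketch of how a catalog plugin is configured; the catalog name "mycat" and the plugin class are illustrative, not from the commit:

```scala
// Catalogs.load("mycat", conf) looks up the key "spark.sql.catalog.mycat"
// to find the plugin class to instantiate.
spark.conf.set("spark.sql.catalog.mycat", "com.example.MyCatalogPlugin")
// Built-in catalogs such as the session catalog are registered as SQLConf
// entries ("spark.sql.catalog.session"), which is where the lookup above
// returned null before this fix.
```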

## How was this patch tested?

Apply code changes of https://github.com/apache/spark/pull/24768 and tried 
loading session catalog.

Closes #25094 from gengliangwang/fixCatalogLoad.

Authored-by: Gengliang Wang 
Signed-off-by: Burak Yavuz 
---
 .../src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java| 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
index 7511d94..f471a4e 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
@@ -26,6 +26,7 @@ import org.apache.spark.util.Utils;
 import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -50,8 +51,10 @@ public class Catalogs {
*/
   public static CatalogPlugin load(String name, SQLConf conf)
   throws CatalogNotFoundException, SparkException {
-String pluginClassName = conf.getConfString("spark.sql.catalog." + name, 
null);
-if (pluginClassName == null) {
+String pluginClassName;
+try {
+  pluginClassName = conf.getConfString("spark.sql.catalog." + name);
+} catch (NoSuchElementException e){
   throw new CatalogNotFoundException(String.format(
   "Catalog '%s' plugin class not found: spark.sql.catalog.%s is not 
defined", name, name));
 }





[spark] branch master updated: [SPARK-27661][SQL] Add SupportsNamespaces API

2019-08-04 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0345f11  [SPARK-27661][SQL] Add SupportsNamespaces API
0345f11 is described below

commit 0345f1174d6019374ed5451140e01c224508bc0e
Author: Ryan Blue 
AuthorDate: Sun Aug 4 21:29:40 2019 -0700

[SPARK-27661][SQL] Add SupportsNamespaces API

## What changes were proposed in this pull request?

This adds an interface for catalog plugins that exposes namespace 
operations:
* `listNamespaces`
* `namespaceExists`
* `loadNamespaceMetadata`
* `createNamespace`
* `alterNamespace`
* `dropNamespace`
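
A hedged sketch of driving the new API directly; the catalog name "testcat" and the namespace are assumptions, and the loaded plugin must actually implement `SupportsNamespaces`:

```scala
import java.util.Collections
import org.apache.spark.sql.catalog.v2.{Catalogs, NamespaceChange, SupportsNamespaces}
import org.apache.spark.sql.internal.SQLConf

val catalog = Catalogs.load("testcat", SQLConf.get) match {
  case ns: SupportsNamespaces => ns
  case other => sys.error(s"${other.name} does not support namespaces")
}

val ns = Array("db1")
if (!catalog.namespaceExists(ns)) {
  catalog.createNamespace(ns, Collections.singletonMap("owner", "etl"))
}
catalog.alterNamespace(ns, NamespaceChange.setProperty("note", "demo"))
println(catalog.loadNamespaceMetadata(ns))
catalog.dropNamespace(ns)
```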

## How was this patch tested?

API only. Existing tests for regressions.

Closes #24560 from rdblue/SPARK-27661-add-catalog-namespace-api.

Authored-by: Ryan Blue 
Signed-off-by: Burak Yavuz 
---
 .../spark/sql/catalog/v2/NamespaceChange.java  |  97 ++
 .../spark/sql/catalog/v2/SupportsNamespaces.java   | 145 +++
 .../spark/sql/catalog/v2/utils/CatalogV2Util.scala |  33 +++-
 .../spark/sql/catalog/v2/TableCatalogSuite.scala   | 204 -
 .../spark/sql/catalog/v2/TestTableCatalog.scala|  70 ++-
 5 files changed, 543 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java
new file mode 100644
index 000..6f5895b
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+/**
+ * NamespaceChange subclasses represent requested changes to a namespace. 
These are passed to
+ * {@link SupportsNamespaces#alterNamespace}. For example,
+ * 
+ *   import NamespaceChange._
+ *   val catalog = Catalogs.load(name)
+ *   catalog.alterNamespace(ident,
+ *   setProperty("prop", "value"),
+ *   removeProperty("other_prop")
+ * )
+ * 
+ */
+public interface NamespaceChange {
+  /**
+   * Create a NamespaceChange for setting a namespace property.
+   * 
+   * If the property already exists, it will be replaced with the new value.
+   *
+   * @param property the property name
+   * @param value the new property value
+   * @return a NamespaceChange for the addition
+   */
+  static NamespaceChange setProperty(String property, String value) {
+return new SetProperty(property, value);
+  }
+
+  /**
+   * Create a NamespaceChange for removing a namespace property.
+   * 
+   * If the property does not exist, the change will succeed.
+   *
+   * @param property the property name
+   * @return a NamespaceChange for the addition
+   */
+  static NamespaceChange removeProperty(String property) {
+return new RemoveProperty(property);
+  }
+
+  /**
+   * A NamespaceChange to set a namespace property.
+   * 
+   * If the property already exists, it must be replaced with the new value.
+   */
+  final class SetProperty implements NamespaceChange {
+private final String property;
+private final String value;
+
+private SetProperty(String property, String value) {
+  this.property = property;
+  this.value = value;
+}
+
+public String property() {
+  return property;
+}
+
+public String value() {
+  return value;
+}
+  }
+
+  /**
+   * A NamespaceChange to remove a namespace property.
+   * 
+   * If the property does not exist, the change should succeed.
+   */
+  final class RemoveProperty implements NamespaceChange {
+private final String property;
+
+private RemoveProperty(String property) {
+  this.property = property;
+}
+
+public String property() {
+  return property;
+}
+  }
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java
new file mode 

[spark] branch master updated: [SPARK-27661][SQL] Add SupportsNamespaces API

2019-08-04 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0f89a5d  [SPARK-27661][SQL] Add SupportsNamespaces API
0f89a5d is described below

commit 0f89a5dcca86c9d5eff9376855115e6d8838254c
Author: Ryan Blue 
AuthorDate: Sun Aug 4 21:23:42 2019 -0700

[SPARK-27661][SQL] Add SupportsNamespaces API

## What changes were proposed in this pull request?

This adds an interface for catalog plugins that exposes namespace 
operations:
* `listNamespaces`
* `namespaceExists`
* `loadNamespaceMetadata`
* `createNamespace`
* `alterNamespace`
* `dropNamespace`

## How was this patch tested?

API only. Existing tests for regressions.

Closes #24560 from rdblue/SPARK-27661-add-catalog-namespace-api.

Authored-by: Ryan Blue 
Signed-off-by: Burak Yavuz 
---
 .../spark/sql/catalog/v2/NamespaceChange.java  |  97 ++
 .../spark/sql/catalog/v2/SupportsNamespaces.java   | 145 +++
 .../spark/sql/catalog/v2/utils/CatalogV2Util.scala |  33 +++-
 .../spark/sql/catalog/v2/TableCatalogSuite.scala   | 204 -
 .../spark/sql/catalog/v2/TestTableCatalog.scala|  70 ++-
 5 files changed, 543 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java
new file mode 100644
index 000..6f5895b
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+/**
+ * NamespaceChange subclasses represent requested changes to a namespace. 
These are passed to
+ * {@link SupportsNamespaces#alterNamespace}. For example,
+ * 
+ *   import NamespaceChange._
+ *   val catalog = Catalogs.load(name)
+ *   catalog.alterNamespace(ident,
+ *   setProperty("prop", "value"),
+ *   removeProperty("other_prop")
+ * )
+ * 
+ */
+public interface NamespaceChange {
+  /**
+   * Create a NamespaceChange for setting a namespace property.
+   * 
+   * If the property already exists, it will be replaced with the new value.
+   *
+   * @param property the property name
+   * @param value the new property value
+   * @return a NamespaceChange for the addition
+   */
+  static NamespaceChange setProperty(String property, String value) {
+return new SetProperty(property, value);
+  }
+
+  /**
+   * Create a NamespaceChange for removing a namespace property.
+   * 
+   * If the property does not exist, the change will succeed.
+   *
+   * @param property the property name
+   * @return a NamespaceChange for the addition
+   */
+  static NamespaceChange removeProperty(String property) {
+return new RemoveProperty(property);
+  }
+
+  /**
+   * A NamespaceChange to set a namespace property.
+   * 
+   * If the property already exists, it must be replaced with the new value.
+   */
+  final class SetProperty implements NamespaceChange {
+private final String property;
+private final String value;
+
+private SetProperty(String property, String value) {
+  this.property = property;
+  this.value = value;
+}
+
+public String property() {
+  return property;
+}
+
+public String value() {
+  return value;
+}
+  }
+
+  /**
+   * A NamespaceChange to remove a namespace property.
+   * 
+   * If the property does not exist, the change should succeed.
+   */
+  final class RemoveProperty implements NamespaceChange {
+private final String property;
+
+private RemoveProperty(String property) {
+  this.property = property;
+}
+
+public String property() {
+  return property;
+}
+  }
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java
new file mode 

[spark] branch master updated: [SPARK-27845][SQL] DataSourceV2: InsertTable

2019-07-25 Thread brkyvz
This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 443904a  [SPARK-27845][SQL] DataSourceV2: InsertTable
443904a is described below

commit 443904a14044ff32421e577dc26d0d53112ceaba
Author: Ryan Blue 
AuthorDate: Thu Jul 25 15:05:51 2019 -0700

[SPARK-27845][SQL] DataSourceV2: InsertTable

## What changes were proposed in this pull request?

Support multiple catalogs in the following InsertTable use cases:

- INSERT INTO [TABLE] catalog.db.tbl
- INSERT OVERWRITE TABLE catalog.db.tbl

Support matrix:

Overwrite | Partitioned Table | Partition Clause | Partition Overwrite Mode | Action
----------|-------------------|------------------|--------------------------|-------
false | * | * | * | AppendData
true | no | (empty) | * | OverwriteByExpression(true)
true | yes | p1,p2 or p1 or p2 or (empty) | STATIC | OverwriteByExpression(true)
true | yes | p1,p2 or p1 or p2 or (empty) | DYNAMIC | OverwritePartitionsDynamic
true | yes | p1=23,p2=3 | * | OverwriteByExpression(p1=23 and p2=3)
true | yes | p1=23,p2 or p1=23 | STATIC | OverwriteByExpression(p1=23)
true | yes | p1=23,p2 or p1=23 | DYNAMIC | OverwritePartitionsDynamic

Notes:
- Assume the partitioned table has 2 partitions: p1 and p2.
- `STATIC` is the default Partition Overwrite Mode for data source tables.
- DSv2 tables currently do not support `IfPartitionNotExists`.
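
A brief sketch of the two statement forms against a v2 catalog; the catalog, table, view, and column names are assumptions, not from the commit:

```scala
// Append: maps to AppendData
spark.sql("INSERT INTO testcat.db.tbl SELECT id, data, p1, p2 FROM source_view")

// Static overwrite of one partition: maps to OverwriteByExpression(p1 = 23)
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "static")
spark.sql(
  "INSERT OVERWRITE TABLE testcat.db.tbl PARTITION (p1 = 23) " +
  "SELECT id, data, p2 FROM source_view")
```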

## How was this patch tested?

New tests.
All existing catalyst and sql/core tests.

Closes #24832 from jzhuge/SPARK-27845-pr.

Lead-authored-by: Ryan Blue 
Co-authored-by: John Zhuge 
Signed-off-by: Burak Yavuz 
---
 .../apache/spark/sql/catalyst/parser/SqlBase.g4|   4 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala | 137 +-
 .../apache/spark/sql/catalyst/dsl/package.scala|  13 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala |  36 ++-
 .../plans/logical/sql/InsertIntoStatement.scala|  50 
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 113 +++-
 .../sql/catalyst/parser/PlanParserSuite.scala  |  19 +-
 .../datasources/DataSourceResolution.scala |   1 +
 .../sql/sources/v2/DataSourceV2SQLSuite.scala  | 301 -
 .../sql/sources/v2/TestInMemoryTableCatalog.scala  | 137 +++---
 .../org/apache/spark/sql/hive/InsertSuite.scala|   3 +-
 11 files changed, 738 insertions(+), 76 deletions(-)

diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 0a142c2..517ef9d 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -294,8 +294,8 @@ query
 ;
 
 insertInto
-: INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)? 
 #insertOverwriteTable
-| INSERT INTO TABLE? tableIdentifier partitionSpec?
 #insertIntoTable
+: INSERT OVERWRITE TABLE? multipartIdentifier (partitionSpec (IF NOT 
EXISTS)?)? #insertOverwriteTable
+| INSERT INTO TABLE? multipartIdentifier partitionSpec? (IF NOT EXISTS)?   
 #insertIntoTable
 | INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? 
createFileFormat?#insertOverwriteHiveDir
 | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS 
options=tablePropertyList)?   #insertOverwriteDir
 ;
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e55cdfe..021fb26 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -25,6 +25,8 @@ import scala.util.Random
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, 
CatalogPlugin, LookupCatalog, TableChange}
+import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, 
IdentityTransform}
+import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util.loadTable
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
@@ -34,12 +36,14 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import 
org.apache.spark.sql.catalyst.plans.logical.sql

spark git commit: [SPARK-25472][SS] Don't have legitimate stops of streams cause stream exceptions

2018-09-20 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 4d114fc9a -> 77e52448e


[SPARK-25472][SS] Don't have legitimate stops of streams cause stream exceptions

## What changes were proposed in this pull request?

Legitimate stops of streams may cause an exception to be captured by the stream 
execution, because the job throws a SparkException about job cancellation during 
a stop. This PR makes the stop more graceful by swallowing this cancellation 
error.

## How was this patch tested?

This is pretty hard to test. The existing tests should make sure that we're not 
swallowing other specific SparkExceptions. I've also run the 
`KafkaSourceStressForDontFailOnDataLossSuite` 100 times, and it didn't fail, 
whereas it used to be flaky.

Closes #22478 from brkyvz/SPARK-25472.

Authored-by: Burak Yavuz 
Signed-off-by: Burak Yavuz 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77e52448
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77e52448
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77e52448

Branch: refs/heads/master
Commit: 77e52448e7f94aadfa852cc67084415de6ecfa7c
Parents: 4d114fc
Author: Burak Yavuz 
Authored: Thu Sep 20 15:46:33 2018 -0700
Committer: Burak Yavuz 
Committed: Thu Sep 20 15:46:33 2018 -0700

--
 .../execution/streaming/StreamExecution.scala   | 22 +++-
 .../continuous/ContinuousExecution.scala|  4 ++--
 .../WriteToContinuousDataSourceExec.scala   |  2 +-
 3 files changed, 20 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/77e52448/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index f6c60c1..631a6eb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -30,6 +30,7 @@ import scala.util.control.NonFatal
 import com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -282,7 +283,7 @@ abstract class StreamExecution(
 // `stop()` is already called. Let `finally` finish the cleanup.
   }
 } catch {
-  case e if isInterruptedByStop(e) =>
+  case e if isInterruptedByStop(e, sparkSession.sparkContext) =>
 // interrupted by stop()
 updateStatusMessage("Stopped")
   case e: IOException if e.getMessage != null
@@ -354,9 +355,9 @@ abstract class StreamExecution(
 }
   }
 
-  private def isInterruptedByStop(e: Throwable): Boolean = {
+  private def isInterruptedByStop(e: Throwable, sc: SparkContext): Boolean = {
 if (state.get == TERMINATED) {
-  StreamExecution.isInterruptionException(e)
+  StreamExecution.isInterruptionException(e, sc)
 } else {
   false
 }
@@ -531,7 +532,7 @@ object StreamExecution {
   val QUERY_ID_KEY = "sql.streaming.queryId"
   val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"
 
-  def isInterruptionException(e: Throwable): Boolean = e match {
+  def isInterruptionException(e: Throwable, sc: SparkContext): Boolean = e 
match {
 // InterruptedIOException - thrown when an I/O operation is interrupted
 // ClosedByInterruptException - thrown when an I/O operation upon a 
channel is interrupted
 case _: InterruptedException | _: InterruptedIOException | _: 
ClosedByInterruptException =>
@@ -546,7 +547,18 @@ object StreamExecution {
 //   ExecutionException, such as 
BiFunction.apply
 case e2 @ (_: UncheckedIOException | _: ExecutionException | _: 
UncheckedExecutionException)
 if e2.getCause != null =>
-  isInterruptionException(e2.getCause)
+  isInterruptionException(e2.getCause, sc)
+case se: SparkException =>
+  val jobGroup = sc.getLocalProperty("spark.jobGroup.id")
+  if (jobGroup == null) return false
+  val errorMsg = se.getMessage
+  if (errorMsg.contains("cancelled") && errorMsg.contains(jobGroup) && 
se.getCause == null) {
+true
+  } else if (se.getCause != null) {
+isInterruptionException(se.getCause, sc)
+  } else {
+false
+  }
 case _ =>
   false
   }

http://git-wip-us.apache.org/repos/asf/spark/bl

spark git commit: [SPARK-24525][SS] Provide an option to limit number of rows in a MemorySink

2018-06-15 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 90da7dc24 -> e4fee395e


[SPARK-24525][SS] Provide an option to limit number of rows in a MemorySink

## What changes were proposed in this pull request?

Provide an option to limit the number of rows in a MemorySink. Currently,
MemorySink and MemorySinkV2 have unbounded size, meaning that if they're used
on big data, they can OOM the stream. This change adds a maxMemorySinkRows
option to limit how many rows MemorySink and MemorySinkV2 can hold. By default,
they are still unbounded.
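
As a hedged usage sketch (the DataFrame and query name are placeholders; the option key `maxRows` is taken from `MemorySinkBase.MAX_MEMORY_SINK_ROWS` in the diff below, and `DataStreamWriter` is assumed to forward it to the sink):

```scala
// Minimal sketch: cap the in-memory sink instead of letting it grow without bound.
val query = df.writeStream
  .format("memory")
  .queryName("capped")          // required by the memory sink
  .option("maxRows", "1000")    // keep at most ~1000 rows; unbounded if the option is unset
  .outputMode("append")
  .start()
```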

## How was this patch tested?

Added new unit tests.

Author: Mukul Murthy 

Closes #21559 from mukulmurthy/SPARK-24525.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4fee395
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4fee395
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4fee395

Branch: refs/heads/master
Commit: e4fee395ecd93ad4579d9afbf0861f82a303e563
Parents: 90da7dc
Author: Mukul Murthy 
Authored: Fri Jun 15 13:56:48 2018 -0700
Committer: Burak Yavuz 
Committed: Fri Jun 15 13:56:48 2018 -0700

--
 .../spark/sql/execution/streaming/memory.scala  | 70 +++--
 .../execution/streaming/sources/memoryV2.scala  | 44 ---
 .../spark/sql/streaming/DataStreamWriter.scala  |  4 +-
 .../execution/streaming/MemorySinkSuite.scala   | 62 ++-
 .../execution/streaming/MemorySinkV2Suite.scala | 80 +++-
 .../apache/spark/sql/streaming/StreamTest.scala |  4 +-
 6 files changed, 239 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e4fee395/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index b137f98..7fa13c4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsScanUnsafeRow}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset => OffsetV2}
 import org.apache.spark.sql.streaming.OutputMode
@@ -221,19 +222,60 @@ class MemoryStreamInputPartition(records: 
Array[UnsafeRow])
 }
 
 /** A common trait for MemorySinks with methods used for testing */
-trait MemorySinkBase extends BaseStreamingSink {
+trait MemorySinkBase extends BaseStreamingSink with Logging {
   def allData: Seq[Row]
   def latestBatchData: Seq[Row]
   def dataSinceBatch(sinceBatchId: Long): Seq[Row]
   def latestBatchId: Option[Long]
+
+  /**
+   * Truncates the given rows to return at most maxRows rows.
+   * @param rows The data that may need to be truncated.
+   * @param batchLimit Number of rows to keep in this batch; the rest will be 
truncated
+   * @param sinkLimit Total number of rows kept in this sink, for logging 
purposes.
+   * @param batchId The ID of the batch that sent these rows, for logging 
purposes.
+   * @return Truncated rows.
+   */
+  protected def truncateRowsIfNeeded(
+  rows: Array[Row],
+  batchLimit: Int,
+  sinkLimit: Int,
+  batchId: Long): Array[Row] = {
+if (rows.length > batchLimit && batchLimit >= 0) {
+  logWarning(s"Truncating batch $batchId to $batchLimit rows because of 
sink limit $sinkLimit")
+  rows.take(batchLimit)
+} else {
+  rows
+}
+  }
+}
+
+/**
+ * Companion object to MemorySinkBase.
+ */
+object MemorySinkBase {
+  val MAX_MEMORY_SINK_ROWS = "maxRows"
+  val MAX_MEMORY_SINK_ROWS_DEFAULT = -1
+
+  /**
+   * Gets the max number of rows a MemorySink should store. This number is 
based on the memory
+   * sink row limit option if it is set. If not, we use a large value so that 
data truncates
+   * rather than causing out of memory errors.
+   * @param options Options for writing from which we get the max rows option
+   * @return The maximum number of rows a memorySink should store.
+   */
+  def getMemorySinkCapacity(options: DataSourceOptions): Int = {
+val maxRows = options.getInt(MAX_MEMORY_SINK_ROWS, 
MAX_MEMORY_SINK_ROWS_DEFAULT)
+if (maxRows >= 0) maxRows else Int.MaxValue - 10
+  }
 }
 
 /**
  * A sink that 

spark git commit: [SPARK-20168][DSTREAM] Add changes to use kinesis fetches from specific timestamp

2017-12-25 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master be03d3ad7 -> 0e6833006


[SPARK-20168][DSTREAM] Add changes to use kinesis fetches from specific 
timestamp

## What changes were proposed in this pull request?

The Kinesis client can resume from a specified timestamp when creating a stream.
We should have an option to pass a timestamp in the config so that Kinesis can
resume fetching from that timestamp.

The patch introduces a new `KinesisInitialPositionInStream` that pairs the
`InitialPositionInStream` with the `timestamp` information needed to resume
Kinesis fetches from the provided timestamp.
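
A hedged sketch of the new position type (only `AtTimestamp` and its accessors come from this patch; the concrete timestamp is a placeholder):

```scala
import java.util.Date
import org.apache.spark.streaming.kinesis.KinesisInitialPositions

// Resume reading from records added at or after this wall-clock time.
val startFrom = new KinesisInitialPositions.AtTimestamp(new Date(1514242800000L))

startFrom.getPosition    // InitialPositionInStream.AT_TIMESTAMP, handed to the KCL worker
startFrom.getTimestamp   // the Date that the receiver uses to configure the worker
```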

## How was this patch tested?

Unit Tests

cc : budde brkyvz

Author: Yash Sharma <ysha...@atlassian.com>

Closes #18029 from yssharma/ysharma/kcl_resume.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e683300
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e683300
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e683300

Branch: refs/heads/master
Commit: 0e6833006d28df426eb132bb8fc82917b8e2aedd
Parents: be03d3a
Author: Yash Sharma <ysha...@atlassian.com>
Authored: Tue Dec 26 09:50:39 2017 +0200
Committer: Burak Yavuz <brk...@gmail.com>
Committed: Tue Dec 26 09:50:39 2017 +0200

--
 .../kinesis/KinesisInitialPositions.java| 91 
 .../streaming/KinesisWordCountASL.scala |  5 +-
 .../streaming/kinesis/KinesisInputDStream.scala | 31 +--
 .../streaming/kinesis/KinesisReceiver.scala | 45 ++
 .../spark/streaming/kinesis/KinesisUtils.scala  | 15 ++--
 .../JavaKinesisInputDStreamBuilderSuite.java| 47 --
 .../KinesisInputDStreamBuilderSuite.scala   | 68 +--
 .../streaming/kinesis/KinesisStreamSuite.scala  | 11 +--
 8 files changed, 264 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java
--
diff --git 
a/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java
 
b/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java
new file mode 100644
index 000..206e1e4
--- /dev/null
+++ 
b/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * A java wrapper for exposing [[InitialPositionInStream]]
+ * to the corresponding Kinesis readers.
+ */
+interface KinesisInitialPosition {
+InitialPositionInStream getPosition();
+}
+
+public class KinesisInitialPositions {
+public static class Latest implements KinesisInitialPosition, Serializable 
{
+public Latest() {}
+
+@Override
+public InitialPositionInStream getPosition() {
+return InitialPositionInStream.LATEST;
+}
+}
+
+public static class TrimHorizon implements KinesisInitialPosition, 
Serializable {
+public TrimHorizon() {}
+
+@Override
+public InitialPositionInStream getPosition() {
+return InitialPositionInStream.TRIM_HORIZON;
+}
+}
+
+public static class AtTimestamp implements KinesisInitialPosition, 
Serializable {
+private Date timestamp;
+
+public AtTimestamp(Date timestamp) {
+this.timestamp = timestamp;
+}
+
+@Override
+public InitialPositionInStream getPosition() {
+return InitialPositionInStream.AT_TIMESTAMP;
+}
+
+public Date getTimestamp() {
+return timestamp;
+}
+}
+
+
+/**
+ * Returns instance of [[KinesisInitialPosition]] based

spark git commit: [SPARK-21977] SinglePartition optimizations break certain Streaming Stateful Aggregation requirements

2017-09-20 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master c6ff59a23 -> 280ff523f


[SPARK-21977] SinglePartition optimizations break certain Streaming Stateful 
Aggregation requirements

## What changes were proposed in this pull request?

This is a bit hard to explain as there are several issues here; I'll try my
best. Here are the requirements:
  1. A StructuredStreaming Source that can generate empty RDDs with 0 partitions
  2. A StructuredStreaming query that uses the above source, performs a
stateful aggregation
 (mapGroupsWithState, groupBy.count, ...), and coalesces to 1 partition

The crux of the problem is that when a dataset has a `coalesce(1)` call, it 
receives a `SinglePartition` partitioning scheme. This scheme satisfies most 
required distributions used for aggregations such as HashAggregateExec. This 
causes a world of problems:
  Symptom 1. If the input RDD has 0 partitions, the whole lineage will receive 
0 partitions, nothing will be executed, the state store will not create any 
delta files. When this happens, the next trigger fails, because the StateStore 
fails to load the delta file for the previous trigger
  Symptom 2. Let's say that there was data. In this case, if you stop your
stream, replace `coalesce(1)` with `coalesce(2)`, and then restart your stream,
your stream will fail, because `spark.sql.shuffle.partitions - 1` of the
StateStores will fail to find their delta files.

To fix the issues above, we must check that the partitioning of the child of a 
`StatefulOperator` satisfies:
If the grouping expressions are empty:
  a) AllTuple distribution
  b) Single physical partition
If the grouping expressions are non-empty:
  a) Clustered distribution
  b) spark.sql.shuffle.partitions # of partitions
whether or not `coalesce(1)` exists in the plan, and whether or not the input 
RDD for the trigger has any data.

Once you fix the above problem by adding an Exchange to the plan, you come 
across the following bug:
If you call `coalesce(1).groupBy().count()` on a Streaming DataFrame, and if 
you have a trigger with no data, `StateStoreRestoreExec` doesn't return the 
prior state. However, for this specific aggregation, `HashAggregateExec` after 
the restore returns a (0, 0) row, since we're performing a count, and there is 
no data. Then this data gets stored in `StateStoreSaveExec` causing the 
previous counts to be overwritten and lost.
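
A hedged repro sketch of the failing shape described above (the memory sink and names are illustrative; an active SparkSession `spark` is assumed):

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._                     // encoders for MemoryStream[Int]
implicit val sqlContext = spark.sqlContext   // MemoryStream needs an implicit SQLContext

val input = MemoryStream[Int]                // a source that can emit empty, 0-partition batches
val counts = input.toDF()
  .coalesce(1)                               // gives the aggregation a SinglePartition child
  .groupBy()
  .count()                                   // stateful streaming aggregation

val query = counts.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("counts")
  .start()
```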

## How was this patch tested?

Regression tests

Author: Burak Yavuz <brk...@gmail.com>

Closes #19196 from brkyvz/sa-0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/280ff523
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/280ff523
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/280ff523

Branch: refs/heads/master
Commit: 280ff523f4079dd9541efc95e6efcb69f9374d22
Parents: c6ff59a
Author: Burak Yavuz <brk...@gmail.com>
Authored: Wed Sep 20 00:01:21 2017 -0700
Committer: Burak Yavuz <brk...@gmail.com>
Committed: Wed Sep 20 00:01:21 2017 -0700

--
 .../streaming/IncrementalExecution.scala|  34 +++-
 .../execution/streaming/StreamExecution.scala   |   1 +
 .../execution/streaming/statefulOperators.scala |  37 +++-
 .../EnsureStatefulOpPartitioningSuite.scala | 132 +
 .../apache/spark/sql/streaming/StreamTest.scala |  16 +-
 .../streaming/StreamingAggregationSuite.scala   | 196 ++-
 6 files changed, 395 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/280ff523/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 19d9598..027222e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -21,11 +21,13 @@ import java.util.UUID
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, HashPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, 
SparkPlanner, UnaryExecNode}
+import org.apache.spark.sql.execution.exc

spark git commit: [SPARK-21463] Allow userSpecifiedSchema to override partition inference performed by MetadataLogFileIndex

2017-07-19 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 8cd9cdf17 -> 2c9d5ef1f


[SPARK-21463] Allow userSpecifiedSchema to override partition inference 
performed by MetadataLogFileIndex

## What changes were proposed in this pull request?

When using the MetadataLogFileIndex to read back a table, we don't respect the
user-provided schema as the proper column types. This can lead to issues such as
strings that merely look like dates being read back as DateType, or longs being
narrowed to IntegerType just because no value outside the integer range happens
to exist in the data.
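
A hedged sketch of the read-back path this fixes (the path and column names are placeholders):

```scala
import org.apache.spark.sql.types._

val userSchema = new StructType()
  .add("id", LongType)
  .add("event_date", StringType)   // a partition column whose values merely look like dates

// Reading a directory that contains a `_spark_metadata` log goes through MetadataLogFileIndex;
// with this patch the user-specified StringType wins over the inferred DateType.
val df = spark.read.schema(userSchema).parquet("/path/to/stream/output")
```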

## How was this patch tested?

Unit tests and manual tests

Author: Burak Yavuz <brk...@gmail.com>

Closes #18676 from brkyvz/stream-partitioning.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c9d5ef1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c9d5ef1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c9d5ef1

Branch: refs/heads/master
Commit: 2c9d5ef1f0c30713dafbf8ef0eb69d5520f7dcaf
Parents: 8cd9cdf
Author: Burak Yavuz <brk...@gmail.com>
Authored: Wed Jul 19 15:56:26 2017 -0700
Committer: Burak Yavuz <brk...@gmail.com>
Committed: Wed Jul 19 15:56:26 2017 -0700

--
 .../sql/execution/datasources/DataSource.scala  | 33 +++-
 .../execution/streaming/FileStreamSource.scala  |  2 +-
 .../streaming/MetadataLogFileIndex.scala| 11 +--
 .../ParquetPartitionDiscoverySuite.scala| 33 
 4 files changed, 69 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2c9d5ef1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d36a04f..cbe8ce4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -97,6 +97,24 @@ case class DataSource(
   }
 
   /**
+   * In the read path, only managed tables by Hive provide the partition 
columns properly when
+   * initializing this class. All other file based data sources will try to 
infer the partitioning,
+   * and then cast the inferred types to user specified dataTypes if the 
partition columns exist
+   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs 
like SPARK-18510, or
+   * inconsistent data types as reported in SPARK-21463.
+   * @param fileIndex A FileIndex that will perform partition inference
+   * @return The PartitionSchema resolved from inference and cast according to 
`userSpecifiedSchema`
+   */
+  private def combineInferredAndUserSpecifiedPartitionSchema(fileIndex: 
FileIndex): StructType = {
+val resolved = fileIndex.partitionSchema.map { partitionField =>
+  // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise 
fallback to inferred
+  userSpecifiedSchema.flatMap(_.find(f => equality(f.name, 
partitionField.name))).getOrElse(
+partitionField)
+}
+StructType(resolved)
+  }
+
+  /**
* Get the schema of the given FileFormat, if provided by 
`userSpecifiedSchema`, or try to infer
* it. In the read path, only managed tables by Hive provide the partition 
columns properly when
* initializing this class. All other file based data sources will try to 
infer the partitioning,
@@ -139,12 +157,7 @@ case class DataSource(
 val partitionSchema = if (partitionColumns.isEmpty) {
   // Try to infer partitioning, because no DataSource in the read path 
provides the partitioning
   // columns properly unless it is a Hive DataSource
-  val resolved = tempFileIndex.partitionSchema.map { partitionField =>
-// SPARK-18510: try to get schema from userSpecifiedSchema, otherwise 
fallback to inferred
-userSpecifiedSchema.flatMap(_.find(f => equality(f.name, 
partitionField.name))).getOrElse(
-  partitionField)
-  }
-  StructType(resolved)
+  combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex)
 } else {
   // maintain old behavior before SPARK-18510. If userSpecifiedSchema is 
empty used inferred
   // partitioning
@@ -336,7 +349,13 @@ case class DataSource(
 caseInsensitiveOptions.get("path").toSeq ++ paths,
 sparkSession.sessionState.newHadoopConf()) =>
 val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ 
paths).head)
-val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath)
+val tempFileCatalog = new Metadata

spark git commit: [DSTREAM][DOC] Add documentation for kinesis retry configurations

2017-05-18 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 8b0cb3a7b -> 556ad019f


[DSTREAM][DOC] Add documentation for kinesis retry configurations

## What changes were proposed in this pull request?

The changes were merged as part of https://github.com/apache/spark/pull/17467.
The documentation was missed somewhere in the review iterations. This adds the
documentation where it belongs.

## How was this patch tested?
Docs. Not tested.

cc budde, brkyvz

Author: Yash Sharma <ysha...@atlassian.com>

Closes #18028 from yssharma/ysharma/kinesis_retry_docs.

(cherry picked from commit 92580bd0eae5dbf739573093cca1b12fd0c14049)
Signed-off-by: Burak Yavuz <brk...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/556ad019
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/556ad019
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/556ad019

Branch: refs/heads/branch-2.2
Commit: 556ad019fa49deb40ba8da3aa6067484ab3d6331
Parents: 8b0cb3a
Author: Yash Sharma <ysha...@atlassian.com>
Authored: Thu May 18 11:24:33 2017 -0700
Committer: Burak Yavuz <brk...@gmail.com>
Committed: Thu May 18 11:24:44 2017 -0700

--
 docs/streaming-kinesis-integration.md | 4 
 1 file changed, 4 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/556ad019/docs/streaming-kinesis-integration.md
--
diff --git a/docs/streaming-kinesis-integration.md 
b/docs/streaming-kinesis-integration.md
index 6be0b54..9709bd3 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -216,3 +216,7 @@ de-aggregate records during consumption.
 - If no Kinesis checkpoint info exists when the input DStream starts, it will 
start either from the oldest record available 
(`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip 
(`InitialPositionInStream.LATEST`).  This is configurable.
   - `InitialPositionInStream.LATEST` could lead to missed records if data is 
added to the stream while no input DStreams are running (and no checkpoint info 
is being stored).
   - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of 
records where the impact is dependent on checkpoint frequency and processing 
idempotency.
+
+ Kinesis retry configuration
+ - `spark.streaming.kinesis.retry.waitTime` : Wait time between Kinesis 
retries as a duration string. When reading from Amazon Kinesis, users may hit 
`ProvisionedThroughputExceededException`'s, when consuming faster than 5 
transactions/second or, exceeding the maximum read rate of 2 MB/second. This 
configuration can be tweaked to increase the sleep between fetches when a fetch 
fails to reduce these exceptions. Default is "100ms".
+ - `spark.streaming.kinesis.retry.maxAttempts` : Max number of retries for 
Kinesis fetches. This config can also be used to tackle the Kinesis 
`ProvisionedThroughputExceededException`'s in scenarios mentioned above. It can 
be increased to have more number of retries for Kinesis reads. Default is 3.
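
As a hedged sketch of how these two properties could be set (the application name is a placeholder; key names and defaults come from the documentation added above):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kinesis-consumer")                           // placeholder
  .set("spark.streaming.kinesis.retry.waitTime", "500ms")   // default is "100ms"
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")    // default is 3
```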


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [DSTREAM][DOC] Add documentation for kinesis retry configurations

2017-05-18 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 8fb3d5c6d -> 92580bd0e


[DSTREAM][DOC] Add documentation for kinesis retry configurations

## What changes were proposed in this pull request?

The changes were merged as part of https://github.com/apache/spark/pull/17467.
The documentation was missed somewhere in the review iterations. This adds the
documentation where it belongs.

## How was this patch tested?
Docs. Not tested.

cc budde, brkyvz

Author: Yash Sharma <ysha...@atlassian.com>

Closes #18028 from yssharma/ysharma/kinesis_retry_docs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92580bd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92580bd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92580bd0

Branch: refs/heads/master
Commit: 92580bd0eae5dbf739573093cca1b12fd0c14049
Parents: 8fb3d5c
Author: Yash Sharma <ysha...@atlassian.com>
Authored: Thu May 18 11:24:33 2017 -0700
Committer: Burak Yavuz <brk...@gmail.com>
Committed: Thu May 18 11:24:33 2017 -0700

--
 docs/streaming-kinesis-integration.md | 4 
 1 file changed, 4 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/92580bd0/docs/streaming-kinesis-integration.md
--
diff --git a/docs/streaming-kinesis-integration.md 
b/docs/streaming-kinesis-integration.md
index 6be0b54..9709bd3 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -216,3 +216,7 @@ de-aggregate records during consumption.
 - If no Kinesis checkpoint info exists when the input DStream starts, it will 
start either from the oldest record available 
(`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip 
(`InitialPositionInStream.LATEST`).  This is configurable.
   - `InitialPositionInStream.LATEST` could lead to missed records if data is 
added to the stream while no input DStreams are running (and no checkpoint info 
is being stored).
   - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of 
records where the impact is dependent on checkpoint frequency and processing 
idempotency.
+
+ Kinesis retry configuration
+ - `spark.streaming.kinesis.retry.waitTime` : Wait time between Kinesis 
retries as a duration string. When reading from Amazon Kinesis, users may hit 
`ProvisionedThroughputExceededException`'s, when consuming faster than 5 
transactions/second or, exceeding the maximum read rate of 2 MB/second. This 
configuration can be tweaked to increase the sleep between fetches when a fetch 
fails to reduce these exceptions. Default is "100ms".
+ - `spark.streaming.kinesis.retry.maxAttempts` : Max number of retries for 
Kinesis fetches. This config can also be used to tackle the Kinesis 
`ProvisionedThroughputExceededException`'s in scenarios mentioned above. It can 
be increased to have more number of retries for Kinesis reads. Default is 3.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20140][DSTREAM] Remove hardcoded kinesis retry wait and max retries

2017-05-16 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 75e5ea294 -> 7076ab40f


[SPARK-20140][DSTREAM] Remove hardcoded kinesis retry wait and max retries

## What changes were proposed in this pull request?

The pull request proposes to remove the hardcoded values for Amazon Kinesis:
MIN_RETRY_WAIT_TIME_MS and MAX_RETRIES.

This change is critical for Kinesis checkpoint recovery when the Kinesis-backed
RDD is huge.
The following happens in a typical Kinesis recovery:
- Kinesis throttles a large number of requests while recovering
- retries in the case of throttling are not able to recover due to the small wait
period
- Kinesis throttles per second, so the wait period should be configurable for
recovery

The patch picks the spark kinesis configs from:
- spark.streaming.kinesis.retry.wait.time
- spark.streaming.kinesis.retry.max.attempts

Jira : https://issues.apache.org/jira/browse/SPARK-20140

## How was this patch tested?

Modified the KinesisBackedBlockRDDSuite.scala to run kinesis tests with the 
modified configurations. Wasn't able to test the patch with actual throttling.

Author: Yash Sharma 

Closes #17467 from yssharma/ysharma/spark-kinesis-retries.

(cherry picked from commit 38f4e8692ce3b6cbcfe0c1aff9b5e662f7a308b7)
Signed-off-by: Burak Yavuz 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7076ab40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7076ab40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7076ab40

Branch: refs/heads/branch-2.2
Commit: 7076ab40f86fe606cd9b813dad506e921501383e
Parents: 75e5ea2
Author: Yash Sharma 
Authored: Tue May 16 15:08:05 2017 -0700
Committer: Burak Yavuz 
Committed: Tue May 16 15:08:46 2017 -0700

--
 .../kinesis/KinesisBackedBlockRDD.scala | 33 -
 .../streaming/kinesis/KinesisInputDStream.scala |  6 +-
 .../kinesis/KinesisReadConfigurations.scala | 78 
 .../streaming/kinesis/KinesisStreamSuite.scala  | 49 +++-
 4 files changed, 143 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7076ab40/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
--
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index f31ebf1..88b2942 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.auth.AWSCredentials
 import com.amazonaws.services.kinesis.AmazonKinesisClient
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
 import com.amazonaws.services.kinesis.model._
@@ -81,9 +81,9 @@ class KinesisBackedBlockRDD[T: ClassTag](
 @transient private val _blockIds: Array[BlockId],
 @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
 @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
-val retryTimeoutMs: Int = 1,
 val messageHandler: Record => T = 
KinesisInputDStream.defaultMessageHandler _,
-val kinesisCreds: SparkAWSCredentials = DefaultCredentials
+val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
+val kinesisReadConfigs: KinesisReadConfigurations = 
KinesisReadConfigurations()
   ) extends BlockRDD[T](sc, _blockIds) {
 
   require(_blockIds.length == arrayOfseqNumberRanges.length,
@@ -112,7 +112,7 @@ class KinesisBackedBlockRDD[T: ClassTag](
   val credentials = kinesisCreds.provider.getCredentials
   partition.seqNumberRanges.ranges.iterator.flatMap { range =>
 new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
-  range, retryTimeoutMs).map(messageHandler)
+  range, kinesisReadConfigs).map(messageHandler)
   }
 }
 if (partition.isBlockIdValid) {
@@ -135,7 +135,7 @@ class KinesisSequenceRangeIterator(
 endpointUrl: String,
 regionId: String,
 range: SequenceNumberRange,
-retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
+kinesisReadConfigs: KinesisReadConfigurations) extends 
NextIterator[Record] with Logging {
 
   private val client = new AmazonKinesisClient(credentials)
   private val streamName = range.streamName
@@ -251,21 +251,19 @@ class 

spark git commit: [SPARK-20140][DSTREAM] Remove hardcoded kinesis retry wait and max retries

2017-05-16 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 6f62e9d9b -> 38f4e8692


[SPARK-20140][DSTREAM] Remove hardcoded kinesis retry wait and max retries

## What changes were proposed in this pull request?

The pull request proposes to remove the hardcoded values for Amazon Kinesis:
MIN_RETRY_WAIT_TIME_MS and MAX_RETRIES.

This change is critical for Kinesis checkpoint recovery when the Kinesis-backed
RDD is huge.
The following happens in a typical Kinesis recovery:
- Kinesis throttles a large number of requests while recovering
- retries in the case of throttling are not able to recover due to the small wait
period
- Kinesis throttles per second, so the wait period should be configurable for
recovery

The patch picks the spark kinesis configs from:
- spark.streaming.kinesis.retry.wait.time
- spark.streaming.kinesis.retry.max.attempts

Jira : https://issues.apache.org/jira/browse/SPARK-20140

## How was this patch tested?

Modified the KinesisBackedBlockRDDSuite.scala to run kinesis tests with the 
modified configurations. Wasn't able to test the patch with actual throttling.

Author: Yash Sharma 

Closes #17467 from yssharma/ysharma/spark-kinesis-retries.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38f4e869
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38f4e869
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38f4e869

Branch: refs/heads/master
Commit: 38f4e8692ce3b6cbcfe0c1aff9b5e662f7a308b7
Parents: 6f62e9d
Author: Yash Sharma 
Authored: Tue May 16 15:08:05 2017 -0700
Committer: Burak Yavuz 
Committed: Tue May 16 15:08:05 2017 -0700

--
 .../kinesis/KinesisBackedBlockRDD.scala | 33 -
 .../streaming/kinesis/KinesisInputDStream.scala |  6 +-
 .../kinesis/KinesisReadConfigurations.scala | 78 
 .../streaming/kinesis/KinesisStreamSuite.scala  | 49 +++-
 4 files changed, 143 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/38f4e869/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
--
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index f31ebf1..88b2942 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.auth.AWSCredentials
 import com.amazonaws.services.kinesis.AmazonKinesisClient
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
 import com.amazonaws.services.kinesis.model._
@@ -81,9 +81,9 @@ class KinesisBackedBlockRDD[T: ClassTag](
 @transient private val _blockIds: Array[BlockId],
 @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
 @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
-val retryTimeoutMs: Int = 1,
 val messageHandler: Record => T = 
KinesisInputDStream.defaultMessageHandler _,
-val kinesisCreds: SparkAWSCredentials = DefaultCredentials
+val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
+val kinesisReadConfigs: KinesisReadConfigurations = 
KinesisReadConfigurations()
   ) extends BlockRDD[T](sc, _blockIds) {
 
   require(_blockIds.length == arrayOfseqNumberRanges.length,
@@ -112,7 +112,7 @@ class KinesisBackedBlockRDD[T: ClassTag](
   val credentials = kinesisCreds.provider.getCredentials
   partition.seqNumberRanges.ranges.iterator.flatMap { range =>
 new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
-  range, retryTimeoutMs).map(messageHandler)
+  range, kinesisReadConfigs).map(messageHandler)
   }
 }
 if (partition.isBlockIdValid) {
@@ -135,7 +135,7 @@ class KinesisSequenceRangeIterator(
 endpointUrl: String,
 regionId: String,
 range: SequenceNumberRange,
-retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
+kinesisReadConfigs: KinesisReadConfigurations) extends 
NextIterator[Record] with Logging {
 
   private val client = new AmazonKinesisClient(credentials)
   private val streamName = range.streamName
@@ -251,21 +251,19 @@ class KinesisSequenceRangeIterator(
 
   /** Helper method to retry Kinesis API request with exponential backoff and 
timeouts */
   

spark git commit: [SPARK-20441][SPARK-20432][SS] Within the same streaming query, one StreamingRelation should only be transformed to one StreamingExecutionRelation

2017-05-03 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 b5947f5c3 -> b1a732fea


[SPARK-20441][SPARK-20432][SS] Within the same streaming query, one 
StreamingRelation should only be transformed to one StreamingExecutionRelation

## What changes were proposed in this pull request?

Within the same streaming query, when one `StreamingRelation` is referenced
multiple times – e.g. `df.union(df)` – we should transform it into only one
`StreamingExecutionRelation`, instead of two or more different
`StreamingExecutionRelation`s (each of which would have its own source,
source metadata logs, ...).
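
A hedged sketch of the shape this deduplicates (the built-in `rate` source and console sink are only placeholders; any single streaming source would do):

```scala
val df = spark.readStream.format("rate").load()   // one streaming source / StreamingRelation
val unioned = df.union(df)                        // the same relation referenced twice

// With this patch, planning creates a single StreamingExecutionRelation (one source,
// one source metadata log) that both branches of the union share.
val query = unioned.writeStream.format("console").start()
```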

## How was this patch tested?

Added two test cases, each of which would fail without this patch.

Author: Liwei Lin 

Closes #17735 from lw-lin/SPARK-20441.

(cherry picked from commit 27f543b15f2f493f6f8373e46b4c9564b0a1bf81)
Signed-off-by: Burak Yavuz 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1a732fe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1a732fe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1a732fe

Branch: refs/heads/branch-2.2
Commit: b1a732fead32a37afcb7cf7a35facc49a449b8e2
Parents: b5947f5
Author: Liwei Lin 
Authored: Wed May 3 08:55:02 2017 -0700
Committer: Burak Yavuz 
Committed: Wed May 3 08:55:17 2017 -0700

--
 .../execution/streaming/StreamExecution.scala   | 20 
 .../spark/sql/streaming/StreamSuite.scala   | 48 
 2 files changed, 60 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b1a732fe/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index affc201..b6ddf74 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.locks.ReentrantLock
 
+import scala.collection.mutable.{Map => MutableMap}
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
@@ -148,15 +149,18 @@ class StreamExecution(
   "logicalPlan must be initialized in StreamExecutionThread " +
 s"but the current thread was ${Thread.currentThread}")
 var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelation, 
StreamingExecutionRelation]()
 val _logicalPlan = analyzedPlan.transform {
-  case StreamingRelation(dataSource, _, output) =>
-// Materialize source to avoid creating it in every batch
-val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
-val source = dataSource.createSource(metadataPath)
-nextSourceId += 1
-// We still need to use the previous `output` instead of 
`source.schema` as attributes in
-// "df.logicalPlan" has already used attributes of the previous 
`output`.
-StreamingExecutionRelation(source, output)
+  case streamingRelation@StreamingRelation(dataSource, _, output) =>
+toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
+  // Materialize source to avoid creating it in every batch
+  val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+  val source = dataSource.createSource(metadataPath)
+  nextSourceId += 1
+  // We still need to use the previous `output` instead of 
`source.schema` as attributes in
+  // "df.logicalPlan" has already used attributes of the previous 
`output`.
+  StreamingExecutionRelation(source, output)
+})
 }
 sources = _logicalPlan.collect { case s: StreamingExecutionRelation => 
s.source }
 uniqueSources = sources.distinct

http://git-wip-us.apache.org/repos/asf/spark/blob/b1a732fe/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 01ea62a..1fc0629 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -71,6 +71,27 @@ class StreamSuite extends StreamTest {
   CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, 

spark git commit: [SPARK-20441][SPARK-20432][SS] Within the same streaming query, one StreamingRelation should only be transformed to one StreamingExecutionRelation

2017-05-03 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 7f96f2d7f -> 27f543b15


[SPARK-20441][SPARK-20432][SS] Within the same streaming query, one 
StreamingRelation should only be transformed to one StreamingExecutionRelation

## What changes were proposed in this pull request?

Within the same streaming query, when one `StreamingRelation` is referenced
multiple times – e.g. `df.union(df)` – we should transform it into only one
`StreamingExecutionRelation`, instead of two or more different
`StreamingExecutionRelation`s (each of which would have its own source,
source metadata logs, ...).

## How was this patch tested?

Added two test cases, each of which would fail without this patch.

Author: Liwei Lin 

Closes #17735 from lw-lin/SPARK-20441.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27f543b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27f543b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27f543b1

Branch: refs/heads/master
Commit: 27f543b15f2f493f6f8373e46b4c9564b0a1bf81
Parents: 7f96f2d
Author: Liwei Lin 
Authored: Wed May 3 08:55:02 2017 -0700
Committer: Burak Yavuz 
Committed: Wed May 3 08:55:02 2017 -0700

--
 .../execution/streaming/StreamExecution.scala   | 20 
 .../spark/sql/streaming/StreamSuite.scala   | 48 
 2 files changed, 60 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/27f543b1/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index affc201..b6ddf74 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.locks.ReentrantLock
 
+import scala.collection.mutable.{Map => MutableMap}
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
@@ -148,15 +149,18 @@ class StreamExecution(
   "logicalPlan must be initialized in StreamExecutionThread " +
 s"but the current thread was ${Thread.currentThread}")
 var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelation, 
StreamingExecutionRelation]()
 val _logicalPlan = analyzedPlan.transform {
-  case StreamingRelation(dataSource, _, output) =>
-// Materialize source to avoid creating it in every batch
-val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
-val source = dataSource.createSource(metadataPath)
-nextSourceId += 1
-// We still need to use the previous `output` instead of 
`source.schema` as attributes in
-// "df.logicalPlan" has already used attributes of the previous 
`output`.
-StreamingExecutionRelation(source, output)
+  case streamingRelation@StreamingRelation(dataSource, _, output) =>
+toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
+  // Materialize source to avoid creating it in every batch
+  val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+  val source = dataSource.createSource(metadataPath)
+  nextSourceId += 1
+  // We still need to use the previous `output` instead of 
`source.schema` as attributes in
+  // "df.logicalPlan" has already used attributes of the previous 
`output`.
+  StreamingExecutionRelation(source, output)
+})
 }
 sources = _logicalPlan.collect { case s: StreamingExecutionRelation => 
s.source }
 uniqueSources = sources.distinct

http://git-wip-us.apache.org/repos/asf/spark/blob/27f543b1/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 01ea62a..1fc0629 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -71,6 +71,27 @@ class StreamSuite extends StreamTest {
   CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four")))
   }
 
+  test("SPARK-20432: union one stream with itself") {
+val df = 

spark git commit: [SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans

2017-04-28 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 ea5b11446 -> ec712d751


[SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans

## What changes were proposed in this pull request?

We didn't enforce analyzed plans in Spark 2.1 when writing out to Kafka.
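
As a hedged sketch of the batch write path that used to fail (broker address and topic are placeholders; the shape mirrors the new test in the diff below):

```scala
import org.apache.spark.sql.functions._

// The projection is only resolved during analysis; inspecting the unanalyzed plan
// used to throw UnresolvedException here.
val events = spark.range(1, 1000).select(to_json(struct("*")).as("value"))

events.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("topic", "events")                          // placeholder
  .save()
```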

## How was this patch tested?

New unit test.

Author: Bill Chambers 

Closes #17804 from anabranch/SPARK-20496-2.

(cherry picked from commit 733b81b835f952ab96723c749461d6afc0c71974)
Signed-off-by: Burak Yavuz 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec712d75
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec712d75
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec712d75

Branch: refs/heads/branch-2.2
Commit: ec712d7510579dc4c8da859c86b5236d3ee767be
Parents: ea5b114
Author: Bill Chambers 
Authored: Fri Apr 28 10:18:31 2017 -0700
Committer: Burak Yavuz 
Committed: Fri Apr 28 10:18:50 2017 -0700

--
 .../org/apache/spark/sql/kafka010/KafkaWriter.scala |  4 ++--
 .../apache/spark/sql/kafka010/KafkaSinkSuite.scala  | 16 
 2 files changed, 18 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ec712d75/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
index a637d52..61936e3 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -47,7 +47,7 @@ private[kafka010] object KafkaWriter extends Logging {
   queryExecution: QueryExecution,
   kafkaParameters: ju.Map[String, Object],
   topic: Option[String] = None): Unit = {
-val schema = queryExecution.logical.output
+val schema = queryExecution.analyzed.output
 schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
   if (topic == None) {
 throw new AnalysisException(s"topic option required when no " +
@@ -84,7 +84,7 @@ private[kafka010] object KafkaWriter extends Logging {
   queryExecution: QueryExecution,
   kafkaParameters: ju.Map[String, Object],
   topic: Option[String] = None): Unit = {
-val schema = queryExecution.logical.output
+val schema = queryExecution.analyzed.output
 validateQuery(queryExecution, kafkaParameters, topic)
 SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
   queryExecution.toRdd.foreachPartition { iter =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ec712d75/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 4bd052d..2ab336c 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
SpecificInternalRow, UnsafeProjection}
 import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{BinaryType, DataType}
@@ -108,6 +109,21 @@ class KafkaSinkSuite extends StreamTest with 
SharedSQLContext {
   s"save mode overwrite not allowed for kafka"))
   }
 
+  test("SPARK-20496: batch - enforce analyzed plans") {
+val inputEvents =
+  spark.range(1, 1000)
+.select(to_json(struct("*")) as 'value)
+
+val topic = newTopic()
+testUtils.createTopic(topic)
+// used to throw UnresolvedException
+inputEvents.write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("topic", topic)
+  .save()
+  }
+
   test("streaming - write to kafka with topic field") {
 val input = MemoryStream[String]
 val topic = newTopic()


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional 

spark git commit: [SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans

2017-04-28 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 6696ad0e8 -> 5131b0a96


[SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans

## What changes were proposed in this pull request?

We didn't enforce analyzed plans in Spark 2.1 when writing out to Kafka.

## How was this patch tested?

New unit test.

Author: Bill Chambers 

Closes #17804 from anabranch/SPARK-20496-2.

(cherry picked from commit 733b81b835f952ab96723c749461d6afc0c71974)
Signed-off-by: Burak Yavuz 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5131b0a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5131b0a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5131b0a9

Branch: refs/heads/branch-2.1
Commit: 5131b0a963699b6910949ae2361fa740dafdb678
Parents: 6696ad0
Author: Bill Chambers 
Authored: Fri Apr 28 10:18:31 2017 -0700
Committer: Burak Yavuz 
Committed: Fri Apr 28 10:19:14 2017 -0700

--
 .../org/apache/spark/sql/kafka010/KafkaWriter.scala |  4 ++--
 .../apache/spark/sql/kafka010/KafkaSinkSuite.scala  | 16 
 2 files changed, 18 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5131b0a9/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
index a637d52..61936e3 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -47,7 +47,7 @@ private[kafka010] object KafkaWriter extends Logging {
   queryExecution: QueryExecution,
   kafkaParameters: ju.Map[String, Object],
   topic: Option[String] = None): Unit = {
-val schema = queryExecution.logical.output
+val schema = queryExecution.analyzed.output
 schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
   if (topic == None) {
 throw new AnalysisException(s"topic option required when no " +
@@ -84,7 +84,7 @@ private[kafka010] object KafkaWriter extends Logging {
   queryExecution: QueryExecution,
   kafkaParameters: ju.Map[String, Object],
   topic: Option[String] = None): Unit = {
-val schema = queryExecution.logical.output
+val schema = queryExecution.analyzed.output
 validateQuery(queryExecution, kafkaParameters, topic)
 SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
   queryExecution.toRdd.foreachPartition { iter =>

http://git-wip-us.apache.org/repos/asf/spark/blob/5131b0a9/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 4905356..1e7f4f2 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
SpecificInternalRow, UnsafeProjection}
 import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{BinaryType, DataType}
@@ -107,6 +108,21 @@ class KafkaSinkSuite extends StreamTest with 
SharedSQLContext {
   s"save mode overwrite not allowed for kafka"))
   }
 
+  test("SPARK-20496: batch - enforce analyzed plans") {
+val inputEvents =
+  spark.range(1, 1000)
+.select(to_json(struct("*")) as 'value)
+
+val topic = newTopic()
+testUtils.createTopic(topic)
+// used to throw UnresolvedException
+inputEvents.write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("topic", topic)
+  .save()
+  }
+
   test("streaming - write to kafka with topic field") {
 val input = MemoryStream[String]
 val topic = newTopic()


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional 

spark git commit: [SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans

2017-04-28 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 8c911adac -> 733b81b83


[SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans

## What changes were proposed in this pull request?

We didn't enforce analyzed plans in Spark 2.1 when writing out to Kafka.

## How was this patch tested?

New unit test.

Author: Bill Chambers 

Closes #17804 from anabranch/SPARK-20496-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/733b81b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/733b81b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/733b81b8

Branch: refs/heads/master
Commit: 733b81b835f952ab96723c749461d6afc0c71974
Parents: 8c911ad
Author: Bill Chambers 
Authored: Fri Apr 28 10:18:31 2017 -0700
Committer: Burak Yavuz 
Committed: Fri Apr 28 10:18:31 2017 -0700

--
 .../org/apache/spark/sql/kafka010/KafkaWriter.scala |  4 ++--
 .../apache/spark/sql/kafka010/KafkaSinkSuite.scala  | 16 
 2 files changed, 18 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/733b81b8/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
index a637d52..61936e3 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -47,7 +47,7 @@ private[kafka010] object KafkaWriter extends Logging {
   queryExecution: QueryExecution,
   kafkaParameters: ju.Map[String, Object],
   topic: Option[String] = None): Unit = {
-val schema = queryExecution.logical.output
+val schema = queryExecution.analyzed.output
 schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
   if (topic == None) {
 throw new AnalysisException(s"topic option required when no " +
@@ -84,7 +84,7 @@ private[kafka010] object KafkaWriter extends Logging {
   queryExecution: QueryExecution,
   kafkaParameters: ju.Map[String, Object],
   topic: Option[String] = None): Unit = {
-val schema = queryExecution.logical.output
+val schema = queryExecution.analyzed.output
 validateQuery(queryExecution, kafkaParameters, topic)
 SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
   queryExecution.toRdd.foreachPartition { iter =>

http://git-wip-us.apache.org/repos/asf/spark/blob/733b81b8/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 4bd052d..2ab336c 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
SpecificInternalRow, UnsafeProjection}
 import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{BinaryType, DataType}
@@ -108,6 +109,21 @@ class KafkaSinkSuite extends StreamTest with 
SharedSQLContext {
   s"save mode overwrite not allowed for kafka"))
   }
 
+  test("SPARK-20496: batch - enforce analyzed plans") {
+val inputEvents =
+  spark.range(1, 1000)
+.select(to_json(struct("*")) as 'value)
+
+val topic = newTopic()
+testUtils.createTopic(topic)
+// used to throw UnresolvedException
+inputEvents.write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("topic", topic)
+  .save()
+  }
+
   test("streaming - write to kafka with topic field") {
 val input = MemoryStream[String]
 val topic = newTopic()





spark git commit: [SPARK-19911][STREAMING] Add builder interface for Kinesis DStreams

2017-03-24 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 9299d071f -> 707e50183


[SPARK-19911][STREAMING] Add builder interface for Kinesis DStreams

## What changes were proposed in this pull request?

- Add new KinesisDStream.scala containing KinesisDStream.Builder class
- Add KinesisDStreamBuilderSuite test suite
- Make KinesisInputDStream ctor args package private for testing
- Add JavaKinesisDStreamBuilderSuite test suite
- Add args to KinesisInputDStream and KinesisReceiver for optional
  service-specific auth (Kinesis, DynamoDB and CloudWatch)
## How was this patch tested?

Added ```KinesisDStreamBuilderSuite``` to verify builder class works as expected
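
For context, a hedged sketch of how the new builder is meant to be used; the setter names below follow the PR description and may differ from the final API, and the stream, region, and application names are placeholders:

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream

// Build a Kinesis DStream from an existing StreamingContext `ssc`.
def buildKinesisStream(ssc: StreamingContext): Unit = {
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName("my-kinesis-stream")
    .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
    .regionName("us-west-2")
    .initialPositionInStream(InitialPositionInStream.LATEST)
    .checkpointAppName("my-checkpoint-app")
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .build()

  stream.foreachRDD(rdd => println(s"received ${rdd.count()} records"))
}
```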

Author: Adam Budde 

Closes #17250 from budde/KinesisStreamBuilder.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/707e5018
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/707e5018
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/707e5018

Branch: refs/heads/master
Commit: 707e501832fa7adde0a884c528a7352983d83520
Parents: 9299d07
Author: Adam Budde 
Authored: Fri Mar 24 12:40:29 2017 -0700
Committer: Burak Yavuz 
Committed: Fri Mar 24 12:40:29 2017 -0700

--
 .../kinesis/KinesisBackedBlockRDD.scala |   6 +-
 .../streaming/kinesis/KinesisInputDStream.scala | 259 ++-
 .../streaming/kinesis/KinesisReceiver.scala |  20 +-
 .../spark/streaming/kinesis/KinesisUtils.scala  |  43 +--
 .../SerializableCredentialsProvider.scala   |  85 --
 .../streaming/kinesis/SparkAWSCredentials.scala | 182 +
 .../JavaKinesisInputDStreamBuilderSuite.java|  63 +
 .../KinesisInputDStreamBuilderSuite.scala   | 115 
 .../kinesis/KinesisReceiverSuite.scala  |  23 --
 .../streaming/kinesis/KinesisStreamSuite.scala  |   2 +-
 .../SparkAWSCredentialsBuilderSuite.scala   | 100 +++
 11 files changed, 749 insertions(+), 149 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/707e5018/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
--
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index 0f1790b..f31ebf1 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -82,8 +82,8 @@ class KinesisBackedBlockRDD[T: ClassTag](
 @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
 @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
    val retryTimeoutMs: Int = 10000,
-val messageHandler: Record => T = KinesisUtils.defaultMessageHandler _,
-val kinesisCredsProvider: SerializableCredentialsProvider = 
DefaultCredentialsProvider
+val messageHandler: Record => T = 
KinesisInputDStream.defaultMessageHandler _,
+val kinesisCreds: SparkAWSCredentials = DefaultCredentials
   ) extends BlockRDD[T](sc, _blockIds) {
 
   require(_blockIds.length == arrayOfseqNumberRanges.length,
@@ -109,7 +109,7 @@ class KinesisBackedBlockRDD[T: ClassTag](
 }
 
 def getBlockFromKinesis(): Iterator[T] = {
-  val credentials = kinesisCredsProvider.provider.getCredentials
+  val credentials = kinesisCreds.provider.getCredentials
   partition.seqNumberRanges.ranges.iterator.flatMap { range =>
 new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
   range, retryTimeoutMs).map(messageHandler)

http://git-wip-us.apache.org/repos/asf/spark/blob/707e5018/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
--
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
index fbc6b99..8970ad2 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
@@ -22,24 +22,28 @@ import scala.reflect.ClassTag
 import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.model.Record
 
+import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import 

spark git commit: Fix compilation of the Scala 2.10 master branch

2017-03-23 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master c79118070 -> 93581fbc1


Fix compilation of the Scala 2.10 master branch

## What changes were proposed in this pull request?

Fixes the compilation break caused by: 
https://github.com/apache/spark/commit/746a558de2136f91f8fe77c6e51256017aa50913

## How was this patch tested?

Compiled with `build/sbt -Dscala2.10 sql/compile` locally
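
For reference, a hedged sketch of what the corrected deprecation messages point users at, assuming a streaming DataFrame `df` (the sink and interval are placeholders):

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// Use the Trigger.ProcessingTime factory instead of the deprecated
// ProcessingTime case class.
def startConsoleQuery(df: DataFrame): Unit = {
  df.writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
    .start()
}
```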

Author: Burak Yavuz <brk...@gmail.com>

Closes #17403 from brkyvz/onceTrigger2.10.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93581fbc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93581fbc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93581fbc

Branch: refs/heads/master
Commit: 93581fbc18c01595918c565f6737aaa666116114
Parents: c791180
Author: Burak Yavuz <brk...@gmail.com>
Authored: Thu Mar 23 17:57:31 2017 -0700
Committer: Burak Yavuz <brk...@gmail.com>
Committed: Thu Mar 23 17:57:31 2017 -0700

--
 .../spark/sql/streaming/ProcessingTime.scala| 20 ++--
 .../org/apache/spark/sql/streaming/Trigger.java |  2 +-
 2 files changed, 11 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/93581fbc/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
index bdad8e4..9ba1fc0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
@@ -51,7 +51,7 @@ import org.apache.spark.unsafe.types.CalendarInterval
  */
 @Experimental
 @InterfaceStability.Evolving
-@deprecated("use Trigger.ProcessingTimeTrigger(intervalMs)", "2.2.0")
+@deprecated("use Trigger.ProcessingTime(intervalMs)", "2.2.0")
 case class ProcessingTime(intervalMs: Long) extends Trigger {
   require(intervalMs >= 0, "the interval of trigger should not be negative")
 }
@@ -64,7 +64,7 @@ case class ProcessingTime(intervalMs: Long) extends Trigger {
  */
 @Experimental
 @InterfaceStability.Evolving
-@deprecated("use Trigger.ProcessingTimeTrigger(intervalMs)", "2.2.0")
+@deprecated("use Trigger.ProcessingTime(intervalMs)", "2.2.0")
 object ProcessingTime {
 
   /**
@@ -76,9 +76,9 @@ object ProcessingTime {
* }}}
*
* @since 2.0.0
-   * @deprecated use Trigger.ProcessingTimeTrigger(interval)
+   * @deprecated use Trigger.ProcessingTime(interval)
*/
-  @deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0")
+  @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
   def apply(interval: String): ProcessingTime = {
 if (StringUtils.isBlank(interval)) {
   throw new IllegalArgumentException(
@@ -108,9 +108,9 @@ object ProcessingTime {
* }}}
*
* @since 2.0.0
-   * @deprecated use Trigger.ProcessingTimeTrigger(interval)
+   * @deprecated use Trigger.ProcessingTime(interval)
*/
-  @deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0")
+  @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
   def apply(interval: Duration): ProcessingTime = {
 new ProcessingTime(interval.toMillis)
   }
@@ -124,9 +124,9 @@ object ProcessingTime {
* }}}
*
* @since 2.0.0
-   * @deprecated use Trigger.ProcessingTimeTrigger(interval)
+   * @deprecated use Trigger.ProcessingTime(interval)
*/
-  @deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0")
+  @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
   def create(interval: String): ProcessingTime = {
 apply(interval)
   }
@@ -141,9 +141,9 @@ object ProcessingTime {
* }}}
*
* @since 2.0.0
-   * @deprecated use Trigger.ProcessingTimeTrigger(interval)
+   * @deprecated use Trigger.ProcessingTime(interval, unit)
*/
-  @deprecated("use Trigger.ProcessingTimeTrigger(interval, unit)", "2.2.0")
+  @deprecated("use Trigger.ProcessingTime(interval, unit)", "2.2.0")
   def create(interval: Long, unit: TimeUnit): ProcessingTime = {
 new ProcessingTime(unit.toMillis(interval))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/93581fbc/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/

spark git commit: [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource

2017-03-08 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 455129020 -> a3648b5d4


[SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in 
combination with maxFileAge in FileStreamSource

## What changes were proposed in this pull request?

**The Problem**
There is a file stream source option called maxFileAge which limits how old the
files can be, relative to the latest file that has been seen. This is used to
limit the files that need to be remembered as "processed". Files older than the
latest processed files are ignored. This value is 7 days by default.
This causes a problem when both
latestFirst = true
maxFilesPerTrigger > total files to be processed.
Here is what happens in all combinations:
1) latestFirst = false - Since files are processed in order, there won't be any
unprocessed file older than the latest processed file. All files will be
processed.
2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge
thresholding mechanism takes one batch to initialize. If maxFilesPerTrigger is
not set, then all old files get processed in the first batch, and so no file is
left behind.
3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch
processes the latest X files. That sets the threshold to (latest file's timestamp
- maxFileAge), so files older than this threshold will never be considered for
processing.
The bug is with case 3.

**The Solution**

Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set.
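
A hedged sketch of the option combination this fix targets; the schema and input path are placeholders, not taken from the patch:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// With both latestFirst and maxFilesPerTrigger set, maxFileAge is now ignored,
// so older-but-unprocessed files are no longer silently skipped.
def readLatestFirst(spark: SparkSession): DataFrame = {
  val schema = StructType(StructField("value", StringType) :: Nil)
  spark.readStream
    .schema(schema)
    .option("latestFirst", "true")
    .option("maxFilesPerTrigger", "10")
    .json("/path/to/input")   // placeholder directory
}
```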

## How was this patch tested?

Regression test in `FileStreamSourceSuite`

Author: Burak Yavuz <brk...@gmail.com>

Closes #17153 from brkyvz/maxFileAge.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3648b5d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3648b5d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3648b5d

Branch: refs/heads/master
Commit: a3648b5d4f99ff9461d02f53e9ec71787a3abf51
Parents: 4551290
Author: Burak Yavuz <brk...@gmail.com>
Authored: Wed Mar 8 14:35:07 2017 -0800
Committer: Burak Yavuz <brk...@gmail.com>
Committed: Wed Mar 8 14:35:07 2017 -0800

--
 .../execution/streaming/FileStreamOptions.scala |  5 +-
 .../execution/streaming/FileStreamSource.scala  | 14 +++-
 .../sql/streaming/FileStreamSourceSuite.scala   | 82 
 3 files changed, 63 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a3648b5d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index 2f802d7..e7ba901 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -38,7 +38,10 @@ class FileStreamOptions(parameters: 
CaseInsensitiveMap[String]) extends Logging
   }
 
   /**
-   * Maximum age of a file that can be found in this directory, before it is 
deleted.
+   * Maximum age of a file that can be found in this directory, before it is 
ignored. For the
+   * first batch all files will be considered valid. If `latestFirst` is set 
to `true` and
+   * `maxFilesPerTrigger` is set, then this parameter will be ignored, because 
old files that are
+   * valid, and should be processed, may be ignored. Please refer to 
SPARK-19813 for details.
*
* The max age is specified with respect to the timestamp of the latest 
file, and not the
* timestamp of the current system. That this means if the last file has 
timestamp 1000, and the

http://git-wip-us.apache.org/repos/asf/spark/blob/a3648b5d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 6a7263c..0f09b0a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -66,23 +66,29 @@ class FileStreamSource(
 
   private val fileSortOrder = if (sourceOptions.latestFirst) {
   logWarning(
-"""'latestFirst' is true. New files will be processed first.
-  |It may affect the watermark value""".stripMargin)
+"""'latestFirst' is true. New files will be 

spark git commit: [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource

2017-03-08 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 320eff14b -> f6c1ad2eb


[SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in 
combination with maxFileAge in FileStreamSource

## What changes were proposed in this pull request?

**The Problem**
There is a file stream source option called maxFileAge which limits how old the
files can be, relative to the latest file that has been seen. This is used to
limit the files that need to be remembered as "processed". Files older than the
latest processed files are ignored. This value is 7 days by default.
This causes a problem when both
latestFirst = true
maxFilesPerTrigger > total files to be processed.
Here is what happens in all combinations:
1) latestFirst = false - Since files are processed in order, there won't be any
unprocessed file older than the latest processed file. All files will be
processed.
2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge
thresholding mechanism takes one batch to initialize. If maxFilesPerTrigger is
not set, then all old files get processed in the first batch, and so no file is
left behind.
3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch
processes the latest X files. That sets the threshold to (latest file's timestamp
- maxFileAge), so files older than this threshold will never be considered for
processing.
The bug is with case 3.

**The Solution**

Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set.

## How was this patch tested?

Regression test in `FileStreamSourceSuite`

Author: Burak Yavuz <brk...@gmail.com>

Closes #17153 from brkyvz/maxFileAge.

(cherry picked from commit a3648b5d4f99ff9461d02f53e9ec71787a3abf51)
Signed-off-by: Burak Yavuz <brk...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6c1ad2e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6c1ad2e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6c1ad2e

Branch: refs/heads/branch-2.1
Commit: f6c1ad2eb6d0706899aabbdd39e558b3488e2ef3
Parents: 320eff1
Author: Burak Yavuz <brk...@gmail.com>
Authored: Wed Mar 8 14:35:07 2017 -0800
Committer: Burak Yavuz <brk...@gmail.com>
Committed: Wed Mar 8 14:35:22 2017 -0800

--
 .../execution/streaming/FileStreamOptions.scala |  5 +-
 .../execution/streaming/FileStreamSource.scala  | 14 +++-
 .../sql/streaming/FileStreamSourceSuite.scala   | 82 
 3 files changed, 63 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f6c1ad2e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index 25ebe17..fe64838 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -38,7 +38,10 @@ class FileStreamOptions(parameters: CaseInsensitiveMap) 
extends Logging {
   }
 
   /**
-   * Maximum age of a file that can be found in this directory, before it is 
deleted.
+   * Maximum age of a file that can be found in this directory, before it is 
ignored. For the
+   * first batch all files will be considered valid. If `latestFirst` is set 
to `true` and
+   * `maxFilesPerTrigger` is set, then this parameter will be ignored, because 
old files that are
+   * valid, and should be processed, may be ignored. Please refer to 
SPARK-19813 for details.
*
* The max age is specified with respect to the timestamp of the latest 
file, and not the
* timestamp of the current system. That this means if the last file has 
timestamp 1000, and the

http://git-wip-us.apache.org/repos/asf/spark/blob/f6c1ad2e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 39c0b49..0f0b6f1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -64,23 +64,29 @@ class FileStreamSource(
 
   private val fileSortOrder = if (sourceOptions.latestFirst) {
   logWarning(
-"""'latestFirst' is true. New files will be processed first.
-

spark git commit: [SPARK-19304][STREAMING][KINESIS] fix kinesis slow checkpoint recovery

2017-03-06 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 339b53a13 -> 46a64d1e0


[SPARK-19304][STREAMING][KINESIS] fix kinesis slow checkpoint recovery

## What changes were proposed in this pull request?
Added a limit to the getRecords API call in KinesisBackedBlockRDD. This helps
reduce the amount of data returned by the Kinesis API call, making the recovery
considerably faster.

As we are storing the `fromSeqNum` & `toSeqNum` in checkpoint metadata, we can
also store the number of records, which can later be used for the API call.
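
As an illustration of the idea (not the patch itself), a hedged sketch of bounding a GetRecords call with the stored record count, using the AWS SDK request type and capping at the per-call service maximum:

```scala
import com.amazonaws.services.kinesis.model.GetRecordsRequest

// Limit each GetRecords call to the number of records known to be in the
// checkpointed range, never exceeding the Kinesis per-call maximum of 10000.
def limitedGetRecordsRequest(shardIterator: String, recordCount: Int): GetRecordsRequest = {
  val request = new GetRecordsRequest()
  request.setShardIterator(shardIterator)
  request.setLimit(math.min(recordCount, 10000))
  request
}
```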

## How was this patch tested?
The patch was manually tested.

Apologies for any silly mistakes; this is my first pull request.

Author: Gaurav 

Closes #16842 from Gauravshah/kinesis_checkpoint_recovery_fix_2_1_0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46a64d1e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46a64d1e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46a64d1e

Branch: refs/heads/master
Commit: 46a64d1e0ae12c31e848f377a84fb28e3efb3699
Parents: 339b53a
Author: Gaurav 
Authored: Mon Mar 6 10:41:49 2017 -0800
Committer: Burak Yavuz 
Committed: Mon Mar 6 10:41:49 2017 -0800

--
 .../kinesis/KinesisBackedBlockRDD.scala | 25 +++-
 .../streaming/kinesis/KinesisReceiver.scala |  3 ++-
 .../kinesis/KinesisBackedBlockRDDSuite.scala|  4 ++--
 .../streaming/kinesis/KinesisStreamSuite.scala  |  4 ++--
 4 files changed, 25 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/46a64d1e/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
--
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index 23c4d99..0f1790b 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -36,7 +36,11 @@ import org.apache.spark.util.NextIterator
 /** Class representing a range of Kinesis sequence numbers. Both sequence 
numbers are inclusive. */
 private[kinesis]
 case class SequenceNumberRange(
-streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: 
String)
+streamName: String,
+shardId: String,
+fromSeqNumber: String,
+toSeqNumber: String,
+recordCount: Int)
 
 /** Class representing an array of Kinesis sequence number ranges */
 private[kinesis]
@@ -136,6 +140,8 @@ class KinesisSequenceRangeIterator(
   private val client = new AmazonKinesisClient(credentials)
   private val streamName = range.streamName
   private val shardId = range.shardId
+  // AWS limits to maximum of 10k records per get call
+  private val maxGetRecordsLimit = 10000
 
   private var toSeqNumberReceived = false
   private var lastSeqNumber: String = null
@@ -153,12 +159,14 @@ class KinesisSequenceRangeIterator(
 
 // If the internal iterator has not been initialized,
 // then fetch records from starting sequence number
-internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, 
range.fromSeqNumber)
+internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, 
range.fromSeqNumber,
+  range.recordCount)
   } else if (!internalIterator.hasNext) {
 
 // If the internal iterator does not have any more records,
 // then fetch more records after the last consumed sequence number
-internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, 
lastSeqNumber)
+internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, 
lastSeqNumber,
+  range.recordCount)
   }
 
   if (!internalIterator.hasNext) {
@@ -191,9 +199,12 @@ class KinesisSequenceRangeIterator(
   /**
* Get records starting from or after the given sequence number.
*/
-  private def getRecords(iteratorType: ShardIteratorType, seqNum: String): 
Iterator[Record] = {
+  private def getRecords(
+  iteratorType: ShardIteratorType,
+  seqNum: String,
+  recordCount: Int): Iterator[Record] = {
 val shardIterator = getKinesisIterator(iteratorType, seqNum)
-val result = getRecordsAndNextKinesisIterator(shardIterator)
+val result = getRecordsAndNextKinesisIterator(shardIterator, recordCount)
 result._1
   }
 
@@ -202,10 +213,12 @@ class KinesisSequenceRangeIterator(
* to get records from Kinesis), and get the next shard iterator for next 
consumption.
*/
   private def 

spark git commit: [SPARK-19595][SQL] Support json array in from_json

2017-03-05 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 80d5338b3 -> 369a148e5


[SPARK-19595][SQL] Support json array in from_json

## What changes were proposed in this pull request?

This PR proposes to do both of the following:

**Do not allow json arrays with multiple elements and return null in 
`from_json` with `StructType` as the schema.**

Currently, it only reads a single row when the input is a JSON array. So, the
code below:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val schema = StructType(StructField("a", IntegerType) :: Nil)
Seq(("""[{"a": 1}, {"a": 
2}]""")).toDF("struct").select(from_json(col("struct"), schema)).show()
```
prints

```
+--------------------+
|jsontostruct(struct)|
+--------------------+
|                 [1]|
+--------------------+
```

This PR simply suggests printing this as `null` if the schema is `StructType`
and the input is a JSON array with multiple elements:

```
+--------------------+
|jsontostruct(struct)|
+--------------------+
|                null|
+--------------------+
```

**Support json arrays in `from_json` with `ArrayType` as the schema.**

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
Seq(("""[{"a": 1}, {"a": 2}]""")).toDF("array").select(from_json(col("array"), 
schema)).show()
```

prints

```
+-------------------+
|jsontostruct(array)|
+-------------------+
|         [[1], [2]]|
+-------------------+
```

## How was this patch tested?

Unit tests in `JsonExpressionsSuite` and `JsonFunctionsSuite`, Python doctests, and a
manual test.

Author: hyukjinkwon 

Closes #16929 from HyukjinKwon/disallow-array.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/369a148e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/369a148e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/369a148e

Branch: refs/heads/master
Commit: 369a148e591bb16ec7da54867610b207602cd698
Parents: 80d5338
Author: hyukjinkwon 
Authored: Sun Mar 5 14:35:06 2017 -0800
Committer: Burak Yavuz 
Committed: Sun Mar 5 14:35:06 2017 -0800

--
 python/pyspark/sql/functions.py | 11 +++-
 .../catalyst/expressions/jsonExpressions.scala  | 57 ---
 .../expressions/JsonExpressionsSuite.scala  | 58 +++-
 .../scala/org/apache/spark/sql/functions.scala  | 52 --
 .../apache/spark/sql/JsonFunctionsSuite.scala   | 25 -
 5 files changed, 186 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/369a148e/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 426a4a8..376b86e 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1773,11 +1773,11 @@ def json_tuple(col, *fields):
 @since(2.1)
 def from_json(col, schema, options={}):
 """
-Parses a column containing a JSON string into a [[StructType]] with the
-specified schema. Returns `null`, in the case of an unparseable string.
+Parses a column containing a JSON string into a [[StructType]] or 
[[ArrayType]]
+with the specified schema. Returns `null`, in the case of an unparseable 
string.
 
 :param col: string column in json format
-:param schema: a StructType to use when parsing the json column
+:param schema: a StructType or ArrayType to use when parsing the json 
column
 :param options: options to control parsing. accepts the same options as 
the json datasource
 
 >>> from pyspark.sql.types import *
@@ -1786,6 +1786,11 @@ def from_json(col, schema, options={}):
 >>> df = spark.createDataFrame(data, ("key", "value"))
 >>> df.select(from_json(df.value, schema).alias("json")).collect()
 [Row(json=Row(a=1))]
+>>> data = [(1, '''[{"a": 1}]''')]
+>>> schema = ArrayType(StructType([StructField("a", IntegerType())]))
+>>> df = spark.createDataFrame(data, ("key", "value"))
+>>> df.select(from_json(df.value, schema).alias("json")).collect()
+[Row(json=[Row(a=1)])]
 """
 
 sc = SparkContext._active_spark_context

http://git-wip-us.apache.org/repos/asf/spark/blob/369a148e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 1e690a4..dbff62e 100644
--- 

spark git commit: [SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without errors

2017-02-13 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 ef4fb7ebc -> c5a7cb022


[SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without 
errors

## What changes were proposed in this pull request?

When a query uses a temp checkpoint dir, it's better to delete it if it's 
stopped without errors.
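
A hedged sketch of the scenario this covers (console sink and names are placeholders): starting a query without a checkpointLocation makes the engine create a temporary checkpoint directory, which is now removed on a clean stop:

```scala
import org.apache.spark.sql.DataFrame

// `events` is a streaming DataFrame; no checkpointLocation option is given,
// so a temporary checkpoint directory is created for the query.
def runAndStop(events: DataFrame): Unit = {
  val query = events.writeStream
    .format("console")
    .start()

  query.processAllAvailable()
  // Stopping without an error now deletes the temporary checkpoint directory.
  query.stop()
}
```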

## How was this patch tested?

New unit tests.

Author: Shixiong Zhu 

Closes #16880 from zsxwing/delete-temp-checkpoint.

(cherry picked from commit 3dbff9be06c2007fdb2ad4a1e113f3bc7fc06529)
Signed-off-by: Burak Yavuz 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5a7cb02
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5a7cb02
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5a7cb02

Branch: refs/heads/branch-2.1
Commit: c5a7cb0225ed4ed0d1ede5da0593b258c5dfd79f
Parents: ef4fb7e
Author: Shixiong Zhu 
Authored: Mon Feb 13 11:54:54 2017 -0800
Committer: Burak Yavuz 
Committed: Mon Feb 13 11:55:11 2017 -0800

--
 .../execution/streaming/StreamExecution.scala   | 24 --
 .../sql/streaming/StreamingQueryManager.scala   |  6 -
 .../test/DataStreamReaderWriterSuite.scala  | 26 
 3 files changed, 53 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c5a7cb02/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index a35950e..a8ec73e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.IOException
 import java.util.UUID
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.locks.ReentrantLock
@@ -41,16 +42,20 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, 
Utils}
  * Unlike a standard query, a streaming query executes repeatedly each time 
new data arrives at any
  * [[Source]] present in the query plan. Whenever new data arrives, a 
[[QueryExecution]] is created
  * and the results are committed transactionally to the given [[Sink]].
+ *
+ * @param deleteCheckpointOnStop whether to delete the checkpoint if the query 
is stopped without
+ *   errors
  */
 class StreamExecution(
 override val sparkSession: SparkSession,
 override val name: String,
-checkpointRoot: String,
+val checkpointRoot: String,
 analyzedPlan: LogicalPlan,
 val sink: Sink,
 val trigger: Trigger,
 val triggerClock: Clock,
-val outputMode: OutputMode)
+val outputMode: OutputMode,
+deleteCheckpointOnStop: Boolean)
   extends StreamingQuery with ProgressReporter with Logging {
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
@@ -213,6 +218,7 @@ class StreamExecution(
* has been posted to all the listeners.
*/
   def start(): Unit = {
+logInfo(s"Starting $prettyIdString. Use $checkpointRoot to store the query 
checkpoint.")
 microBatchThread.setDaemon(true)
 microBatchThread.start()
 startLatch.await()  // Wait until thread started and QueryStart event has 
been posted
@@ -323,6 +329,20 @@ class StreamExecution(
 sparkSession.streams.notifyQueryTermination(StreamExecution.this)
 postEvent(
   new QueryTerminatedEvent(id, runId, 
exception.map(_.cause).map(Utils.exceptionString)))
+
+// Delete the temp checkpoint only when the query didn't fail
+if (deleteCheckpointOnStop && exception.isEmpty) {
+  val checkpointPath = new Path(checkpointRoot)
+  try {
+val fs = 
checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+fs.delete(checkpointPath, true)
+  } catch {
+case NonFatal(e) =>
+  // Deleting temp checkpoint folder is best effort, don't throw 
non fatal exceptions
+  // when we cannot delete them.
+  logWarning(s"Cannot delete $checkpointPath", e)
+  }
+}
   } finally {
 terminationLatch.countDown()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c5a7cb02/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
--
diff --git 

spark git commit: [SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without errors

2017-02-13 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 0417ce878 -> 3dbff9be0


[SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without 
errors

## What changes were proposed in this pull request?

When a query uses a temp checkpoint dir, it's better to delete it if it's 
stopped without errors.

## How was this patch tested?

New unit tests.

Author: Shixiong Zhu 

Closes #16880 from zsxwing/delete-temp-checkpoint.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3dbff9be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3dbff9be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3dbff9be

Branch: refs/heads/master
Commit: 3dbff9be06c2007fdb2ad4a1e113f3bc7fc06529
Parents: 0417ce8
Author: Shixiong Zhu 
Authored: Mon Feb 13 11:54:54 2017 -0800
Committer: Burak Yavuz 
Committed: Mon Feb 13 11:54:54 2017 -0800

--
 .../execution/streaming/StreamExecution.scala   | 24 --
 .../sql/streaming/StreamingQueryManager.scala   |  6 -
 .../test/DataStreamReaderWriterSuite.scala  | 26 
 3 files changed, 53 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3dbff9be/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ea37194..3149ef0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.IOException
 import java.util.UUID
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.locks.ReentrantLock
@@ -41,16 +42,20 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, 
Utils}
  * Unlike a standard query, a streaming query executes repeatedly each time 
new data arrives at any
  * [[Source]] present in the query plan. Whenever new data arrives, a 
[[QueryExecution]] is created
  * and the results are committed transactionally to the given [[Sink]].
+ *
+ * @param deleteCheckpointOnStop whether to delete the checkpoint if the query 
is stopped without
+ *   errors
  */
 class StreamExecution(
 override val sparkSession: SparkSession,
 override val name: String,
-checkpointRoot: String,
+val checkpointRoot: String,
 analyzedPlan: LogicalPlan,
 val sink: Sink,
 val trigger: Trigger,
 val triggerClock: Clock,
-val outputMode: OutputMode)
+val outputMode: OutputMode,
+deleteCheckpointOnStop: Boolean)
   extends StreamingQuery with ProgressReporter with Logging {
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
@@ -213,6 +218,7 @@ class StreamExecution(
* has been posted to all the listeners.
*/
   def start(): Unit = {
+logInfo(s"Starting $prettyIdString. Use $checkpointRoot to store the query 
checkpoint.")
 microBatchThread.setDaemon(true)
 microBatchThread.start()
 startLatch.await()  // Wait until thread started and QueryStart event has 
been posted
@@ -323,6 +329,20 @@ class StreamExecution(
 sparkSession.streams.notifyQueryTermination(StreamExecution.this)
 postEvent(
   new QueryTerminatedEvent(id, runId, 
exception.map(_.cause).map(Utils.exceptionString)))
+
+// Delete the temp checkpoint only when the query didn't fail
+if (deleteCheckpointOnStop && exception.isEmpty) {
+  val checkpointPath = new Path(checkpointRoot)
+  try {
+val fs = 
checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+fs.delete(checkpointPath, true)
+  } catch {
+case NonFatal(e) =>
+  // Deleting temp checkpoint folder is best effort, don't throw 
non fatal exceptions
+  // when we cannot delete them.
+  logWarning(s"Cannot delete $checkpointPath", e)
+  }
+}
   } finally {
 terminationLatch.countDown()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3dbff9be/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 

spark git commit: [SPARK-18218][ML][MLLIB] Reduce shuffled data size of BlockMatrix multiplication and solve potential OOM and low parallelism usage problem By split middle dimension in matrix multipl

2017-01-26 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 9f523d319 -> 1191fe267


[SPARK-18218][ML][MLLIB] Reduce shuffled data size of BlockMatrix 
multiplication and solve potential OOM and low parallelism usage problem By 
split middle dimension in matrix multiplication

## What changes were proposed in this pull request?

### The problem in current block matrix mulitiplication

As described in JIRA https://issues.apache.org/jira/browse/SPARK-18218, block
matrix multiplication in Spark may cause problems. Suppose we multiply an `M*N`
matrix A by an `N*P` matrix B; when N is much larger than M and P, the following
problems may occur:
- when the middle dimension N is too large, it will cause reducer OOM.
- even if OOM does not occur, it will still cause parallelism to be too low.
- when N is much larger than M and P, and matrices A and B have many partitions,
it may create too many partitions on the M and P dimensions, which causes a much
larger shuffled data size. (I will explain this in detail in the following.)

### Key point of my improvement

In this PR, I introduce a `numMidDimSplits` parameter and improve the algorithm
to resolve this problem.

In order to understand the improvement in this PR, first let me give a simple
case to explain how the current multiplication works and what causes the
problems above:

suppose we have block matrix A, contains 200 blocks (`2 numRowBlocks * 100 
numColBlocks`), blocks arranged in 2 rows, 100 cols:
```
A00 A01 A02 ... A0,99
A10 A11 A12 ... A1,99
```
and we have block matrix B, also contains 200 blocks (`100 numRowBlocks * 2 
numColBlocks`), blocks arranged in 100 rows, 2 cols:
```
B00    B01
B10    B11
B20    B21
...
B99,0  B99,1
```
Suppose all blocks in the two matrices are dense for now.
Now we call A.multiply(B). Suppose the generated `resultPartitioner` contains 2
rowPartitions and 2 colPartitions (there can't be more partitions because the
result matrix only contains `2 * 2` blocks); the current algorithm then contains
two shuffle steps:

**step-1**
Step-1 will generate 4 reducers, which I tag as reducer-00, reducer-01,
reducer-10, and reducer-11, and shuffle data as follows:
```
A00 A01 A02 ... A0,99
B00 B10 B20 ... B99,0    shuffled into reducer-00

A00 A01 A02 ... A0,99
B01 B11 B21 ... B99,1    shuffled into reducer-01

A10 A11 A12 ... A1,99
B00 B10 B20 ... B99,0    shuffled into reducer-10

A10 A11 A12 ... A1,99
B01 B11 B21 ... B99,1    shuffled into reducer-11
```

The shuffling above is a `cogroup` transform; note that each reducer contains
**only one group**.

**step-2**
Step-2 will do an `aggregateByKey` transform on the result of step-1, which also
generates 4 reducers and produces the final result RDD, containing 4 partitions,
each holding one block.

The main problems are in step-1. Now we have only 4 reducers, but matrices A and
B have 400 blocks in total; obviously the reducer number is too small.
We can also see that each reducer contains only one group (in the sense of the
`coGroup` transform), and each group contains 200 blocks. This is terrible because
we know that the `coGroup` transform will load each group into memory when
computing. It does not scale at the algorithm level. Suppose matrix A has
10000 column blocks or more instead of 100? Then each reducer will load 20000
blocks into memory. It will easily cause reducer OOM.
This PR tries to resolve the problem described above.
When matrix A with dimension M * N multiplies matrix B with dimension N * P, the
middle dimension N is the key point. If N is large, the current multiplication
implementation works badly.
In this PR, I introduce a `numMidDimSplits` parameter, representing how many
splits to cut on the middle dimension N.
Still using the example described above, if we now set `numMidDimSplits = 10`,
we can generate 40 reducers in **step-1**:

Each reducer-ij above will now be split into 10 reducers: reducer-ij0,
reducer-ij1, ..., reducer-ij9, and each reducer will receive 20 blocks.
Now the shuffle works as follows:

**reducer-000 to reducer-009**
```
A0,0 A0,10 A0,20 ... A0,90
B0,0 B10,0 B20,0 ... B90,0    shuffled into reducer-000

A0,1 A0,11 A0,21 ... A0,91
B1,0 B11,0 B21,0 ... B91,0    shuffled into reducer-001

A0,2 A0,12 A0,22 ... A0,92
B2,0 B12,0 B22,0 ... B92,0    shuffled into reducer-002

...

A0,9 A0,19 A0,29 ... A0,99
B9,0 B19,0 B29,0 ... B99,0    shuffled into reducer-009
```

**reducer-010 to reducer-019**
```
A0,0 A0,10 A0,20 ... A0,90
B0,1 B10,1 B20,1 ... B90,1    shuffled into reducer-010

A0,1 A0,11 A0,21 ... A0,91
B1,1 B11,1 B21,1 ... B91,1    shuffled into reducer-011

A0,2 A0,12 A0,22 ... A0,92
B2,1 B12,1 B22,1 ... B92,1    shuffled into reducer-012

...

A0,9 A0,19 A0,29 ... A0,99
B9,1 B19,1 B29,1 ... B99,1    shuffled into reducer-019
```

**reducer-100 to reducer-109** and **reducer-110 to reducer-119** are similar to
the above; I omit writing them out.
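
As a usage illustration, a hedged sketch of calling the new overload described in the API section below (treat the exact signature there as authoritative):

```scala
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// Multiply two block matrices while splitting the middle dimension N into
// 10 pieces; in the example above this turns 4 overloaded reducers into 40
// smaller ones.
def multiplyWithSplits(a: BlockMatrix, b: BlockMatrix): BlockMatrix =
  a.multiply(b, numMidDimSplits = 10)
```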

### API for this optimized algorithm

I add a new API as following:
```
  def 

spark git commit: [SPARK-18020][STREAMING][KINESIS] Checkpoint SHARD_END to finish reading closed shards

2017-01-25 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 233845126 -> 256a3a801


[SPARK-18020][STREAMING][KINESIS] Checkpoint SHARD_END to finish reading closed 
shards

## What changes were proposed in this pull request?
This PR fixes an issue that occurs when resharding Kinesis streams; the
resharding makes the KCL throw an exception because Spark does not checkpoint
`SHARD_END` when finishing reading closed shards in
`KinesisRecordProcessor#shutdown`. This bug eventually stops the subscription
to newly split (or merged) shards.
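
For context, a hedged sketch of the KCL 1.x contract the fix relies on: finishing a closed shard requires checkpointing `SHARD_END`, which the client library expresses as a `checkpoint()` call with no sequence number:

```scala
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer

// Called when a record processor shuts down because its shard was closed
// (e.g. after a reshard); the no-argument checkpoint() marks SHARD_END so the
// worker can move on to the child shards.
def finishClosedShard(checkpointer: IRecordProcessorCheckpointer): Unit = {
  checkpointer.checkpoint()
}
```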

## How was this patch tested?
Added a test in `KinesisStreamSuite` to check if it works well when 
splitting/merging shards.

Author: Takeshi YAMAMURO 

Closes #16213 from maropu/SPARK-18020.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/256a3a80
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/256a3a80
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/256a3a80

Branch: refs/heads/master
Commit: 256a3a801366ab9f705e50690114e49fdb49b38e
Parents: 2338451
Author: Takeshi YAMAMURO 
Authored: Wed Jan 25 17:38:48 2017 -0800
Committer: Burak Yavuz 
Committed: Wed Jan 25 17:38:48 2017 -0800

--
 .../streaming/kinesis/KinesisCheckpointer.scala | 15 -
 .../streaming/kinesis/KinesisTestUtils.scala| 30 -
 .../kinesis/KPLBasedKinesisTestUtils.scala  |  3 +-
 .../kinesis/KinesisCheckpointerSuite.scala  |  5 +-
 .../streaming/kinesis/KinesisStreamSuite.scala  | 70 
 python/pyspark/streaming/tests.py   |  2 +-
 6 files changed, 117 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/256a3a80/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
--
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
index 3e697f3..c445c15 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
@@ -64,7 +64,20 @@ private[kinesis] class KinesisCheckpointer(
   def removeCheckpointer(shardId: String, checkpointer: 
IRecordProcessorCheckpointer): Unit = {
 synchronized {
   checkpointers.remove(shardId)
-  checkpoint(shardId, checkpointer)
+}
+if (checkpointer != null) {
+  try {
+// We must call `checkpoint()` with no parameter to finish reading 
shards.
+// See an URL below for details:
+// https://forums.aws.amazon.com/thread.jspa?threadID=244218
+KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
+  } catch {
+case NonFatal(e) =>
+  logError(s"Exception:  WorkerId $workerId encountered an exception 
while checkpointing" +
+s"to finish reading a shard of $shardId.", e)
+  // Rethrow the exception to the Kinesis Worker that is managing this 
RecordProcessor
+  throw e
+  }
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/256a3a80/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
--
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 0fe6625..f183ef0 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -40,11 +40,10 @@ import org.apache.spark.internal.Logging
  *
  * PLEASE KEEP THIS FILE UNDER src/main AS PYTHON TESTS NEED ACCESS TO THIS 
FILE!
  */
-private[kinesis] class KinesisTestUtils extends Logging {
+private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends 
Logging {
 
   val endpointUrl = KinesisTestUtils.endpointUrl
   val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
-  val streamShardCount = 2
 
   private val createStreamTimeoutSeconds = 300
   private val describeStreamPollTimeSeconds = 1
@@ -88,7 +87,7 @@ private[kinesis] class KinesisTestUtils extends Logging {
 logInfo(s"Creating stream ${_streamName}")
 val createStreamRequest = new CreateStreamRequest()
 createStreamRequest.setStreamName(_streamName)
-createStreamRequest.setShardCount(2)
+