spark git commit: [SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.
Repository: spark Updated Branches: refs/heads/master 029e40b41 -> eeb1d6db8 [SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one. ## What changes were proposed in this pull request? A follow up to SPARK-19859: - extract the calculation of `delayMs` and reuse it. - update EventTimeWatermarkExec - use the correct `delayMs` in EventTimeWatermark ## How was this patch tested? Jenkins. Author: uncleGen. Closes #17221 from uncleGen/SPARK-19859. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eeb1d6db Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eeb1d6db Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eeb1d6db Branch: refs/heads/master Commit: eeb1d6db878641d9eac62d0869a90fe80c1f4461 Parents: 029e40b Author: uncleGen Authored: Wed Mar 8 23:23:10 2017 -0800 Committer: Shixiong Zhu Committed: Wed Mar 8 23:23:10 2017 -0800 -- .../plans/logical/EventTimeWatermark.scala | 9 - .../streaming/EventTimeWatermarkExec.scala | 19 +++ 2 files changed, 19 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/eeb1d6db/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala index 62f68a6..06196b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala @@ -24,6 +24,12 @@ import org.apache.spark.unsafe.types.CalendarInterval object EventTimeWatermark { /** The [[org.apache.spark.sql.types.Metadata]] key used to hold the eventTime watermark delay. */ val delayKey = "spark.watermarkDelayMs" + + def getDelayMs(delay: CalendarInterval): Long = { +// We define month as `31 days` to simplify calculation. +val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 +delay.milliseconds + delay.months * millisPerMonth + } } /** @@ -37,9 +43,10 @@ case class EventTimeWatermark( // Update the metadata on the eventTime column to include the desired delay.
override val output: Seq[Attribute] = child.output.map { a => if (a semanticEquals eventTime) { + val delayMs = EventTimeWatermark.getDelayMs(delay) val updatedMetadata = new MetadataBuilder() .withMetadata(a.metadata) -.putLong(EventTimeWatermark.delayKey, delay.milliseconds) +.putLong(EventTimeWatermark.delayKey, delayMs) .build() a.withMetadata(updatedMetadata) } else if (a.metadata.contains(EventTimeWatermark.delayKey)) { http://git-wip-us.apache.org/repos/asf/spark/blob/eeb1d6db/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index 5a9a99e..25cf609 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -84,10 +84,7 @@ case class EventTimeWatermarkExec( child: SparkPlan) extends SparkPlan { val eventTimeStats = new EventTimeStatsAccum() - val delayMs = { -val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 -delay.milliseconds + delay.months * millisPerMonth - } + val delayMs = EventTimeWatermark.getDelayMs(delay) sparkContext.register(eventTimeStats) @@ -105,10 +102,16 @@ case class EventTimeWatermarkExec( override val output: Seq[Attribute] = child.output.map { a => if (a semanticEquals eventTime) { val updatedMetadata = new MetadataBuilder() - .withMetadata(a.metadata) - .putLong(EventTimeWatermark.delayKey, delayMs) - .build() - +.withMetadata(a.metadata) +.putLong(EventTimeWatermark.delayKey, delayMs) +.build() + a.withMetadata(updatedMetadata) +} else if (a.metadata.contains(EventTimeWatermark.delayKey)) { + // Remove existing watermark + val updatedMetadata = new MetadataBuilder() +.withMetadata(a.metadata) +.remove(EventTimeWatermark.delayKey) +.build() a.withMetadata(updatedMetadata)
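For illustration, the extracted calculation boils down to the following self-contained sketch. The constant is inlined so the snippet runs without Spark on the classpath; this is an illustration of the arithmetic, not the Spark source itself. Note the point of the follow-up: the old code stored only `delay.milliseconds`, silently dropping the months component that this calculation now includes.

```scala
// Self-contained sketch of EventTimeWatermark.getDelayMs.
object DelayMsSketch {
  // Mirrors CalendarInterval.MICROS_PER_DAY (inlined here as an assumption).
  val microsPerDay: Long = 24L * 60 * 60 * 1000 * 1000

  // A month is treated as exactly 31 days, as in the patch's comment.
  def getDelayMs(months: Int, milliseconds: Long): Long = {
    val millisPerMonth = microsPerDay / 1000 * 31
    milliseconds + months * millisPerMonth
  }

  def main(args: Array[String]): Unit = {
    // A watermark delay of "1 month 10 seconds":
    println(getDelayMs(months = 1, milliseconds = 10000)) // 2678410000
  }
}
```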
spark git commit: [SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.
Repository: spark Updated Branches: refs/heads/branch-2.1 00859e148 -> 0c140c168 [SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one. ## What changes were proposed in this pull request? A follow up to SPARK-19859: - extract the calculation of `delayMs` and reuse it. - update EventTimeWatermarkExec - use the correct `delayMs` in EventTimeWatermark ## How was this patch tested? Jenkins. Author: uncleGen. Closes #17221 from uncleGen/SPARK-19859. (cherry picked from commit eeb1d6db878641d9eac62d0869a90fe80c1f4461) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c140c16 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c140c16 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c140c16 Branch: refs/heads/branch-2.1 Commit: 0c140c1682262bc27df94952bda6ad8e3229fda4 Parents: 00859e1 Author: uncleGen Authored: Wed Mar 8 23:23:10 2017 -0800 Committer: Shixiong Zhu Committed: Wed Mar 8 23:23:16 2017 -0800 -- .../plans/logical/EventTimeWatermark.scala | 9 - .../streaming/EventTimeWatermarkExec.scala | 19 +++ 2 files changed, 19 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0c140c16/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala index c919cdb..e0dd4c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala @@ -24,6 +24,12 @@ import org.apache.spark.unsafe.types.CalendarInterval object EventTimeWatermark { /** The [[org.apache.spark.sql.types.Metadata]] key used to hold the eventTime watermark delay. */ val delayKey = "spark.watermarkDelayMs" + + def getDelayMs(delay: CalendarInterval): Long = { +// We define month as `31 days` to simplify calculation. +val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 +delay.milliseconds + delay.months * millisPerMonth + } } /** @@ -37,9 +43,10 @@ case class EventTimeWatermark( // Update the metadata on the eventTime column to include the desired delay.
override val output: Seq[Attribute] = child.output.map { a => if (a semanticEquals eventTime) { + val delayMs = EventTimeWatermark.getDelayMs(delay) val updatedMetadata = new MetadataBuilder() .withMetadata(a.metadata) -.putLong(EventTimeWatermark.delayKey, delay.milliseconds) +.putLong(EventTimeWatermark.delayKey, delayMs) .build() a.withMetadata(updatedMetadata) } else if (a.metadata.contains(EventTimeWatermark.delayKey)) { http://git-wip-us.apache.org/repos/asf/spark/blob/0c140c16/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index 5a9a99e..25cf609 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -84,10 +84,7 @@ case class EventTimeWatermarkExec( child: SparkPlan) extends SparkPlan { val eventTimeStats = new EventTimeStatsAccum() - val delayMs = { -val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 -delay.milliseconds + delay.months * millisPerMonth - } + val delayMs = EventTimeWatermark.getDelayMs(delay) sparkContext.register(eventTimeStats) @@ -105,10 +102,16 @@ case class EventTimeWatermarkExec( override val output: Seq[Attribute] = child.output.map { a => if (a semanticEquals eventTime) { val updatedMetadata = new MetadataBuilder() - .withMetadata(a.metadata) - .putLong(EventTimeWatermark.delayKey, delayMs) - .build() - +.withMetadata(a.metadata) +.putLong(EventTimeWatermark.delayKey, delayMs) +.build() + a.withMetadata(updatedMetadata) +} else if (a.metadata.contains(EventTimeWatermark.delayKey)) { + // Remove existing watermark + val updatedMetadata = new MetadataBuilder() +
spark git commit: [SPARK-19874][BUILD] Hide API docs for org.apache.spark.sql.internal
Repository: spark Updated Branches: refs/heads/branch-2.1 78cc5721f -> 00859e148 [SPARK-19874][BUILD] Hide API docs for org.apache.spark.sql.internal ## What changes were proposed in this pull request? The API docs should not include the "org.apache.spark.sql.internal" package because they are internal private APIs. ## How was this patch tested? Jenkins Author: Shixiong Zhu. Closes #17217 from zsxwing/SPARK-19874. (cherry picked from commit 029e40b412e332c9f0fff283d604e203066c78c0) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00859e14 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00859e14 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00859e14 Branch: refs/heads/branch-2.1 Commit: 00859e148fd1002fa314542953fee61a5d0fb9d9 Parents: 78cc572 Author: Shixiong Zhu Authored: Wed Mar 8 23:15:52 2017 -0800 Committer: Shixiong Zhu Committed: Wed Mar 8 23:16:00 2017 -0800 -- project/SparkBuild.scala | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/00859e14/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index e3fbe03..e772fa0 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -699,6 +699,7 @@ object Unidoc { .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/util/collection"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalyst"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution"))) + .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/internal"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive/test"))) }
spark git commit: [SPARK-19874][BUILD] Hide API docs for org.apache.spark.sql.internal
Repository: spark Updated Branches: refs/heads/master 09829be62 -> 029e40b41 [SPARK-19874][BUILD] Hide API docs for org.apache.spark.sql.internal ## What changes were proposed in this pull request? The API docs should not include the "org.apache.spark.sql.internal" package because they are internal private APIs. ## How was this patch tested? Jenkins Author: Shixiong Zhu. Closes #17217 from zsxwing/SPARK-19874. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/029e40b4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/029e40b4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/029e40b4 Branch: refs/heads/master Commit: 029e40b412e332c9f0fff283d604e203066c78c0 Parents: 09829be Author: Shixiong Zhu Authored: Wed Mar 8 23:15:52 2017 -0800 Committer: Shixiong Zhu Committed: Wed Mar 8 23:15:52 2017 -0800 -- project/SparkBuild.scala | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/029e40b4/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 93a3189..e52baf5 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -655,6 +655,7 @@ object Unidoc { .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/util/collection"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalyst"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution"))) + .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/internal"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive/test"))) }
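For illustration, the change is one more `filterNot` in the Unidoc source list. A toy model of what the chain does (the `publicSources` helper and plain string paths are assumptions; the real code filters sbt file objects via `getCanonicalPath`):

```scala
// Toy model of the Unidoc exclusion chain: any source whose path contains
// one of these fragments is dropped from the generated API docs.
val hiddenPackages = Seq(
  "org/apache/spark/sql/catalyst",
  "org/apache/spark/sql/execution",
  "org/apache/spark/sql/internal", // the fragment this patch adds
  "org/apache/spark/sql/hive/test")

def publicSources(paths: Seq[String]): Seq[String] =
  paths.filterNot(path => hiddenPackages.exists(pkg => path.contains(pkg)))

println(publicSources(Seq(
  "sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala", // filtered out
  "sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala")))        // kept
```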
spark git commit: [SPARK-19235][SQL][TESTS] Enable Test Cases in DDLSuite with Hive Metastore
Repository: spark Updated Branches: refs/heads/master d809ceed9 -> 09829be62 [SPARK-19235][SQL][TESTS] Enable Test Cases in DDLSuite with Hive Metastore ### What changes were proposed in this pull request? So far, the test cases in DDLSuite only verify the behaviors of InMemoryCatalog. That means they do not cover the scenarios using HiveExternalCatalog. Thus, we need to improve the existing test suite to run these cases using the Hive metastore. While porting these test cases, a bug in `SET LOCATION` was found: `path` is not set when the location is changed. After this PR, a few changes are made, as summarized below, - `DDLSuite` becomes an abstract class. Both `InMemoryCatalogedDDLSuite` and `HiveCatalogedDDLSuite` extend it. `InMemoryCatalogedDDLSuite` is using `InMemoryCatalog`. `HiveCatalogedDDLSuite` is using `HiveExternalCatalog`. - `InMemoryCatalogedDDLSuite` contains all the existing test cases in `DDLSuite`. - `HiveCatalogedDDLSuite` contains a subset of `DDLSuite`. The following test cases are excluded: 1. The following test cases only make sense for `InMemoryCatalog`: ``` test("desc table for parquet data source table using in-memory catalog") test("create a managed Hive source table") test("create an external Hive source table") test("Create Hive Table As Select") ``` 2. The following test cases are unable to be ported because we are unable to alter table provider when using Hive metastore. In the future PRs we need to improve the test cases so that altering table provider is not needed: ``` test("alter table: set location (datasource table)") test("alter table: set properties (datasource table)") test("alter table: unset properties (datasource table)") test("alter table: set serde (datasource table)") test("alter table: set serde partition (datasource table)") test("alter table: change column (datasource table)") test("alter table: add partition (datasource table)") test("alter table: drop partition (datasource table)") test("alter table: rename partition (datasource table)") test("drop table - data source table") ``` **TODO**: in the future PRs, we need to remove `HiveDDLSuite` and move the test cases to either `DDLSuite`, `InMemoryCatalogedDDLSuite` or `HiveCatalogedDDLSuite`. ### How was this patch tested? N/A Author: Xiao Li. Author: gatorsmile. Closes #16592 from gatorsmile/refactorDDLSuite.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/09829be6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/09829be6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/09829be6 Branch: refs/heads/master Commit: 09829be621f0f9bb5076abb3d832925624699fa9 Parents: d809cee Author: Xiao Li Authored: Wed Mar 8 23:12:10 2017 -0800 Committer: Wenchen Fan Committed: Wed Mar 8 23:12:10 2017 -0800 -- .../spark/sql/execution/command/DDLSuite.scala | 456 +++ .../apache/spark/sql/test/SQLTestUtils.scala| 5 + .../spark/sql/hive/execution/HiveDDLSuite.scala | 157 +++ 3 files changed, 345 insertions(+), 273 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/09829be6/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index c1f8b2b..aa335c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -30,23 +30,164 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION -import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { - private val escapedIdentifier = "`(.+)`".r +class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with BeforeAndAfterEach { override def afterEach(): Unit = { try { // drop all databases, tables and functions after each test spark.sessionState.catalog.reset() } finally { - Utils.deleteRecursively(new File("spark-warehouse")) + Utils.deleteRecursively(new File(spark.sessionState.conf.warehousePath)) super.afterEach() } } +
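For illustration, the suite layout described above follows a standard ScalaTest pattern: shared tests in an abstract base class, one concrete subclass per catalog. The skeleton below is hypothetical (class bodies, mixins such as SharedSQLContext, and test names are simplified; only the shape is taken from the commit message):

```scala
import org.scalatest.FunSuite

// Shared DDL tests live in the abstract base; each concrete suite binds
// them to a different catalog implementation.
abstract class DDLSuiteSkeleton extends FunSuite {
  def catalogName: String // supplied by the concrete suite

  test(s"basic DDL behaviors ($catalogName)") {
    // shared assertions run once per concrete suite, against its catalog
  }
}

class InMemoryCatalogedDDLSuiteSkeleton extends DDLSuiteSkeleton {
  override def catalogName: String = "InMemoryCatalog"
}

class HiveCatalogedDDLSuiteSkeleton extends DDLSuiteSkeleton {
  override def catalogName: String = "HiveExternalCatalog"
  // Hive-incompatible tests from the exclusion list are simply not defined here.
}
```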
spark git commit: [MINOR][SQL] The analyzer rules are fired twice for cases when AnalysisException is raised from analyzer.
Repository: spark Updated Branches: refs/heads/branch-2.1 3457c3229 -> 78cc5721f [MINOR][SQL] The analyzer rules are fired twice for cases when AnalysisException is raised from analyzer. ## What changes were proposed in this pull request? In general we have a checkAnalysis phase which validates the logical plan and throws AnalysisException on semantic errors. However we also can throw AnalysisException from a few analyzer rules like ResolveSubquery. I found that we fire up the analyzer rules twice for the queries that throw AnalysisException from one of the analyzer rules. This is a very minor fix. We don't have to strictly fix it. I just got confused seeing the rule getting fired two times when I was not expecting it. ## How was this patch tested? Tested manually. Author: Dilip Biswal. Closes #17214 from dilipbiswal/analyis_twice. (cherry picked from commit d809ceed9762d5bbb04170e45f38751713112dd8) Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78cc5721 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78cc5721 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78cc5721 Branch: refs/heads/branch-2.1 Commit: 78cc5721f07af5c561e89d1bbc72975bb67abb74 Parents: 3457c32 Author: Dilip Biswal Authored: Wed Mar 8 17:33:49 2017 -0800 Committer: Xiao Li Committed: Wed Mar 8 17:34:05 2017 -0800 -- .../org/apache/spark/sql/execution/QueryExecution.scala | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/78cc5721/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index b3ef29f..9b53d21 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -45,9 +45,14 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { protected def planner = sparkSession.sessionState.planner def assertAnalyzed(): Unit = { -try sparkSession.sessionState.analyzer.checkAnalysis(analyzed) catch { +// Analyzer is invoked outside the try block to avoid calling it again from within the +// catch block below. +analyzed +try { + sparkSession.sessionState.analyzer.checkAnalysis(analyzed) +} catch { case e: AnalysisException => -val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed)) +val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed)) ae.setStackTrace(e.getStackTrace) throw ae }
spark git commit: [MINOR][SQL] The analyzer rules are fired twice for cases when AnalysisException is raised from analyzer.
Repository: spark Updated Branches: refs/heads/master a3648b5d4 -> d809ceed9 [MINOR][SQL] The analyzer rules are fired twice for cases when AnalysisException is raised from analyzer. ## What changes were proposed in this pull request? In general we have a checkAnalysis phase which validates the logical plan and throws AnalysisException on semantic errors. However we also can throw AnalysisException from a few analyzer rules like ResolveSubquery. I found that we fire up the analyzer rules twice for the queries that throw AnalysisException from one of the analyzer rules. This is a very minor fix. We don't have to strictly fix it. I just got confused seeing the rule getting fired two times when I was not expecting it. ## How was this patch tested? Tested manually. Author: Dilip Biswal. Closes #17214 from dilipbiswal/analyis_twice. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d809ceed Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d809ceed Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d809ceed Branch: refs/heads/master Commit: d809ceed9762d5bbb04170e45f38751713112dd8 Parents: a3648b5 Author: Dilip Biswal Authored: Wed Mar 8 17:33:49 2017 -0800 Committer: Xiao Li Committed: Wed Mar 8 17:33:49 2017 -0800 -- .../org/apache/spark/sql/execution/QueryExecution.scala | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d809ceed/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 6ec2f4d..9a3656d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -46,9 +46,14 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { protected def planner = sparkSession.sessionState.planner def assertAnalyzed(): Unit = { -try sparkSession.sessionState.analyzer.checkAnalysis(analyzed) catch { +// Analyzer is invoked outside the try block to avoid calling it again from within the +// catch block below. +analyzed +try { + sparkSession.sessionState.analyzer.checkAnalysis(analyzed) +} catch { case e: AnalysisException => -val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed)) +val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed)) ae.setStackTrace(e.getStackTrace) throw ae }
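For illustration, the subtlety behind this fix is that `analyzed` is a lazy val, and in Scala a lazy val whose initializer throws is not memoized: the next reference re-runs the whole body. A Spark-free sketch of the double firing (the counter and exception stand in for the analyzer rules):

```scala
object DoubleFireSketch extends App {
  var analyzerRuns = 0

  // Stand-in for QueryExecution.analyzed: a lazy val whose body runs the rules.
  lazy val analyzed: String = {
    analyzerRuns += 1
    throw new RuntimeException("AnalysisException thrown by a rule")
  }

  // Buggy shape: first touch inside try, second touch inside catch.
  // The failed lazy val was never memoized, so the rules run again.
  try { analyzed } catch {
    case _: RuntimeException =>
      try { analyzed } catch { case _: RuntimeException => () }
  }
  println(analyzerRuns) // 2
}
```

The patched shape hoists the first reference out of the try block, so a failure inside an analyzer rule propagates once instead of being re-triggered by `Option(analyzed)` in the catch.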
spark git commit: Revert "[SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations for branch-2.1"
Repository: spark Updated Branches: refs/heads/branch-2.1 f6c1ad2eb -> 3457c3229 Revert "[SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations for branch-2.1" This reverts commit 502c927b8c8a99ef2adf4e6e1d7a6d9232d45ef5. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3457c322 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3457c322 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3457c322 Branch: refs/heads/branch-2.1 Commit: 3457c32297e0150a4fbc80a30f84b9c62ca7c372 Parents: f6c1ad2 Author: Shixiong Zhu Authored: Wed Mar 8 14:30:54 2017 -0800 Committer: Shixiong Zhu Committed: Wed Mar 8 14:41:29 2017 -0800 -- .../analysis/UnsupportedOperationChecker.scala | 11 +- .../sql/catalyst/plans/logical/object.scala | 49 --- .../analysis/UnsupportedOperationsSuite.scala | 24 +- .../FlatMapGroupsWithStateFunction.java | 38 --- .../function/MapGroupsWithStateFunction.java| 38 --- .../spark/sql/KeyValueGroupedDataset.scala | 113 --- .../scala/org/apache/spark/sql/KeyedState.scala | 142  .../spark/sql/execution/SparkStrategies.scala | 21 +- .../apache/spark/sql/execution/objects.scala| 22 -- .../streaming/IncrementalExecution.scala| 19 +- .../execution/streaming/KeyedStateImpl.scala| 80 - .../execution/streaming/ProgressReporter.scala | 2 +- .../execution/streaming/StatefulAggregate.scala | 237 + .../state/HDFSBackedStateStoreProvider.scala| 19 -- .../execution/streaming/state/StateStore.scala | 5 - .../sql/execution/streaming/state/package.scala | 11 +- .../execution/streaming/statefulOperators.scala | 323 -- .../org/apache/spark/sql/JavaDatasetSuite.java | 32 -- .../sql/streaming/MapGroupsWithStateSuite.scala | 335 --- 19 files changed, 249 insertions(+), 1272 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3457c322/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index d8aad42..f4d016c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -46,13 +46,8 @@ object UnsupportedOperationChecker { "Queries without streaming sources cannot be executed with writeStream.start()")(plan) } -/** Collect all the streaming aggregates in a sub plan */ -def collectStreamingAggregates(subplan: LogicalPlan): Seq[Aggregate] = { - subplan.collect { case a: Aggregate if a.isStreaming => a } -} - // Disallow multiple streaming aggregations -val aggregates = collectStreamingAggregates(plan) +val aggregates = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a } if (aggregates.size > 1) { throwError( @@ -119,10 +114,6 @@ object UnsupportedOperationChecker { case _: InsertIntoTable => throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets") -case m: MapGroupsWithState if collectStreamingAggregates(m).nonEmpty => - throwError("(map/flatMap)GroupsWithState is not supported after aggregation on a " + -"streaming DataFrame/Dataset") - case Join(left, right, joinType, _) => joinType match { http://git-wip-us.apache.org/repos/asf/spark/blob/3457c322/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala -- diff
--git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index 0be4823..0ab4c90 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -313,55 +313,6 @@ case class MapGroups( outputObjAttr: Attribute, child: LogicalPlan) extends UnaryNode with ObjectProducer -/** Internal class representing State */ -trait LogicalKeyedState[S] - -/** Factory for constructing new `MapGroupsWithState` nodes. */ -object MapGroupsWithState { - def apply[K: Encoder, V: Encoder, S: Encoder, U: Encoder]( - func: (Any, Iterator[Any], LogicalKeyedState[Any])
spark git commit: [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource
Repository: spark Updated Branches: refs/heads/master 455129020 -> a3648b5d4 [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource ## What changes were proposed in this pull request? **The Problem** There is a file stream source option called maxFileAge which limits how old the files can be, relative to the latest file that has been seen. This is used to limit the files that need to be remembered as "processed". Files older than the latest processed files are ignored. This value is 7 days by default. This causes a problem when both latestFirst = true and maxFilesPerTrigger > total files to be processed. Here is what happens in all combinations: 1) latestFirst = false - Since files are processed in order, there won't be any unprocessed file older than the latest processed file. All files will be processed. 2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge thresholding mechanism takes one batch to initialize. If maxFilesPerTrigger is not set, then all old files get processed in the first batch, and so no file is left behind. 3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch processes the latest X files. That sets the threshold to latest file - maxFileAge, so files older than this threshold will never be considered for processing. The bug is with case 3. **The Solution** Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set. ## How was this patch tested? Regression test in `FileStreamSourceSuite` Author: Burak Yavuz. Closes #17153 from brkyvz/maxFileAge. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3648b5d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3648b5d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3648b5d Branch: refs/heads/master Commit: a3648b5d4f99ff9461d02f53e9ec71787a3abf51 Parents: 4551290 Author: Burak Yavuz Authored: Wed Mar 8 14:35:07 2017 -0800 Committer: Burak Yavuz Committed: Wed Mar 8 14:35:07 2017 -0800 -- .../execution/streaming/FileStreamOptions.scala | 5 +- .../execution/streaming/FileStreamSource.scala | 14 +++- .../sql/streaming/FileStreamSourceSuite.scala | 82 3 files changed, 63 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a3648b5d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index 2f802d7..e7ba901 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -38,7 +38,10 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging } /** - * Maximum age of a file that can be found in this directory, before it is deleted. + * Maximum age of a file that can be found in this directory, before it is ignored. For the + * first batch all files will be considered valid. If `latestFirst` is set to `true` and + * `maxFilesPerTrigger` is set, then this parameter will be ignored, because old files that are + * valid, and should be processed, may be ignored. Please refer to SPARK-19813 for details. * * The max age is specified with respect to the timestamp of the latest file, and not the * timestamp of the current system.
That this means if the last file has timestamp 1000, and the http://git-wip-us.apache.org/repos/asf/spark/blob/a3648b5d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 6a7263c..0f09b0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -66,23 +66,29 @@ class FileStreamSource( private val fileSortOrder = if (sourceOptions.latestFirst) { logWarning( -"""'latestFirst' is true. New files will be processed first. - |It may affect the watermark value""".stripMargin) +"""'latestFirst' is true. New files will be processed first, which may affect the watermark + |value. In addition,
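For illustration, the four cases above condense to a small decision; the sketch below is a hypothetical condensation (option and field names are assumptions, the real logic is spread across FileStreamOptions and FileStreamSource): honor maxFileAge only when it cannot hide valid, still-unprocessed files.

```scala
case class SourceOpts(latestFirst: Boolean, maxFilesPerTrigger: Option[Int], maxFileAgeMs: Long)

def effectiveMaxFileAgeMs(opts: SourceOpts): Long =
  if (opts.latestFirst && opts.maxFilesPerTrigger.isDefined) Long.MaxValue // case 3: ignore it
  else opts.maxFileAgeMs                                                   // cases 1 and 2

val sevenDaysMs = 7L * 24 * 60 * 60 * 1000
assert(effectiveMaxFileAgeMs(SourceOpts(latestFirst = true, Some(100), sevenDaysMs)) == Long.MaxValue)
assert(effectiveMaxFileAgeMs(SourceOpts(latestFirst = false, Some(100), sevenDaysMs)) == sevenDaysMs)
```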
spark git commit: [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource
Repository: spark Updated Branches: refs/heads/branch-2.1 320eff14b -> f6c1ad2eb [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource ## What changes were proposed in this pull request? **The Problem** There is a file stream source option called maxFileAge which limits how old the files can be, relative to the latest file that has been seen. This is used to limit the files that need to be remembered as "processed". Files older than the latest processed files are ignored. This value is 7 days by default. This causes a problem when both latestFirst = true and maxFilesPerTrigger > total files to be processed. Here is what happens in all combinations: 1) latestFirst = false - Since files are processed in order, there won't be any unprocessed file older than the latest processed file. All files will be processed. 2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge thresholding mechanism takes one batch to initialize. If maxFilesPerTrigger is not set, then all old files get processed in the first batch, and so no file is left behind. 3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch processes the latest X files. That sets the threshold to latest file - maxFileAge, so files older than this threshold will never be considered for processing. The bug is with case 3. **The Solution** Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set. ## How was this patch tested? Regression test in `FileStreamSourceSuite` Author: Burak Yavuz. Closes #17153 from brkyvz/maxFileAge. (cherry picked from commit a3648b5d4f99ff9461d02f53e9ec71787a3abf51) Signed-off-by: Burak Yavuz Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6c1ad2e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6c1ad2e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6c1ad2e Branch: refs/heads/branch-2.1 Commit: f6c1ad2eb6d0706899aabbdd39e558b3488e2ef3 Parents: 320eff1 Author: Burak Yavuz Authored: Wed Mar 8 14:35:07 2017 -0800 Committer: Burak Yavuz Committed: Wed Mar 8 14:35:22 2017 -0800 -- .../execution/streaming/FileStreamOptions.scala | 5 +- .../execution/streaming/FileStreamSource.scala | 14 +++- .../sql/streaming/FileStreamSourceSuite.scala | 82 3 files changed, 63 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f6c1ad2e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index 25ebe17..fe64838 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -38,7 +38,10 @@ class FileStreamOptions(parameters: CaseInsensitiveMap) extends Logging { } /** - * Maximum age of a file that can be found in this directory, before it is deleted. + * Maximum age of a file that can be found in this directory, before it is ignored. For the + * first batch all files will be considered valid. If `latestFirst` is set to `true` and + * `maxFilesPerTrigger` is set, then this parameter will be ignored, because old files that are + * valid, and should be processed, may be ignored. Please refer to SPARK-19813 for details.
* * The max age is specified with respect to the timestamp of the latest file, and not the * timestamp of the current system. That this means if the last file has timestamp 1000, and the http://git-wip-us.apache.org/repos/asf/spark/blob/f6c1ad2e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 39c0b49..0f0b6f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -64,23 +64,29 @@ class FileStreamSource( private val fileSortOrder = if (sourceOptions.latestFirst) { logWarning( -"""'latestFirst' is true. New files will be processed first. - |It may affect the watermark value""".stripMargin) +
spark git commit: [SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV
Repository: spark Updated Branches: refs/heads/master 6570cfd7a -> 455129020 [SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV ## What changes were proposed in this pull request? This PR proposes to add an API that loads a `DataFrame` from a `Dataset[String]` storing CSV. It allows pre-processing before parsing the CSV, which enables workarounds for many narrow cases, for example, as below: - Case 1 - pre-processing ```scala val df = spark.read.text("...") // Pre-processing with this. spark.read.csv(df.as[String]) ``` - Case 2 - use other input formats ```scala val rdd = spark.sparkContext.newAPIHadoopFile("/file.csv.lzo", classOf[com.hadoop.mapreduce.LzoTextInputFormat], classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.io.Text]) val stringRdd = rdd.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength)) spark.read.csv(stringRdd.toDS) ``` ## How was this patch tested? Added tests in `CSVSuite` and built with Scala 2.10. ``` ./dev/change-scala-version.sh 2.10 ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package ``` Author: hyukjinkwon. Closes #16854 from HyukjinKwon/SPARK-15463. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45512902 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45512902 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45512902 Branch: refs/heads/master Commit: 455129020ca7f6a162f6f2486a87cc43512cfd2c Parents: 6570cfd Author: hyukjinkwon Authored: Wed Mar 8 13:43:09 2017 -0800 Committer: Wenchen Fan Committed: Wed Mar 8 13:43:09 2017 -0800 -- .../org/apache/spark/sql/DataFrameReader.scala | 71 +--- .../datasources/csv/CSVDataSource.scala | 49 -- .../execution/datasources/csv/CSVOptions.scala | 2 +- .../datasources/csv/UnivocityParser.scala | 2 +- .../execution/datasources/csv/CSVSuite.scala| 27 5 files changed, 121 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/45512902/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 41470ae..a5e38e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -29,6 +29,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} import org.apache.spark.sql.execution.LogicalRDD import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.execution.datasources.csv._ import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.JsonInferSchema @@ -368,14 +369,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { createParser) } -// Check a field requirement for corrupt records here to throw an exception in a driver side -schema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { corruptFieldIndex => - val f = schema(corruptFieldIndex) - if (f.dataType != StringType || !f.nullable) { -throw new AnalysisException( - "The field for corrupt records must be string type and nullable") - } -} +verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord) val parsed = jsonDataset.rdd.mapPartitions { iter => val
parser = new JacksonParser(schema, parsedOptions) @@ -399,6 +393,51 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { } /** + * Loads an `Dataset[String]` storing CSV rows and returns the result as a `DataFrame`. + * + * If the schema is not specified using `schema` function and `inferSchema` option is enabled, + * this function goes through the input once to determine the input schema. + * + * If the schema is not specified using `schema` function and `inferSchema` option is disabled, + * it determines the columns as string types and it reads only the first line to determine the + * names and the number of fields. + * + * @param csvDataset input Dataset with one CSV row per record + * @since 2.2.0 + */ + def csv(csvDataset: Dataset[String]): DataFrame = { +val parsedOptions: CSVOptions = new CSVOptions( + extraOptions.toMap, +
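For illustration, beyond the two cases in the commit message, a common use of the new overload is dropping a stray header after pre-processing. A hedged end-to-end sketch (assumes a `SparkSession` named `spark`; the file path is hypothetical):

```scala
import org.apache.spark.sql.types._
import spark.implicits._

val raw = spark.read.textFile("/data/people.csv") // Dataset[String]
val header = raw.first()
val body = raw.filter(_ != header)                // the pre-processing step

val schema = new StructType().add("name", StringType).add("age", IntegerType)
val df = spark.read.schema(schema).csv(body)      // the new csv(Dataset[String]) API
```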
spark git commit: [SPARK-19858][SS] Add output mode to flatMapGroupsWithState and disallow invalid cases
Repository: spark Updated Branches: refs/heads/master e9e2c612d -> 1bf901238 [SPARK-19858][SS] Add output mode to flatMapGroupsWithState and disallow invalid cases ## What changes were proposed in this pull request? Add an output mode parameter to `flatMapGroupsWithState` and just define `mapGroupsWithState` as `flatMapGroupsWithState(Update)`. `UnsupportedOperationChecker` is modified to disallow unsupported cases. - Batch mapGroupsWithState or flatMapGroupsWithState is always allowed. - For streaming (map/flatMap)GroupsWithState, see the following table:

| Operators | Supported Query Output Mode |
| - | - |
| flatMapGroupsWithState(Update) without aggregation | Update |
| flatMapGroupsWithState(Update) with aggregation | None |
| flatMapGroupsWithState(Append) without aggregation | Append |
| flatMapGroupsWithState(Append) before aggregation | Append, Update, Complete |
| flatMapGroupsWithState(Append) after aggregation | None |
| Multiple flatMapGroupsWithState(Append)s | Append |
| Multiple mapGroupsWithStates | None |
| Mixing mapGroupsWithStates and flatMapGroupsWithStates | None |
| Other cases of multiple flatMapGroupsWithState | None |

## How was this patch tested? The added unit tests. Here are the tests related to (map/flatMap)GroupsWithState: ``` [info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation: supported (1 millisecond) [info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Append)s on batch relation: supported (0 milliseconds) [info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation: supported (0 milliseconds) [info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Update)s on batch relation: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in update mode: supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in append mode: not supported (7 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in complete mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Append mode: not supported (11 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Update mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Complete mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in append mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in update mode: not supported (6 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Append mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Update mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Append) on streaming relation before aggregation in Complete mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Append mode: not supported (6 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Update mode: not supported (4 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation in complete mode: not supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Append output mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Update output mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Append output mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Update output mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - multiple
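For illustration, the single-operator rows of the table above reduce to a small decision function. The following is a toy encoding of those rules only (not Spark's actual UnsupportedOperationChecker, and the string/ADT types are assumptions made for brevity):

```scala
sealed trait AggPosition
case object NoAgg     extends AggPosition // no streaming aggregation in the query
case object BeforeAgg extends AggPosition // the operator runs before the aggregation
case object AfterAgg  extends AggPosition // the operator consumes aggregated output

def allowedQueryModes(operatorMode: String, agg: AggPosition): Set[String] =
  (operatorMode, agg) match {
    case ("Update", NoAgg)     => Set("Update")
    case ("Update", _)         => Set.empty // with aggregation: unsupported
    case ("Append", NoAgg)     => Set("Append")
    case ("Append", BeforeAgg) => Set("Append", "Update", "Complete")
    case ("Append", AfterAgg)  => Set.empty
    case _                     => Set.empty
  }
```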
spark git commit: [SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session has an identical copy of the SessionState
Repository: spark Updated Branches: refs/heads/master 1bf901238 -> 6570cfd7a [SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session has an identical copy of the SessionState Forking a newSession() from SparkSession currently makes a new SparkSession that does not retain SessionState (i.e. temporary tables, SQL config, registered functions etc.) This change adds a method cloneSession() which creates a new SparkSession with a copy of the parent's SessionState. Subsequent changes to base session are not propagated to cloned session, clone is independent after creation. If the base is changed after clone has been created, say user registers new UDF, then the new UDF will not be available inside the clone. Same goes for configs and temp tables. Unit tests. Author: Kunal Khamar. Author: Shixiong Zhu. Closes #16826 from kunalkhamar/fork-sparksession. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6570cfd7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6570cfd7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6570cfd7 Branch: refs/heads/master Commit: 6570cfd7abe349dc6d2151f2ac9dc662e7465a79 Parents: 1bf9012 Author: Kunal Khamar Authored: Wed Mar 8 13:06:22 2017 -0800 Committer: Shixiong Zhu Committed: Wed Mar 8 13:20:45 2017 -0800 -- .../spark/sql/catalyst/CatalystConf.scala | 7 +- .../catalyst/analysis/FunctionRegistry.scala| 5 +- .../sql/catalyst/catalog/SessionCatalog.scala | 38 ++- .../catalyst/catalog/SessionCatalogSuite.scala | 55  .../apache/spark/sql/ExperimentalMethods.scala | 6 + .../org/apache/spark/sql/SparkSession.scala | 59 - .../spark/sql/execution/datasources/rules.scala | 3 +- .../org/apache/spark/sql/internal/SQLConf.scala | 8 + .../spark/sql/internal/SessionState.scala | 235 +++-- .../apache/spark/sql/SessionStateSuite.scala| 162  .../spark/sql/internal/CatalogSuite.scala | 21 +- .../spark/sql/internal/SQLConfEntrySuite.scala | 18 ++ .../apache/spark/sql/test/TestSQLContext.scala | 20 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 5 +- .../spark/sql/hive/HiveSessionCatalog.scala | 92 +-- .../spark/sql/hive/HiveSessionState.scala | 261 ++- .../spark/sql/hive/client/HiveClientImpl.scala | 2 + .../apache/spark/sql/hive/test/TestHive.scala | 67 ++--- .../sql/hive/HiveSessionCatalogSuite.scala | 112  .../spark/sql/hive/HiveSessionStateSuite.scala | 41 +++ 20 files changed, 981 insertions(+), 236 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6570cfd7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala index fb99cb2..cff0efa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala @@ -66,6 +66,8 @@ trait CatalystConf { /** The maximum number of joined nodes allowed in the dynamic programming algorithm.
*/ def joinReorderDPThreshold: Int + + override def clone(): CatalystConf = throw new CloneNotSupportedException() } @@ -85,4 +87,7 @@ case class SimpleCatalystConf( joinReorderDPThreshold: Int = 12, warehousePath: String = "/user/hive/warehouse", sessionLocalTimeZone: String = TimeZone.getDefault().getID) - extends CatalystConf + extends CatalystConf { + + override def clone(): SimpleCatalystConf = this.copy() +} http://git-wip-us.apache.org/repos/asf/spark/blob/6570cfd7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 556fa99..0dcb440 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -64,6 +64,8 @@ trait FunctionRegistry { /** Clear all registered functions. */ def clear(): Unit + /** Create a copy of this registry with identical functions as this registry. */ + override def clone(): FunctionRegistry = throw new CloneNotSupportedException() } class SimpleFunctionRegistry extends
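For illustration, the copy-then-diverge semantics described in the commit message look like the following. Treat this as pseudocode: `cloneSession()` is the name from the commit message, and its visibility in the merged code may differ from a public API.

```scala
// Assumes a running SparkSession `spark`.
spark.conf.set("spark.sql.shuffle.partitions", "7")
val forked = spark.cloneSession()                 // identical copy of SessionState

spark.conf.set("spark.sql.shuffle.partitions", "42")
spark.udf.register("plusOne", (x: Int) => x + 1)  // registered only on the base

// The clone keeps the state it was created with; later base changes don't propagate.
assert(forked.conf.get("spark.sql.shuffle.partitions") == "7")
```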
spark git commit: [SPARK-19481] [REPL] [MAVEN] Avoid to leak SparkContext in Signaling.cancelOnInterrupt
Repository: spark Updated Branches: refs/heads/branch-2.0 da3dfafa9 -> c561e6cfa [SPARK-19481] [REPL] [MAVEN] Avoid to leak SparkContext in Signaling.cancelOnInterrupt ## What changes were proposed in this pull request? `Signaling.cancelOnInterrupt` leaks a SparkContext per call and it makes ReplSuite unstable. This PR adds `SparkContext.getActive` to allow `Signaling.cancelOnInterrupt` to get the active `SparkContext` to avoid the leak. ## How was this patch tested? Jenkins Author: Shixiong Zhu. Closes #16825 from zsxwing/SPARK-19481. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c561e6cf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c561e6cf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c561e6cf Branch: refs/heads/branch-2.0 Commit: c561e6cfaf8e67a58fa79a1d7284b779fee4e79f Parents: da3dfaf Author: Shixiong Zhu Authored: Thu Feb 9 11:16:51 2017 -0800 Committer: Shixiong Zhu Committed: Wed Mar 8 12:49:53 2017 -0800 -- .../scala/org/apache/spark/SparkContext.scala | 7 +++ .../main/scala/org/apache/spark/repl/Main.scala | 1 + .../org/apache/spark/repl/SparkILoop.scala | 1 - .../main/scala/org/apache/spark/repl/Main.scala | 2 +- .../scala/org/apache/spark/repl/Signaling.scala | 20 +++- 5 files changed, 20 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c561e6cf/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 2abe444..daef497 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2295,6 +2295,13 @@ object SparkContext extends Logging { getOrCreate(new SparkConf()) } + /** Return the current active [[SparkContext]] if any. */ + private[spark] def getActive: Option[SparkContext] = { +SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { + Option(activeContext.get()) +} + } + /** * Called at the beginning of the SparkContext constructor to ensure that no SparkContext is * running.
Throws an exception if a running context is detected and logs a warning if another http://git-wip-us.apache.org/repos/asf/spark/blob/c561e6cf/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala -- diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala index 7b4e14b..fba321b 100644 --- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala @@ -22,6 +22,7 @@ import org.apache.spark.internal.Logging object Main extends Logging { initializeLogIfNecessary(true) + Signaling.cancelOnInterrupt() private var _interp: SparkILoop = _ http://git-wip-us.apache.org/repos/asf/spark/blob/c561e6cf/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala -- diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala index e017aa4..b7237a6 100644 --- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -1027,7 +1027,6 @@ class SparkILoop( builder.getOrCreate() } sparkContext = sparkSession.sparkContext -Signaling.cancelOnInterrupt(sparkContext) sparkSession } http://git-wip-us.apache.org/repos/asf/spark/blob/c561e6cf/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala -- diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala index 5dfe18a..13b772b 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala @@ -30,6 +30,7 @@ import org.apache.spark.util.Utils object Main extends Logging { initializeLogIfNecessary(true) + Signaling.cancelOnInterrupt() val conf = new SparkConf() val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf)) @@ -108,7 +109,6 @@ object Main extends Logging { logInfo("Created Spark session") } sparkContext = sparkSession.sparkContext -
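For illustration, the shape of the fix outside Spark: resolve the active context at interrupt time rather than capturing one when the handler is installed. Names below are illustrative stand-ins, not Spark's classes.

```scala
object ActiveContextSketch {
  private val lock = new Object
  private var active: Option[FakeContext] = None

  def setActive(ctx: FakeContext): Unit = lock.synchronized { active = Some(ctx) }
  def getActive: Option[FakeContext] = lock.synchronized { active }
}

case class FakeContext(name: String) {
  def cancelAllJobs(): Unit = println(s"cancelling jobs of $name")
}

// Registered once; every invocation looks the context up fresh, so the handler
// never holds a long-lived reference to a stale context (the leak this PR fixed).
def onInterrupt(): Unit = ActiveContextSketch.getActive.foreach(_.cancelAllJobs())
```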
spark git commit: [SPARK-19727][SQL] Fix for round function that modifies original column
Repository: spark Updated Branches: refs/heads/master f3387d974 -> e9e2c612d [SPARK-19727][SQL] Fix for round function that modifies original column ## What changes were proposed in this pull request? Fix for SQL round function that modifies original column when underlying data frame is created from a local product.

    import org.apache.spark.sql.functions._

    case class NumericRow(value: BigDecimal)

    val df = spark.createDataFrame(Seq(NumericRow(BigDecimal("1.23456789"))))

    df.show()
    +------------+
    |       value|
    +------------+
    |1.2345678900|
    +------------+

    df.withColumn("value_rounded", round('value)).show()

    // before
    +------------+-------------+
    |       value|value_rounded|
    +------------+-------------+
    |        1.00|            1|
    +------------+-------------+

    // after
    +------------+-------------+
    |       value|value_rounded|
    +------------+-------------+
    |1.2345678900|            1|
    +------------+-------------+

## How was this patch tested? New unit test added to existing suite `org.apache.spark.sql.MathFunctionsSuite` Author: Wojtek Szymanski. Closes #17075 from wojtek-szymanski/SPARK-19727. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9e2c612 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9e2c612 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9e2c612 Branch: refs/heads/master Commit: e9e2c612d58a19ddcb4b6abfb7389a4b0f7ef6f8 Parents: f3387d9 Author: Wojtek Szymanski Authored: Wed Mar 8 12:36:16 2017 -0800 Committer: Wenchen Fan Committed: Wed Mar 8 12:36:16 2017 -0800 -- .../sql/catalyst/CatalystTypeConverters.scala | 6 + .../spark/sql/catalyst/expressions/Cast.scala | 13 +++-- .../expressions/decimalExpressions.scala| 10 ++- .../catalyst/expressions/mathExpressions.scala | 2 +- .../org/apache/spark/sql/types/Decimal.scala| 28 ++-- .../apache/spark/sql/types/DecimalSuite.scala | 8 +- .../apache/spark/sql/MathFunctionsSuite.scala | 12 + 7 files changed, 54 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e9e2c612/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index 5b91615..d4ebdb1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -310,11 +310,7 @@ object CatalystTypeConverters { case d: JavaBigInteger => Decimal(d) case d: Decimal => d } - if (decimal.changePrecision(dataType.precision, dataType.scale)) { -decimal - } else { -null - } + decimal.toPrecision(dataType.precision, dataType.scale).orNull } override def toScala(catalystValue: Decimal): JavaBigDecimal = { if (catalystValue == null) null http://git-wip-us.apache.org/repos/asf/spark/blob/e9e2c612/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 7c60f7d..1049915 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -352,6 +352,15 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String] if (value.changePrecision(decimalType.precision, decimalType.scale)) value else null } + /** + * Create new `Decimal` with precision and scale given in `decimalType` (if any), + * returning null if it overflows or
creating a new `value` and returning it if successful. + * + */ + private[this] def toPrecision(value: Decimal, decimalType: DecimalType): Decimal = +value.toPrecision(decimalType.precision, decimalType.scale).orNull + + private[this] def castToDecimal(from: DataType, target: DecimalType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try { @@ -360,14 +369,14 @@ case class Cast(child: Expression,
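The heart of this fix is replacing the mutating `changePrecision` call with a `toPrecision` that leaves its receiver untouched. Below is a minimal standalone sketch of that copy-on-change contract, using `java.math.BigDecimal` as a stand-in for Spark's `Decimal` (names and rounding mode here are illustrative assumptions, not the actual implementation):

```Scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

object PrecisionSketch {
  // Return a fresh value at the requested precision/scale, or None on overflow;
  // the input is never mutated, so callers sharing it are unaffected.
  def toPrecision(value: JBigDecimal, precision: Int, scale: Int): Option[JBigDecimal] = {
    val rescaled = value.setScale(scale, RoundingMode.HALF_UP) // setScale returns a copy
    if (rescaled.precision > precision) None else Some(rescaled)
  }

  def main(args: Array[String]): Unit = {
    val original = new JBigDecimal("1.2345678900")
    val rounded = toPrecision(original, precision = 11, scale = 0)
    println(original) // still 1.2345678900: rounding did not leak into the source value
    println(rounded)  // Some(1)
  }
}
```

The bug above came from the mutating variant: with a locally created data frame, the rounded result and the original column shared the same `Decimal` instance, so rounding in place rewrote both.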
spark git commit: [SPARK-19843][SQL][FOLLOWUP] Classdoc for `IntWrapper` and `LongWrapper`
Repository: spark Updated Branches: refs/heads/master 9a6ac7226 -> e420fd459 [SPARK-19843][SQL][FOLLOWUP] Classdoc for `IntWrapper` and `LongWrapper` ## What changes were proposed in this pull request? This is as per the suggestion by rxin at: https://github.com/apache/spark/pull/17184#discussion_r104841735 ## How was this patch tested? NA, as this is a documentation change. Author: Tejas Patil
Closes #17205 from tejasapatil/SPARK-19843_followup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e420fd45 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e420fd45 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e420fd45 Branch: refs/heads/master Commit: e420fd4592615d91cdcbca674ac58bcca6ab2ff3 Parents: 9a6ac72 Author: Tejas Patil Authored: Wed Mar 8 09:38:05 2017 -0800 Committer: Reynold Xin Committed: Wed Mar 8 09:38:05 2017 -0800 -- .../apache/spark/unsafe/types/UTF8String.java| 19 +++ 1 file changed, 15 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e420fd45/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java -- diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index 7abe0fa..4c28075 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -850,11 +850,26 @@ public final class UTF8String implements Comparable, Externalizable, return fromString(sb.toString()); } + /** + * Wrapper over `long` to allow result of parsing long from string to be accessed via reference. + * This is done solely for better performance and is not expected to be used by end users. + */ public static class LongWrapper { public long value = 0; } /** + * Wrapper over `int` to allow result of parsing integer from string to be accessed via reference. + * This is done solely for better performance and is not expected to be used by end users. + * + * {@link LongWrapper} could have been used here but using `int` directly save the extra cost of + * conversion from `long` -> `int` + */ + public static class IntWrapper {+public int value = 0; + } + + /** * Parses this UTF8String to long. * * Note that, in this method we accumulate the result in negative format, and convert it to @@ -942,10 +957,6 @@ public final class UTF8String implements Comparable, Externalizable, return true; } - public static class IntWrapper {-public int value = 0; - } - /** * Parses this UTF8String to int. *
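For readers unfamiliar with the pattern these classdocs describe, here is a hedged sketch of the wrapper-as-out-parameter idiom in plain Scala; it is a simplified stand-in, not `UTF8String`'s actual parser. The caller reuses one mutable holder, and the Boolean return signals success, so no boxed `Option[Int]` is allocated per call:

```Scala
object WrapperSketch {
  // Analogue of UTF8String.IntWrapper: a mutable holder the parser writes into.
  final class IntWrapper { var value: Int = 0 }

  // Simplified parse: returns true and fills `out` on success, false otherwise.
  def parseInt(s: String, out: IntWrapper): Boolean =
    try { out.value = s.toInt; true }
    catch { case _: NumberFormatException => false }

  def main(args: Array[String]): Unit = {
    val out = new IntWrapper // allocated once, reused across calls
    if (parseInt("42", out)) println(s"parsed: ${out.value}") // parsed: 42
    if (!parseInt("4x", out)) println("not a number; out.value left unchanged")
  }
}
```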
spark git commit: [SPARK-19601][SQL] Fix CollapseRepartition rule to preserve shuffle-enabled Repartition
Repository: spark Updated Branches: refs/heads/master 5f7d835d3 -> 9a6ac7226 [SPARK-19601][SQL] Fix CollapseRepartition rule to preserve shuffle-enabled Repartition ### What changes were proposed in this pull request? Observed by felixcheung in https://github.com/apache/spark/pull/16739: when users use the shuffle-enabled `repartition` API, they expect the number of partitions they get to be exactly the number they provided, even if they call the shuffle-disabled `coalesce` later. Currently, the `CollapseRepartition` rule does not consider whether shuffle is enabled or not. Thus, we get the following unexpected results.

```Scala
val df = spark.range(0, 1, 1, 5)
val df2 = df.repartition(10)
assert(df2.coalesce(13).rdd.getNumPartitions == 5)
assert(df2.coalesce(7).rdd.getNumPartitions == 5)
assert(df2.coalesce(3).rdd.getNumPartitions == 3)
```

This PR is to fix the issue. We preserve shuffle-enabled Repartition. ### How was this patch tested? Added a test case. Author: Xiao Li
Closes #16933 from gatorsmile/CollapseRepartition. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a6ac722 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a6ac722 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a6ac722 Branch: refs/heads/master Commit: 9a6ac7226fd09d570cae08d0daea82d9bca189a0 Parents: 5f7d835 Author: Xiao Li Authored: Wed Mar 8 09:36:01 2017 -0800 Committer: Xiao Li Committed: Wed Mar 8 09:36:01 2017 -0800 -- R/pkg/inst/tests/testthat/test_sparkSQL.R | 4 +- .../apache/spark/sql/catalyst/dsl/package.scala | 3 + .../sql/catalyst/optimizer/Optimizer.scala | 32 ++-- .../plans/logical/basicLogicalOperators.scala | 16 +- .../optimizer/CollapseRepartitionSuite.scala| 153 +-- .../scala/org/apache/spark/sql/Dataset.scala| 10 +- .../spark/sql/execution/PlannerSuite.scala | 9 +- 7 files changed, 178 insertions(+), 49 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9a6ac722/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 620b633..9735fe3 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2592,8 +2592,8 @@ test_that("coalesce, repartition, numPartitions", { df2 <- repartition(df1, 10) expect_equal(getNumPartitions(df2), 10) - expect_equal(getNumPartitions(coalesce(df2, 13)), 5) - expect_equal(getNumPartitions(coalesce(df2, 7)), 5) + expect_equal(getNumPartitions(coalesce(df2, 13)), 10) + expect_equal(getNumPartitions(coalesce(df2, 7)), 7) expect_equal(getNumPartitions(coalesce(df2, 3)), 3) }) http://git-wip-us.apache.org/repos/asf/spark/blob/9a6ac722/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 0f0d904..35ca2a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -370,6 +370,9 @@ package object dsl { def as(alias: String): LogicalPlan = SubqueryAlias(alias, logicalPlan) + def coalesce(num: Integer): LogicalPlan = +Repartition(num, shuffle = false, logicalPlan) + def repartition(num: Integer): LogicalPlan = Repartition(num, shuffle = true, logicalPlan)
http://git-wip-us.apache.org/repos/asf/spark/blob/9a6ac722/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d5bbc6e..caafa1c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -564,27 +564,23 @@ object CollapseProject extends Rule[LogicalPlan] { } /** - * Combines adjacent [[Repartition]] and [[RepartitionByExpression]] operator combinations - * by keeping only the one. - * 1. For adjacent [[Repartition]]s, collapse into the last [[Repartition]]. - * 2. For adjacent [[RepartitionByExpression]]s, collapse into the last [[RepartitionByExpression]]. - * 3. For a combination of
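To make the preserved-shuffle behavior concrete, here is a toy reconstruction of the collapse decision on a simplified plan type. The real rule works on `LogicalPlan` and also handles `RepartitionByExpression`; this sketch only assumes the semantics described above:

```Scala
sealed trait Plan
case class Scan(parts: Int) extends Plan
case class Repartition(num: Int, shuffle: Boolean, child: Plan) extends Plan

def collapse(plan: Plan): Plan = plan match {
  // shuffle-free coalesce over a shuffled repartition: coalesce cannot grow the
  // count the shuffle produced, so drop it unless it genuinely narrows
  case r @ Repartition(n, false, inner @ Repartition(m, true, _)) =>
    if (n >= m) inner else r
  // any other adjacent pair collapses into the top node
  case Repartition(n, shuffle, Repartition(_, _, child)) =>
    Repartition(n, shuffle, child)
  case other => other
}

// repartition(10) followed by coalesce(13): the shuffle to 10 partitions survives
val plan = Repartition(13, shuffle = false, Repartition(10, shuffle = true, Scan(5)))
assert(collapse(plan) == Repartition(10, shuffle = true, Scan(5)))
```

This matches the updated R expectations above: `coalesce(df2, 13)` now yields 10 partitions instead of falling all the way back to the base table's 5.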
spark git commit: [SPARK-19865][SQL] remove the view identifier in SubqueryAlias
Repository: spark Updated Branches: refs/heads/master e44274870 -> 5f7d835d3 [SPARK-19865][SQL] remove the view identifier in SubqueryAlias ## What changes were proposed in this pull request? Since we have a `View` node now, we can remove the view identifier in `SubqueryAlias`, which was used to indicate a view node before. ## How was this patch tested? Update the related test cases. Author: jiangxingbo
Closes #17210 from jiangxb1987/SubqueryAlias. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f7d835d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f7d835d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f7d835d Branch: refs/heads/master Commit: 5f7d835d380c1a558a4a6d8366140cd96ee202eb Parents: e442748 Author: jiangxingbo Authored: Wed Mar 8 16:18:17 2017 +0100 Committer: Herman van Hovell Committed: Wed Mar 8 16:18:17 2017 +0100 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- .../spark/sql/catalyst/catalog/SessionCatalog.scala | 8 .../org/apache/spark/sql/catalyst/dsl/package.scala | 4 ++-- .../spark/sql/catalyst/optimizer/subquery.scala | 8 .../spark/sql/catalyst/parser/AstBuilder.scala | 6 +++--- .../plans/logical/basicLogicalOperators.scala | 3 +-- .../spark/sql/catalyst/analysis/AnalysisSuite.scala | 16 .../sql/catalyst/catalog/SessionCatalogSuite.scala | 6 +++--- .../sql/catalyst/optimizer/ColumnPruningSuite.scala | 8 .../optimizer/EliminateSubqueryAliasesSuite.scala | 6 +++--- .../catalyst/optimizer/JoinOptimizationSuite.scala | 8 .../spark/sql/catalyst/parser/PlanParserSuite.scala | 2 +- .../main/scala/org/apache/spark/sql/Dataset.scala | 2 +- .../sql/execution/joins/BroadcastJoinSuite.scala| 3 --- .../spark/sql/hive/HiveMetastoreCatalogSuite.scala | 2 +- .../spark/sql/hive/execution/SQLQuerySuite.scala| 2 +- 16 files changed, 42 insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5f7d835d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index ffa5aed..93666f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -598,7 +598,7 @@ class Analyzer( execute(child) } view.copy(child = newChild) - case p @ SubqueryAlias(_, view: View, _) => + case p @ SubqueryAlias(_, view: View) => val newChild = resolveRelation(view) p.copy(child = newChild) case _ => plan @@ -2363,7 +2363,7 @@ class Analyzer( */ object EliminateSubqueryAliases extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { -case SubqueryAlias(_, child, _) => child +case SubqueryAlias(_, child) => child } } http://git-wip-us.apache.org/repos/asf/spark/blob/5f7d835d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 498bfbd..831e37a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -578,7 +578,7 @@ class SessionCatalog( val table = 
formatTableName(name.table) if (db == globalTempViewManager.database) { globalTempViewManager.get(table).map { viewDef => - SubqueryAlias(table, viewDef, None) + SubqueryAlias(table, viewDef) }.getOrElse(throw new NoSuchTableException(db, table)) } else if (name.database.isDefined || !tempTables.contains(table)) { val metadata = externalCatalog.getTable(db, table) @@ -591,17 +591,17 @@ class SessionCatalog( desc = metadata, output = metadata.schema.toAttributes, child = parser.parsePlan(viewText)) - SubqueryAlias(table, child, Some(name.copy(table = table, database = Some(db + SubqueryAlias(table, child) } else { val tableRelation = CatalogRelation( metadata, // we assume all the columns are
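In sketch form, the structural change amounts to one dropped field on the node; the before/after shapes below follow the diff, while the surrounding types are hedged stand-ins, not the real Catalyst classes:

```Scala
trait LogicalPlan
case class TableIdentifier(table: String, database: Option[String] = None)

// before: the alias carried an optional view identifier alongside its child
case class SubqueryAliasBefore(
    alias: String,
    child: LogicalPlan,
    view: Option[TableIdentifier]) extends LogicalPlan

// after: view information lives in the dedicated View node, so the alias
// only needs a name and the plan it wraps
case class SubqueryAliasAfter(alias: String, child: LogicalPlan) extends LogicalPlan
```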
spark git commit: [SPARK-17080][SQL] join reorder
Repository: spark Updated Branches: refs/heads/master 9ea201cf6 -> e44274870 [SPARK-17080][SQL] join reorder ## What changes were proposed in this pull request? Reorder the joins using a dynamic programming algorithm (Selinger paper): First we put all items (basic joined nodes) into level 1, then we build all two-way joins at level 2 from plans at level 1 (single items), then build all 3-way joins from plans at previous levels (two-way joins and single items), then 4-way joins ... etc, until we build all n-way joins and pick the best plan among them. When building m-way joins, we only keep the best plan (with the lowest cost) for the same set of m items. E.g., for 3-way joins, we keep only the best plan for items {A, B, C} among plans (A J B) J C, (A J C) J B and (B J C) J A. Thus, the plans maintained for each level when reordering four items A, B, C, D are as follows:

```
level 1: p({A}), p({B}), p({C}), p({D})
level 2: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D})
level 3: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D})
level 4: p({A, B, C, D})
```

where p({A, B, C, D}) is the final output plan. For cost evaluation, since physical costs for operators are not available currently, we use cardinalities and sizes to compute costs. ## How was this patch tested? Add test cases. Author: wangzhenhua
Author: Zhenhua Wang
Closes #17138 from wzhfy/joinReorder. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4427487 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4427487 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4427487 Branch: refs/heads/master Commit: e44274870dee308f4e3e8ce79457d8d19693b6e5 Parents: 9ea201c Author: wangzhenhua Authored: Wed Mar 8 16:01:28 2017 +0100 Committer: Herman van Hovell Committed: Wed Mar 8 16:01:28 2017 +0100 -- .../spark/sql/catalyst/CatalystConf.scala | 8 + .../optimizer/CostBasedJoinReorder.scala| 297 +++ .../sql/catalyst/optimizer/Optimizer.scala | 2 + .../catalyst/optimizer/JoinReorderSuite.scala | 194 .../spark/sql/catalyst/plans/PlanTest.scala | 2 +- .../StatsEstimationTestBase.scala | 4 +- .../org/apache/spark/sql/internal/SQLConf.scala | 16 + .../sql/execution/SparkSqlParserSuite.scala | 2 +- 8 files changed, 521 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e4427487/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala index 5f50ce1..fb99cb2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala @@ -60,6 +60,12 @@ trait CatalystConf { * Enables CBO for estimation of plan statistics when set true. */ def cboEnabled: Boolean + + /** Enables join reorder in CBO. */ + def joinReorderEnabled: Boolean + + /** The maximum number of joined nodes allowed in the dynamic programming algorithm. 
*/ + def joinReorderDPThreshold: Int } @@ -75,6 +81,8 @@ case class SimpleCatalystConf( runSQLonFile: Boolean = true, crossJoinEnabled: Boolean = false, cboEnabled: Boolean = false, +joinReorderEnabled: Boolean = false, +joinReorderDPThreshold: Int = 12, warehousePath: String = "/user/hive/warehouse", sessionLocalTimeZone: String = TimeZone.getDefault().getID) extends CatalystConf http://git-wip-us.apache.org/repos/asf/spark/blob/e4427487/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala new file mode 100644 index 000..b694561 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You
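Since the prose above is the main description of the search, here is a compact, self-contained sketch of the level-by-level dynamic programming. The item names and cost function are toy stand-ins for Spark's cardinality- and size-based cost model; only the search structure follows the algorithm described:

```Scala
object JoinReorderSketch {
  type ItemSet = Set[String]
  final case class Plan(items: ItemSet, cost: Double, tree: String)

  // Toy cost standing in for Spark's cardinality/size-based estimate.
  private def combinedCost(l: Plan, r: Plan): Double =
    l.cost + r.cost + (l.cost * r.cost) / 100.0

  /** items: (relation name, toy base cost). Returns the cheapest n-way join. */
  def reorder(items: Seq[(String, Double)]): Plan = {
    val memo = scala.collection.mutable.Map.empty[ItemSet, Plan]
    // level 1: one single-item plan per base relation
    items.foreach { case (name, c) => memo(Set(name)) = Plan(Set(name), c, name) }
    for (level <- 2 to items.size) {
      // build every `level`-way join from disjoint plans of lower levels
      val candidates = for {
        (s1, p1) <- memo.toSeq
        (s2, p2) <- memo.toSeq
        if (s1 & s2).isEmpty && s1.size + s2.size == level
      } yield Plan(s1 ++ s2, combinedCost(p1, p2), s"(${p1.tree} J ${p2.tree})")
      // keep only the best (lowest-cost) plan per set of joined items
      for (c <- candidates if memo.get(c.items).forall(_.cost > c.cost)) memo(c.items) = c
    }
    memo(items.map(_._1).toSet)
  }

  def main(args: Array[String]): Unit =
    println(reorder(Seq("A" -> 100.0, "B" -> 10.0, "C" -> 1000.0, "D" -> 1.0)))
}
```

As in the description, plans for all previous levels stay in the memo table, so an m-way join can combine, say, a two-way join with a single item, and only one plan survives per item set.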
spark git commit: [SPARK-16440][MLLIB] Ensure broadcasted variables are destroyed even in case of exception
Repository: spark Updated Branches: refs/heads/master 3f9f9180c -> 9ea201cf6 [SPARK-16440][MLLIB] Ensure broadcasted variables are destroyed even in case of exception ## What changes were proposed in this pull request? Ensure broadcasted variables are destroyed even in case of exception ## How was this patch tested? Word2VecSuite was run locally Author: Anthony Truchet
Closes #14299 from AnthonyTruchet/SPARK-16440. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ea201cf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ea201cf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ea201cf Branch: refs/heads/master Commit: 9ea201cf6482c9c62c9428759d238063db62d66e Parents: 3f9f918 Author: Anthony Truchet Authored: Wed Mar 8 11:44:25 2017 +0000 Committer: Sean Owen Committed: Wed Mar 8 11:44:25 2017 +0000 -- .../org/apache/spark/mllib/feature/Word2Vec.scala | 18 +++--- 1 file changed, 15 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9ea201cf/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index 2364d43..531c8b0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -30,6 +30,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.SparkContext import org.apache.spark.annotation.Since import org.apache.spark.api.java.JavaRDD +import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.util.{Loader, Saveable} @@ -314,6 +315,20 @@ class Word2Vec extends Serializable with Logging { val expTable = sc.broadcast(createExpTable()) val bcVocab = sc.broadcast(vocab) val bcVocabHash = sc.broadcast(vocabHash) +try { + doFit(dataset, sc, expTable, bcVocab, bcVocabHash) +} finally { + expTable.destroy(blocking = false) + bcVocab.destroy(blocking = false) + bcVocabHash.destroy(blocking = false) +} + } + + private def doFit[S <: Iterable[String]]( +dataset: RDD[S], sc: SparkContext, +expTable: Broadcast[Array[Float]], +bcVocab: Broadcast[Array[VocabWord]], +bcVocabHash: Broadcast[mutable.HashMap[String, Int]]) = { // each partition is a collection of sentences, // will be translated into arrays of Index integer val sentences: RDD[Array[Int]] = dataset.mapPartitions { sentenceIter => @@ -435,9 +450,6 @@ class Word2Vec extends Serializable with Logging { bcSyn1Global.destroy(false) } newSentences.unpersist() -expTable.destroy(false) -bcVocab.destroy(false) -bcVocabHash.destroy(false) val wordArray = vocab.map(_.word) new Word2VecModel(wordArray.zipWithIndex.toMap, syn0Global)
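The pattern being applied is a plain try/finally around the code that consumes the broadcasts. A minimal sketch of the same shape for user code follows; the names are hypothetical, `sc` is an existing `SparkContext`, and unlike the MLlib-internal patch it uses the public `destroy()` rather than the Spark-private `destroy(blocking)` overload:

```Scala
import org.apache.spark.SparkContext

def withBroadcastTable(sc: SparkContext, table: Array[Float]): Float = {
  val bcTable = sc.broadcast(table)
  try {
    // real work that ships bcTable to executors would happen here
    bcTable.value.sum
  } finally {
    bcTable.destroy() // now runs even when the work above throws
  }
}
```

Before this change, a failure inside the fitting logic left the three broadcasts alive until garbage collection; moving the cleanup into finally makes release deterministic on every code path.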
spark git commit: [SPARK-19693][SQL] Make the SET mapreduce.job.reduces automatically converted to spark.sql.shuffle.partitions
Repository: spark Updated Branches: refs/heads/master 81303f7ca -> 3f9f9180c [SPARK-19693][SQL] Make the SET mapreduce.job.reduces automatically converted to spark.sql.shuffle.partitions ## What changes were proposed in this pull request? Make `SET mapreduce.job.reduces` automatically convert to `spark.sql.shuffle.partitions`; this is similar to `SET mapred.reduce.tasks`. ## How was this patch tested? Unit tests Author: Yuming Wang
Closes #17020 from wangyum/SPARK-19693. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f9f9180 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f9f9180 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f9f9180 Branch: refs/heads/master Commit: 3f9f9180c2e695ad468eb813df5feec41e169531 Parents: 81303f7 Author: Yuming Wang Authored: Wed Mar 8 11:31:01 2017 +0000 Committer: Sean Owen Committed: Wed Mar 8 11:31:01 2017 +0000 -- .../spark/sql/execution/command/SetCommand.scala | 17 + .../org/apache/spark/sql/internal/SQLConf.scala| 4 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 12 3 files changed, 33 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3f9f9180/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala index 7afa4e7..5f12830 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala @@ -60,6 +60,23 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm } (keyValueOutput, runFunc) +case Some((SQLConf.Replaced.MAPREDUCE_JOB_REDUCES, Some(value))) => + val runFunc = (sparkSession: SparkSession) => { +logWarning( + s"Property ${SQLConf.Replaced.MAPREDUCE_JOB_REDUCES} is Hadoop's property, " + +s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.") +if (value.toInt < 1) { + val msg = +s"Setting negative ${SQLConf.Replaced.MAPREDUCE_JOB_REDUCES} for automatically " + + "determining the number of reducers is not supported." 
+ throw new IllegalArgumentException(msg) +} else { + sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, value) + Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value)) +} + } + (keyValueOutput, runFunc) + case Some((key @ SetCommand.VariableName(name), Some(value))) => val runFunc = (sparkSession: SparkSession) => { sparkSession.conf.set(name, value) http://git-wip-us.apache.org/repos/asf/spark/blob/3f9f9180/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 461dfe3..fd3acd4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -677,6 +677,10 @@ object SQLConf { object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } + + object Replaced { +val MAPREDUCE_JOB_REDUCES = "mapreduce.job.reduces" + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/3f9f9180/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 468ea05..d9e0196 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1019,6 +1019,18 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { spark.sessionState.conf.clear() } + test("SET mapreduce.job.reduces automatically converted to spark.sql.shuffle.partitions") { +spark.sessionState.conf.clear() +val before = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS.key).toInt +val newConf = before + 1 +sql(s"SET mapreduce.job.reduces=${newConf.toString}") +val after = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS.key).toInt +assert(before != after) +assert(newConf === after) +intercept[IllegalArgumentException](sql(s"SET
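Usage after this change, following the new test above (a hedged sketch; `spark` is an existing `SparkSession`):

```Scala
// The Hadoop-style property is rewritten to Spark's own shuffle setting.
spark.sql("SET mapreduce.job.reduces=17")
assert(spark.conf.get("spark.sql.shuffle.partitions") == "17")

// Non-positive values are rejected rather than silently applied:
// spark.sql("SET mapreduce.job.reduces=-1") // throws IllegalArgumentException
```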
spark git commit: [SPARK-19806][ML][PYSPARK] PySpark GeneralizedLinearRegression supports tweedie distribution.
Repository: spark Updated Branches: refs/heads/master 1fa58868b -> 81303f7ca [SPARK-19806][ML][PYSPARK] PySpark GeneralizedLinearRegression supports tweedie distribution. ## What changes were proposed in this pull request? PySpark ```GeneralizedLinearRegression``` supports the tweedie distribution. ## How was this patch tested? Add unit tests. Author: Yanbo Liang
Closes #17146 from yanboliang/spark-19806. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81303f7c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81303f7c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81303f7c Branch: refs/heads/master Commit: 81303f7ca7808d51229411dce8feeed8c23dbe15 Parents: 1fa5886 Author: Yanbo Liang Authored: Wed Mar 8 02:09:36 2017 -0800 Committer: Yanbo Liang Committed: Wed Mar 8 02:09:36 2017 -0800 -- .../GeneralizedLinearRegression.scala | 8 +-- python/pyspark/ml/regression.py | 61 +--- python/pyspark/ml/tests.py | 20 +++ 3 files changed, 77 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/81303f7c/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 110764d..3be8b53 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -66,7 +66,7 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam /** * Param for the power in the variance function of the Tweedie distribution which provides * the relationship between the variance and mean of the distribution. - * Only applicable for the Tweedie family. + * Only applicable to the Tweedie family. * (see https://en.wikipedia.org/wiki/Tweedie_distribution;> * Tweedie Distribution (Wikipedia)) * Supported values: 0 and [1, Inf). @@ -79,7 +79,7 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam final val variancePower: DoubleParam = new DoubleParam(this, "variancePower", "The power in the variance function of the Tweedie distribution which characterizes " + "the relationship between the variance and mean of the distribution. " + -"Only applicable for the Tweedie family. Supported values: 0 and [1, Inf).", +"Only applicable to the Tweedie family. Supported values: 0 and [1, Inf).", (x: Double) => x >= 1.0 || x == 0.0) /** @group getParam */ @@ -106,7 +106,7 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam def getLink: String = $(link) /** - * Param for the index in the power link function. Only applicable for the Tweedie family. + * Param for the index in the power link function. Only applicable to the Tweedie family. * Note that link power 0, 1, -1 or 0.5 corresponds to the Log, Identity, Inverse or Sqrt * link, respectively. * When not set, this value defaults to 1 - [[variancePower]], which matches the R "statmod" @@ -116,7 +116,7 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam */ @Since("2.2.0") final val linkPower: DoubleParam = new DoubleParam(this, "linkPower", -"The index in the power link function. 
Only applicable to the Tweedie family.") /** @group getParam */ @Since("2.2.0") http://git-wip-us.apache.org/repos/asf/spark/blob/81303f7c/python/pyspark/ml/regression.py -- diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py index b199bf2..3c3fcc8 100644 --- a/python/pyspark/ml/regression.py +++ b/python/pyspark/ml/regression.py @@ -1294,8 +1294,8 @@ class GeneralizedLinearRegression(JavaEstimator, HasLabelCol, HasFeaturesCol, Ha Fit a Generalized Linear Model specified by giving a symbolic description of the linear predictor (link function) and a description of the error distribution (family). It supports -"gaussian", "binomial", "poisson" and "gamma" as family. Valid link functions for each family -is listed below. The first link function of each family is the default one. +"gaussian", "binomial", "poisson", "gamma" and "tweedie" as family. Valid link functions for +each family is listed below.
spark git commit: [SPARK-18055][SQL] Use correct mirror in ExpressionEncoder
Repository: spark Updated Branches: refs/heads/branch-2.0 e69902806 -> da3dfafa9 [SPARK-18055][SQL] Use correct mirror in ExpressionEncoder Previously, we were using the mirror of the passed-in `TypeTag` when reflecting to build an encoder. This fails when the outer class is built in (i.e. `Seq`'s default mirror is based on the root classloader) but inner classes (i.e. `A` in `Seq[A]`) are defined in the REPL or a library. This patch changes us to always reflect based on a mirror created using the context classloader. Author: Michael Armbrust
Closes #17201 from marmbrus/replSeqEncoder. (cherry picked from commit 314e48a3584bad4b486b046bbf0159d64ba857bc) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da3dfafa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da3dfafa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da3dfafa Branch: refs/heads/branch-2.0 Commit: da3dfafa9725d7ff60831e11c4f77d21a0ae2204 Parents: e699028 Author: Michael Armbrust Authored: Wed Mar 8 01:32:42 2017 -0800 Committer: Wenchen Fan Committed: Wed Mar 8 01:34:25 2017 -0800 -- .../src/test/scala/org/apache/spark/repl/ReplSuite.scala | 11 +++ .../spark/sql/catalyst/encoders/ExpressionEncoder.scala | 4 ++-- 2 files changed, 13 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/da3dfafa/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala -- diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala index f7d7a4f..ad060dd 100644 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -473,4 +473,15 @@ class ReplSuite extends SparkFunSuite { assertDoesNotContain("AssertionError", output) assertDoesNotContain("Exception", output) } + + test("newProductSeqEncoder with REPL defined class") { +val output = runInterpreterInPasteMode("local-cluster[1,4,4096]", + """ + |case class Click(id: Int) + |spark.implicits.newProductSeqEncoder[Click] +""".stripMargin) + +assertDoesNotContain("error:", output) +assertDoesNotContain("Exception", output) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/da3dfafa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index 1fac26c..eb4ca8e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -45,8 +45,8 @@ import org.apache.spark.util.Utils object ExpressionEncoder { def apply[T : TypeTag](): ExpressionEncoder[T] = { // We convert the not-serializable TypeTag into StructType and ClassTag. -val mirror = typeTag[T].mirror -val tpe = typeTag[T].tpe +val mirror = ScalaReflection.mirror +val tpe = typeTag[T].in(mirror).tpe val cls = mirror.runtimeClass(tpe) val flat = !ScalaReflection.definedByConstructorParams(tpe)
spark git commit: [SPARK-18055][SQL] Use correct mirror in ExpressionEncoder
Repository: spark Updated Branches: refs/heads/branch-2.1 0ba9ecbea -> 320eff14b [SPARK-18055][SQL] Use correct mirror in ExpressionEncoder Previously, we were using the mirror of the passed-in `TypeTag` when reflecting to build an encoder. This fails when the outer class is built in (i.e. `Seq`'s default mirror is based on the root classloader) but inner classes (i.e. `A` in `Seq[A]`) are defined in the REPL or a library. This patch changes us to always reflect based on a mirror created using the context classloader. Author: Michael Armbrust
Closes #17201 from marmbrus/replSeqEncoder. (cherry picked from commit 314e48a3584bad4b486b046bbf0159d64ba857bc) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/320eff14 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/320eff14 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/320eff14 Branch: refs/heads/branch-2.1 Commit: 320eff14b0bb634eba2cdcae2303ba38fd0eb282 Parents: 0ba9ecb Author: Michael Armbrust Authored: Wed Mar 8 01:32:42 2017 -0800 Committer: Wenchen Fan Committed: Wed Mar 8 01:32:51 2017 -0800 -- .../src/test/scala/org/apache/spark/repl/ReplSuite.scala | 11 +++ .../spark/sql/catalyst/encoders/ExpressionEncoder.scala | 4 ++-- 2 files changed, 13 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/320eff14/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala -- diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 9262e93..5ef3987 100644 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -473,4 +473,15 @@ class ReplSuite extends SparkFunSuite { assertDoesNotContain("AssertionError", output) assertDoesNotContain("Exception", output) } + + test("newProductSeqEncoder with REPL defined class") { +val output = runInterpreterInPasteMode("local-cluster[1,4,4096]", + """ + |case class Click(id: Int) + |spark.implicits.newProductSeqEncoder[Click] +""".stripMargin) + +assertDoesNotContain("error:", output) +assertDoesNotContain("Exception", output) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/320eff14/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index 9c4818d..f7999a3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -45,8 +45,8 @@ import org.apache.spark.util.Utils object ExpressionEncoder { def apply[T : TypeTag](): ExpressionEncoder[T] = { // We convert the not-serializable TypeTag into StructType and ClassTag. -val mirror = typeTag[T].mirror -val tpe = typeTag[T].tpe +val mirror = ScalaReflection.mirror +val tpe = typeTag[T].in(mirror).tpe if (ScalaReflection.optionOfProductType(tpe)) { throw new UnsupportedOperationException(
spark git commit: [SPARK-18055][SQL] Use correct mirror in ExpressionEncoder
Repository: spark Updated Branches: refs/heads/master 56e1bd337 -> 314e48a35 [SPARK-18055][SQL] Use correct mirror in ExpressionEncoder Previously, we were using the mirror of the passed-in `TypeTag` when reflecting to build an encoder. This fails when the outer class is built in (i.e. `Seq`'s default mirror is based on the root classloader) but inner classes (i.e. `A` in `Seq[A]`) are defined in the REPL or a library. This patch changes us to always reflect based on a mirror created using the context classloader. Author: Michael Armbrust
Closes #17201 from marmbrus/replSeqEncoder. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/314e48a3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/314e48a3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/314e48a3 Branch: refs/heads/master Commit: 314e48a3584bad4b486b046bbf0159d64ba857bc Parents: 56e1bd3 Author: Michael Armbrust Authored: Wed Mar 8 01:32:42 2017 -0800 Committer: Wenchen Fan Committed: Wed Mar 8 01:32:42 2017 -0800 -- .../src/test/scala/org/apache/spark/repl/ReplSuite.scala | 11 +++ .../spark/sql/catalyst/encoders/ExpressionEncoder.scala | 4 ++-- 2 files changed, 13 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/314e48a3/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala -- diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 55c9167..121a02a 100644 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -473,4 +473,15 @@ class ReplSuite extends SparkFunSuite { assertDoesNotContain("AssertionError", output) assertDoesNotContain("Exception", output) } + + test("newProductSeqEncoder with REPL defined class") { +val output = runInterpreterInPasteMode("local-cluster[1,4,4096]", + """ + |case class Click(id: Int) + |spark.implicits.newProductSeqEncoder[Click] +""".stripMargin) + +assertDoesNotContain("error:", output) +assertDoesNotContain("Exception", output) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/314e48a3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index 0782143..93fc565 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -45,8 +45,8 @@ import org.apache.spark.util.Utils object ExpressionEncoder { def apply[T : TypeTag](): ExpressionEncoder[T] = { // We convert the not-serializable TypeTag into StructType and ClassTag. -val mirror = typeTag[T].mirror -val tpe = typeTag[T].tpe +val mirror = ScalaReflection.mirror +val tpe = typeTag[T].in(mirror).tpe if (ScalaReflection.optionOfProductType(tpe)) { throw new UnsupportedOperationException(
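Condensed to its essence, the change swaps the tag's captured mirror for one built from the context classloader before asking for the type. A hedged standalone sketch of that idea (`typeOfViaContextMirror` is a hypothetical helper, not Spark code):

```Scala
import scala.reflect.runtime.universe._

object MirrorSketch {
  // Build the mirror from the context classloader instead of trusting the one
  // captured inside the TypeTag, then migrate the tag into that mirror; this is
  // what lets REPL-defined classes nested in builtin generics (e.g. Seq[Click])
  // resolve correctly.
  def typeOfViaContextMirror[T: TypeTag]: Type = {
    val mirror = runtimeMirror(Thread.currentThread().getContextClassLoader)
    typeTag[T].in(mirror).tpe
  }

  def main(args: Array[String]): Unit =
    println(typeOfViaContextMirror[Seq[String]]) // Seq[String]
}
```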