[GitHub] [spark-website] shaneknapp opened a new pull request #186: testing how-to for k8s changes

2019-03-27 Thread GitBox
shaneknapp opened a new pull request #186: testing how-to for k8s changes
URL: https://github.com/apache/spark-website/pull/186
 
 
   I think that this will be quite useful.  :)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-24902][K8S] Add PV integration tests

2019-03-27 Thread shaneknapp
This is an automated email from the ASF dual-hosted git repository.

shaneknapp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 39577a2  [SPARK-24902][K8S] Add PV integration tests
39577a2 is described below

commit 39577a27a0b58fd75b41d24b10012447748b7ee9
Author: Stavros Kontopoulos 
AuthorDate: Wed Mar 27 13:00:56 2019 -0700

[SPARK-24902][K8S] Add PV integration tests

## What changes were proposed in this pull request?

- Adds persistent volume integration tests.
- Adds a custom tag to the test so it can be excluded when run against a cloud
backend (see the tagging sketch after this list).
- Assumes the default fs type for the host, which as far as I know is ext4.
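
As a rough sketch of that tagging approach (a hedged illustration only: the tag
name, suite name, and wiring below are assumptions, not the actual code in
`PVTestsSuite`), a ScalaTest `Tag` lets a backend-specific test be excluded at
run time:

```scala
import org.scalatest.{FunSuite, Tag}

// Hypothetical tag for tests that assume minikube-style local storage.
object LocalStorageTest extends Tag("local-storage")

class ExamplePVSuite extends FunSuite {
  // A tagged test can be excluded when running against a cloud backend,
  // e.g. via ScalaTest's exclude option: -l local-storage
  test("Test PVs with local storage", LocalStorageTest) {
    // ... create a hostPath-backed PV/PVC and run a Spark job against it ...
  }
}
```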

## How was this patch tested?
Manually run the tests against minikube as usual:
```
[INFO] --- scalatest-maven-plugin:1.0:test (integration-test) @ spark-kubernetes-integration-tests_2.12 ---
Discovery starting.
Discovery completed in 192 milliseconds.
Run starting. Expected test count is: 16
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Use SparkLauncher.NO_RESOURCE
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Run SparkPi with env and mount secrets.
- Run PySpark on simple pi.py example
- Run PySpark with Python2 to test a pyfiles example
- Run PySpark with Python3 to test a pyfiles example
- Run PySpark with memory customization
- Run in client mode.
- Start pod creation from template
- Test PVs with local storage
```

Closes #23514 from skonto/pvctests.

Authored-by: Stavros Kontopoulos 
Signed-off-by: shane knapp 
---
 .../apache/spark/examples/DFSReadWriteTest.scala   |  12 +-
 .../k8s/integrationtest/KubernetesSuite.scala  |  35 +++-
 .../integrationtest/KubernetesTestComponents.scala |   3 +-
 .../deploy/k8s/integrationtest/PVTestsSuite.scala  | 189 +
 .../k8s/integrationtest/SecretsTestsSuite.scala|  27 +--
 .../spark/deploy/k8s/integrationtest/Utils.scala   |  22 +++
 6 files changed, 260 insertions(+), 28 deletions(-)

diff --git 
a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
index 1a77971..a738598 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
@@ -22,6 +22,9 @@ import java.io.File
 
 import scala.io.Source._
 
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.SparkSession
 
 /**
@@ -107,6 +110,13 @@ object DFSReadWriteTest {
 
 println("Writing local file to DFS")
 val dfsFilename = s"$dfsDirPath/dfs_read_write_test"
+
+// delete file if exists
+val fs = FileSystem.get(spark.sessionState.newHadoopConf())
+if (fs.exists(new Path(dfsFilename))) {
+fs.delete(new Path(dfsFilename), true)
+}
+
 val fileRDD = spark.sparkContext.parallelize(fileContents)
 fileRDD.saveAsTextFile(dfsFilename)
 
@@ -123,7 +133,6 @@ object DFSReadWriteTest {
   .sum
 
 spark.stop()
-
 if (localWordCount == dfsWordCount) {
   println(s"Success! Local Word Count $localWordCount and " +
 s"DFS Word Count $dfsWordCount agree.")
@@ -131,7 +140,6 @@ object DFSReadWriteTest {
   println(s"Failure! Local Word Count $localWordCount " +
 s"and DFS Word Count $dfsWordCount disagree.")
 }
-
   }
 }
 // scalastyle:on println
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 91419e8..bc0bb20 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.internal.config._
 
class KubernetesSuite extends SparkFunSuite
  with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
-  with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite
+  with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite
  with Logging with Eventually with Matchers {
 
   import KubernetesSuite._
@@ -178,6 +178,29 @@ class KubernetesSuite extends SparkFunSuite
   isJVM)
   }
 
+  protected def 

[spark] branch master updated: [SPARK-27291][SQL] PartitioningAwareFileIndex: Filter out empty files on listing files

2019-03-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 49b0411  [SPARK-27291][SQL] PartitioningAwareFileIndex: Filter out empty files on listing files
49b0411 is described below

commit 49b0411549dad82d0ae8daf93a9ae2624f206791
Author: Gengliang Wang 
AuthorDate: Wed Mar 27 10:08:38 2019 -0700

[SPARK-27291][SQL] PartitioningAwareFileIndex: Filter out empty files on listing files

## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/23130, all empty files are excluded
from target file splits in `FileSourceScanExec`.
In File source V2, we should keep the same behavior.

This PR filters out empty files when listing files in
`PartitioningAwareFileIndex`, so that the upper level doesn't need to handle
them.
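
A hedged, self-contained illustration of the expected behaviour (session setup
and paths are assumed; it mirrors the style of the updated `SaveLoadSuite`
test): an empty file in the input directory contributes no splits and does not
change the result.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.SparkSession

object SkipEmptyFilesExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("skip-empty-files").getOrCreate()
    val dir = Files.createTempDirectory("skip-empty-files").toFile.getCanonicalPath

    // One empty file and one two-row file in the same directory.
    Files.write(Paths.get(dir, "empty"), Array.empty[Byte])
    Files.write(Paths.get(dir, "notEmpty"), "a\nb\n".getBytes(StandardCharsets.UTF_8))

    // With empty files filtered at listing time, only "notEmpty" produces rows.
    assert(spark.read.text(dir).count() == 2)
    spark.stop()
  }
}
```
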
## How was this patch tested?

Unit test

Closes #24227 from gengliangwang/ignoreEmptyFile.

Authored-by: Gengliang Wang 
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/sql/execution/DataSourceScanExec.scala  |  4 ++--
 .../datasources/PartitioningAwareFileIndex.scala |  7 +--
 .../org/apache/spark/sql/sources/SaveLoadSuite.scala | 16 +---
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 92f7d66..33adfce 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -382,7 +382,7 @@ case class FileSourceScanExec(
 logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
 val filesGroupedToBuckets =
   selectedPartitions.flatMap { p =>
-p.files.filter(_.getLen > 0).map { f =>
+p.files.map { f =>
   PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
 }
   }.groupBy { f =>
@@ -426,7 +426,7 @@ case class FileSourceScanExec(
   s"open cost is considered as scanning $openCostInBytes bytes.")
 
 val splitFiles = selectedPartitions.flatMap { partition =>
-  partition.files.filter(_.getLen > 0).flatMap { file =>
+  partition.files.flatMap { file =>
 // getPath() is very expensive so we only want to call it once in this 
block:
 val filePath = file.getPath
 val isSplitable = relation.fileFormat.isSplitable(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index f5ae095..29b304a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -58,15 +58,18 @@ abstract class PartitioningAwareFileIndex(
 
   override def listFiles(
   partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): 
Seq[PartitionDirectory] = {
+def isNonEmptyFile(f: FileStatus): Boolean = {
+  isDataPath(f.getPath) && f.getLen > 0
+}
 val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
-  PartitionDirectory(InternalRow.empty, allFiles().filter(f => 
isDataPath(f.getPath))) :: Nil
+  PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) 
:: Nil
 } else {
   prunePartitions(partitionFilters, partitionSpec()).map {
 case PartitionPath(values, path) =>
   val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
 case Some(existingDir) =>
   // Directory has children files in it, return them
-  existingDir.filter(f => isDataPath(f.getPath))
+  existingDir.filter(isNonEmptyFile)
 
 case None =>
   // Directory does not exist, or has no children files
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index 048e4b8..7680f61 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -146,13 +146,15 @@ class SaveLoadSuite extends DataSourceTest with 
SharedSQLContext with BeforeAndA
   }
 
   test("skip empty files in non bucketed read") {
-withTempDir { dir =>
-  val path = dir.getCanonicalPath
-  Files.write(Paths.get(path, "empty"), Array.empty[Byte])
-  Files.write(Paths.get(path, "notEmpty"), 
"a".getBytes(StandardCharsets.UTF_8))
-  val readback = 

[spark] branch master updated: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`

2019-03-27 Thread lixiao
This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f1fe805  [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
f1fe805 is described below

commit f1fe805bed5e69c9c8055c83bf854d2d713e6466
Author: Daoyuan Wang 
AuthorDate: Wed Mar 27 08:45:22 2019 -0700

[SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`

## What changes were proposed in this pull request?

For now, `ReuseSubquery` in Spark compares two subqueries at the `SubqueryExec`
level, which defeats the `ReuseSubquery` rule because two `SubqueryExec`
wrappers differ even when their child plans are identical. This pull request
fixes this by comparing the child plan of `SubqueryExec` instead, and adds a
configuration key exclusively for subquery reuse.
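
The new unit test in `SQLQuerySuite` (see the diff below) captures the expected
effect; a hedged standalone sketch of the same check, assuming a `testData`
table is registered and an active `spark` session:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.execution.{ScalarSubquery, SubqueryExec}

// Two textually identical scalar subqueries over the assumed testData table.
val df = spark.sql(
  """
    |SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM testData)
    |FROM testData
    |LIMIT 1
  """.stripMargin)

// Collect the physical SubqueryExec nodes referenced by the executed plan.
val subqueries = ArrayBuffer[SubqueryExec]()
df.queryExecution.executedPlan.transformAllExpressions {
  case s @ ScalarSubquery(plan: SubqueryExec, _) =>
    subqueries += plan
    s
}

// With spark.sql.subquery.reuse=true (the default), canonicalizing SubqueryExec
// to its child plan lets ReuseSubquery map both subqueries to one physical node.
assert(subqueries.distinct.size == 1)
```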

## How was this patch tested?

add a unit test.

Closes #24214 from adrian-wang/reuse.

Authored-by: Daoyuan Wang 
Signed-off-by: gatorsmile 
---
 .../sql/execution/basicPhysicalOperators.scala |  2 ++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 29 +-
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index eacd35b..731e7da 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -674,6 +674,8 @@ case class SubqueryExec(name: String, child: SparkPlan) 
extends UnaryExecNode {
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
   @transient
   private lazy val relationFuture: Future[Array[InternalRow]] = {
 // relationFuture is used in "doExecute". Therefore we can get the 
execution id correctly here.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index e8d1ecc..5916cbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
-import org.apache.spark.sql.execution.aggregate
+import org.apache.spark.sql.execution.{aggregate, ScalarSubquery, SubqueryExec}
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, 
SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
CartesianProductExec, SortMergeJoinExec}
@@ -113,6 +113,33 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
 }
   }
 
+  test("Reuse Subquery") {
+Seq(true, false).foreach { reuse =>
+  withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
+val df = sql(
+  """
+|SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM 
testData)
+|FROM testData
+|LIMIT 1
+  """.stripMargin)
+
+import scala.collection.mutable.ArrayBuffer
+val subqueries = ArrayBuffer[SubqueryExec]()
+df.queryExecution.executedPlan.transformAllExpressions {
+  case s @ ScalarSubquery(plan: SubqueryExec, _) =>
+subqueries += plan
+s
+}
+
+if (reuse) {
+  assert(subqueries.distinct.size == 1, "Subquery reusing not working 
correctly")
+} else {
+  assert(subqueries.distinct.size == 2, "There should be 2 subqueries 
when not reusing")
+}
+  }
+}
+  }
+
   test("SPARK-6743: no columns from cache") {
 Seq(
   (83, 0, 38),





[spark] branch master updated: [SPARK-26771][SQL][FOLLOWUP] Make all the uncache operations non-blocking by default

2019-03-27 Thread yamamuro
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 956b52b1 [SPARK-26771][SQL][FOLLOWUP] Make all the uncache operations non-blocking by default
956b52b1 is described below

commit 956b52b1670985a67e49b938ac1499ae65c79f6e
Author: Takeshi Yamamuro 
AuthorDate: Wed Mar 27 21:01:36 2019 +0900

[SPARK-26771][SQL][FOLLOWUP] Make all the uncache operations non-blocking by default

## What changes were proposed in this pull request?
To make the blocking behaviour consistent, this PR makes the catalog table/view
`uncacheQuery` paths non-blocking by default. Once this PR is merged, all the
un-cache behaviours in Spark are non-blocking by default.
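
A hedged usage sketch of what this means for callers (the view name is made
up): the public Dataset-level `unpersist(blocking)` is unchanged, while the
catalog/view un-cache paths now default to non-blocking.

```scala
val df = spark.range(10).cache()
df.count()  // materialize the cache

// Explicit blocking is still available on the public Dataset API.
df.unpersist(blocking = true)

// Catalog/view paths: dropping a cached temp view now un-caches
// asynchronously by default, consistent with the rest of Spark.
spark.range(10).createOrReplaceTempView("cached_view")
spark.catalog.cacheTable("cached_view")
spark.catalog.dropTempView("cached_view")
```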

## How was this patch tested?
Pass Jenkins.

Closes #24212 from maropu/SPARK-26771-FOLLOWUP.

Authored-by: Takeshi Yamamuro 
Signed-off-by: Takeshi Yamamuro 
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  3 +-
 .../apache/spark/sql/execution/CacheManager.scala  |  8 +--
 .../sql/execution/columnar/InMemoryRelation.scala  |  2 +-
 .../apache/spark/sql/internal/CatalogImpl.scala|  6 +-
 .../org/apache/spark/sql/CachedTableSuite.scala| 69 +-
 .../apache/spark/sql/hive/CachedTableSuite.scala   | 15 -
 6 files changed, 62 insertions(+), 41 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 2accb32..69c2f61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2956,7 +2956,8 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
   def unpersist(blocking: Boolean): this.type = {
-sparkSession.sharedState.cacheManager.uncacheQuery(this, cascade = false, 
blocking)
+sparkSession.sharedState.cacheManager.uncacheQuery(
+  sparkSession, logicalPlan, cascade = false, blocking)
 this
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 0145478..d1f096b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -98,13 +98,11 @@ class CacheManager extends Logging {
* @param query The [[Dataset]] to be un-cached.
* @param cascade   If true, un-cache all the cache entries that refer to 
the given
*  [[Dataset]]; otherwise un-cache the given [[Dataset]] 
only.
-   * @param blocking  Whether to block until all blocks are deleted.
*/
   def uncacheQuery(
   query: Dataset[_],
-  cascade: Boolean,
-  blocking: Boolean = true): Unit = {
-uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
+  cascade: Boolean): Unit = {
+uncacheQuery(query.sparkSession, query.logicalPlan, cascade)
   }
 
   /**
@@ -119,7 +117,7 @@ class CacheManager extends Logging {
   spark: SparkSession,
   plan: LogicalPlan,
   cascade: Boolean,
-  blocking: Boolean): Unit = {
+  blocking: Boolean = false): Unit = {
 val shouldRemove: LogicalPlan => Boolean =
   if (cascade) {
 _.find(_.sameResult(plan)).isDefined
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 1e4453f..1af5033 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -65,7 +65,7 @@ case class CachedRDDBuilder(
 _cachedColumnBuffers
   }
 
-  def clearCache(blocking: Boolean = true): Unit = {
+  def clearCache(blocking: Boolean = false): Unit = {
 if (_cachedColumnBuffers != null) {
   synchronized {
 if (_cachedColumnBuffers != null) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 4698e8a..5e7d17b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -365,7 +365,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
   override def dropTempView(viewName: String): Boolean = {
 sparkSession.sessionState.catalog.getTempView(viewName).exists { viewDef =>
   sparkSession.sharedState.cacheManager.uncacheQuery(
-sparkSession, viewDef, cascade = false, blocking = true)
+sparkSession, viewDef, cascade = false)
   sessionCatalog.dropTempView(viewName)
 }
   }
@@ -381,7 

[spark] branch master updated: [SPARK-27288][SQL] Pruning nested field in complex map key from object serializers

2019-03-27 Thread yamamuro
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 93ff690  [SPARK-27288][SQL] Pruning nested field in complex map key from object serializers
93ff690 is described below

commit 93ff69003b228abcf08da4488593f552e3a61665
Author: Liang-Chi Hsieh 
AuthorDate: Wed Mar 27 19:40:14 2019 +0900

[SPARK-27288][SQL] Pruning nested field in complex map key from object serializers

## What changes were proposed in this pull request?

In the original PR #24158, pruning nested fields in a complex map key was not
supported, because some methods in schema pruning didn't support it at that
moment. This is a follow-up to add that support.
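
A hedged illustration of the shape this targets (the DDL strings are invented):
a map whose key is itself a struct, so unused key fields can now be pruned from
the object serializer just like value fields.

```scala
import org.apache.spark.sql.types.{MapType, StructType}

// Illustrative only: a MapType whose key and value are both structs.
// After this change, collectStructType descends into the key type as well,
// so the key struct also becomes a candidate for nested-field pruning.
val keyStruct   = StructType.fromDDL("a INT, b INT")
val valueStruct = StructType.fromDDL("c LONG, d STRING")
val mapType     = MapType(keyStruct, valueStruct)
```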

## How was this patch tested?

Added tests.

Closes #24220 from viirya/SPARK-26847-followup.

Authored-by: Liang-Chi Hsieh 
Signed-off-by: Takeshi Yamamuro 
---
 .../apache/spark/sql/catalyst/optimizer/objects.scala | 13 ++---
 .../optimizer/ObjectSerializerPruningSuite.scala  |  5 +++--
 .../apache/spark/sql/DatasetOptimizationSuite.scala   | 19 ++-
 3 files changed, 31 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
index 8e92421..c48bd8f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
@@ -131,8 +131,8 @@ object ObjectSerializerPruning extends Rule[LogicalPlan] {
 fields.map(f => collectStructType(f.dataType, structs))
   case ArrayType(elementType, _) =>
 collectStructType(elementType, structs)
-  case MapType(_, valueType, _) =>
-// Because we can't select a field from struct in key, so we skip key 
type.
+  case MapType(keyType, valueType, _) =>
+collectStructType(keyType, structs)
 collectStructType(valueType, structs)
   // We don't use UserDefinedType in those serializers.
   case _: UserDefinedType[_] =>
@@ -179,13 +179,20 @@ object ObjectSerializerPruning extends Rule[LogicalPlan] {
 
 val transformedSerializer = serializer.transformDown {
   case m: ExternalMapToCatalyst =>
+val prunedKeyConverter = m.keyConverter.transformDown {
+  case s: CreateNamedStruct if structTypeIndex < 
prunedStructTypes.size =>
+val prunedType = prunedStructTypes(structTypeIndex)
+structTypeIndex += 1
+pruneNamedStruct(s, prunedType)
+}
 val prunedValueConverter = m.valueConverter.transformDown {
   case s: CreateNamedStruct if structTypeIndex < 
prunedStructTypes.size =>
 val prunedType = prunedStructTypes(structTypeIndex)
 structTypeIndex += 1
 pruneNamedStruct(s, prunedType)
 }
-m.copy(valueConverter = alignNullTypeInIf(prunedValueConverter))
+m.copy(keyConverter = alignNullTypeInIf(prunedKeyConverter),
+  valueConverter = alignNullTypeInIf(prunedValueConverter))
   case s: CreateNamedStruct if structTypeIndex < prunedStructTypes.size =>
 val prunedType = prunedStructTypes(structTypeIndex)
 structTypeIndex += 1
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala
index fb0f3a3..0dd4d6a 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala
@@ -60,8 +60,9 @@ class ObjectSerializerPruningSuite extends PlanTest {
   Seq(StructType.fromDDL("a struct, b int"),
 StructType.fromDDL("a int, b int")),
   Seq(StructType.fromDDL("a int, b int, c string")),
-  Seq.empty[StructType],
-  Seq(StructType.fromDDL("c long, d string"))
+  Seq(StructType.fromDDL("a struct, b int"),
+StructType.fromDDL("a int, b int")),
+  Seq(StructType.fromDDL("a int, b int"), StructType.fromDDL("c long, d 
string"))
 )
 
 dataTypes.zipWithIndex.foreach { case (dt, idx) =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DatasetOptimizationSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetOptimizationSuite.scala
index 69634f8..cfbb343 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DatasetOptimizationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetOptimizationSuite.scala
@@ -51,7 +51,9 @@ class DatasetOptimizationSuite extends QueryTest with 
SharedSQLContext {
   val structs = 

[spark] branch master updated: [SPARK-27083][SQL] Add a new conf to control subqueryReuse

2019-03-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fac3110  [SPARK-27083][SQL] Add a new conf to control subqueryReuse
fac3110 is described below

commit fac31104f69daa6cf72eae1a0de995ef0777d75c
Author: liuxian 
AuthorDate: Tue Mar 26 23:37:58 2019 -0700

[SPARK-27083][SQL] Add a new conf to control subqueryReuse

## What changes were proposed in this pull request?
Subquery reuse and exchange reuse are not the same feature. If we want to keep
reusing exchanges but stop reusing subqueries, that cannot be expressed with a
single shared configuration.

This PR adds a new configuration, `spark.sql.subquery.reuse`, to control
subquery reuse independently.
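
For example (a hedged sketch; both keys are internal configurations), exchange
reuse can stay on while subquery reuse is turned off for the current session:

```scala
// Keep reusing exchanges, but plan duplicated subqueries independently.
spark.conf.set("spark.sql.exchange.reuse", "true")
spark.conf.set("spark.sql.subquery.reuse", "false")
```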

## How was this patch tested?

N/A

Closes #23998 from 10110346/SUBQUERY_REUSE.

Authored-by: liuxian 
Signed-off-by: Wenchen Fan 
---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala| 8 
 .../src/main/scala/org/apache/spark/sql/execution/subquery.scala  | 2 +-
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3ecc340..411805e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -889,6 +889,12 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val SUBQUERY_REUSE_ENABLED = buildConf("spark.sql.subquery.reuse")
+.internal()
+.doc("When true, the planner will try to find out duplicated subqueries 
and re-use them.")
+.booleanConf
+.createWithDefault(true)
+
   val STATE_STORE_PROVIDER_CLASS =
 buildConf("spark.sql.streaming.stateStore.providerClass")
   .internal()
@@ -1888,6 +1894,8 @@ class SQLConf extends Serializable with Logging {
 
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
 
+  def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED)
+
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
   def constraintPropagationEnabled: Boolean = 
getConf(CONSTRAINT_PROPAGATION_ENABLED)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index e084c79..b7f10ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -125,7 +125,7 @@ case class PlanSubqueries(sparkSession: SparkSession) 
extends Rule[SparkPlan] {
 case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = {
-if (!conf.exchangeReuseEnabled) {
+if (!conf.subqueryReuseEnabled) {
   return plan
 }
 // Build a hash map using schema of subqueries to avoid O(N*N) sameResult 
calls.

