spark git commit: [SPARK-22238] Fix plan resolution bug caused by EnsureStatefulOpPartitioning

2017-10-14 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 014dc8471 -> e8547ffb4


[SPARK-22238] Fix plan resolution bug caused by EnsureStatefulOpPartitioning

## What changes were proposed in this pull request?

In EnsureStatefulOpPartitioning, we check that the inputRDD to a SparkPlan has 
the expected partitioning for Streaming Stateful Operators. The problem is that 
we are not allowed to access this information during planning.
The reason we added that check was that CoalesceExec could actually create 
RDDs with 0 partitions. We should fix it so that when CoalesceExec says that 
there is a SinglePartition, there is in fact an inputRDD of 1 partition instead 
of 0 partitions.
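
As an illustration (not part of the patch), a minimal local sketch of the situation the fix targets; before this change, coalescing an empty Dataset down to one partition could report SinglePartition while the underlying RDD had 0 partitions:

```scala
// Minimal sketch, assuming a local SparkSession; names and master setting are illustrative only.
import org.apache.spark.sql.SparkSession

object CoalescePartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("coalesce-sketch").getOrCreate()
    import spark.implicits._

    // Streaming stateful operators expect exactly one input partition from a SinglePartition child.
    val coalesced = spark.emptyDataset[Int].coalesce(1)
    println(coalesced.rdd.getNumPartitions) // could be 0 before the fix, 1 afterwards
    spark.stop()
  }
}
```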

## How was this patch tested?

Regression test in StreamingQuerySuite

Author: Burak Yavuz 

Closes #19467 from brkyvz/stateful-op.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8547ffb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8547ffb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8547ffb

Branch: refs/heads/master
Commit: e8547ffb49071525c06876c856cecc0d4731b918
Parents: 014dc84
Author: Burak Yavuz 
Authored: Sat Oct 14 17:39:15 2017 -0700
Committer: Tathagata Das 
Committed: Sat Oct 14 17:39:15 2017 -0700

--
 .../catalyst/plans/physical/partitioning.scala   |  15 +-
 .../sql/execution/basicPhysicalOperators.scala   |  27 +++-
 .../execution/exchange/EnsureRequirements.scala  |   5 +-
 .../streaming/FlatMapGroupsWithStateExec.scala   |   2 +-
 .../streaming/IncrementalExecution.scala         |  39 ++
 .../execution/streaming/statefulOperators.scala  |  11 +-
 .../org/apache/spark/sql/DataFrameSuite.scala    |   2 +
 .../spark/sql/execution/PlannerSuite.scala       |  17 +++
 .../streaming/state/StateStoreRDDSuite.scala     |   2 +-
 .../SymmetricHashJoinStateManagerSuite.scala     |   2 +-
 .../spark/sql/streaming/DeduplicateSuite.scala   |  11 +-
 .../EnsureStatefulOpPartitioningSuite.scala      | 138 ---
 .../streaming/FlatMapGroupsWithStateSuite.scala  |   6 +-
 .../sql/streaming/StatefulOperatorTest.scala     |  49 +++
 .../streaming/StreamingAggregationSuite.scala    |   8 +-
 .../sql/streaming/StreamingJoinSuite.scala       |   2 +-
 .../sql/streaming/StreamingQuerySuite.scala      |  13 ++
 17 files changed, 160 insertions(+), 189 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e8547ffb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 51d78dd..e57c842 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -49,7 +49,9 @@ case object AllTuples extends Distribution
 * can mean such tuples are either co-located in the same partition or they will be contiguous
 * within a single partition.
 */
-case class ClusteredDistribution(clustering: Seq[Expression]) extends Distribution {
+case class ClusteredDistribution(
+    clustering: Seq[Expression],
+    numPartitions: Option[Int] = None) extends Distribution {
   require(
     clustering != Nil,
     "The clustering expressions of a ClusteredDistribution should not be Nil. " +
@@ -221,6 +223,7 @@ case object SinglePartition extends Partitioning {

   override def satisfies(required: Distribution): Boolean = required match {
     case _: BroadcastDistribution => false
+    case ClusteredDistribution(_, desiredPartitions) => desiredPartitions.forall(_ == 1)
     case _ => true
   }

@@ -243,8 +246,9 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)

   override def satisfies(required: Distribution): Boolean = required match {
     case UnspecifiedDistribution => true
-    case ClusteredDistribution(requiredClustering) =>
-      expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+    case ClusteredDistribution(requiredClustering, desiredPartitions) =>
+      expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) &&
+        desiredPartitions.forall(_ == numPartitions) // if desiredPartitions = None, returns true
     case _ => false
   }

@@ -289,8 +293,9 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
     case OrderedDistribution(requiredOrdering) =>
       val minSize = Seq(requiredOrdering.size, ordering.size).min
       requiredOrdering.take(minSize) == 
spark git commit: [SPARK-22233][CORE] Allow user to filter out empty split in HadoopRDD

2017-10-14 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master e0503a722 -> 014dc8471


[SPARK-22233][CORE] Allow user to filter out empty split in HadoopRDD

## What changes were proposed in this pull request?
Add a flag spark.files.ignoreEmptySplits. When true, methods that use 
HadoopRDD and NewHadoopRDD, such as SparkContext.textFiles, will not create a 
partition for input splits that are empty.
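
For illustration, a minimal sketch (the local master and input path are assumptions) of enabling the new flag:

```scala
// Minimal sketch; the input directory below is hypothetical.
import org.apache.spark.{SparkConf, SparkContext}

object IgnoreEmptySplitsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("ignore-empty-splits-sketch")
      .set("spark.files.ignoreEmptySplits", "true") // defaults to false

    val sc = new SparkContext(conf)
    // With the flag on, empty input splits should not contribute partitions.
    val lines = sc.textFile("/tmp/input-with-some-empty-files")
    println(lines.getNumPartitions)
    sc.stop()
  }
}
```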

Author: liulijia 

Closes #19464 from liutang123/SPARK-22233.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/014dc847
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/014dc847
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/014dc847

Branch: refs/heads/master
Commit: 014dc8471200518d63005eed531777d30d8a6639
Parents: e0503a7
Author: liulijia 
Authored: Sat Oct 14 17:37:33 2017 +0900
Committer: hyukjinkwon 
Committed: Sat Oct 14 17:37:33 2017 +0900

--
 .../apache/spark/internal/config/package.scala  |  6 ++
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 12 ++-
 .../org/apache/spark/rdd/NewHadoopRDD.scala | 13 ++-
 .../test/scala/org/apache/spark/FileSuite.scala | 95 ++--
 4 files changed, 112 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/014dc847/core/src/main/scala/org/apache/spark/internal/config/package.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 19336f8..ce013d6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -270,6 +270,12 @@ package object config {
 .longConf
 .createWithDefault(4 * 1024 * 1024)
 
+  private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
+    .doc("If true, methods that use HadoopRDD and NewHadoopRDD such as " +
+      "SparkContext.textFiles will not create a partition for input splits that are empty.")
+    .booleanConf
+    .createWithDefault(false)
+
   private[spark] val SECRET_REDACTION_PATTERN =
     ConfigBuilder("spark.redaction.regex")
       .doc("Regex to decide which Spark configuration properties and environment variables in " +

http://git-wip-us.apache.org/repos/asf/spark/blob/014dc847/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 23b3442..1f33c0a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -35,7 +35,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
+import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
 import org.apache.spark.storage.StorageLevel
@@ -134,6 +134,8 @@ class HadoopRDD[K, V](
 
   private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
 
+  private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)
+
  // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
 val conf: Configuration = broadcastedConf.value.value
@@ -195,8 +197,12 @@ class HadoopRDD[K, V](
     val jobConf = getJobConf()
     // add the credentials here as this can be called before SparkContext initialized
     SparkHadoopUtil.get.addCredentials(jobConf)
-    val inputFormat = getInputFormat(jobConf)
-    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
+    val inputSplits = if (ignoreEmptySplits) {
+      allInputSplits.filter(_.getLength > 0)
+    } else {
+      allInputSplits
+    }
 val array = new Array[Partition](inputSplits.size)
 for (i <- 0 until inputSplits.size) {
   array(i) = new HadoopPartition(id, i, inputSplits(i))

http://git-wip-us.apache.org/repos/asf/spark/blob/014dc847/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 

spark git commit: [SPARK-22273][SQL] Fix key/value schema field names in HashMapGenerators.

2017-10-14 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 920372a19 -> eb00037a7


[SPARK-22273][SQL] Fix key/value schema field names in HashMapGenerators.

## What changes were proposed in this pull request?

When fixing schema field names using escape characters with 
`addReferenceMinorObj()` at 
[SPARK-18952](https://issues.apache.org/jira/browse/SPARK-18952) (#16361), 
the double-quotes around the names remained and the names became something 
like `"((java.lang.String) references[1])"`.

```java
/* 055 */ private int maxSteps = 2;
/* 056 */ private int numRows = 0;
/* 057 */ private org.apache.spark.sql.types.StructType keySchema = new 
org.apache.spark.sql.types.StructType().add("((java.lang.String) 
references[1])", org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */ private org.apache.spark.sql.types.StructType valueSchema = new 
org.apache.spark.sql.types.StructType().add("((java.lang.String) 
references[2])", org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */ private Object emptyVBase;
```

We should remove the double-quotes to refer to the values in `references` properly:

```java
/* 055 */ private int maxSteps = 2;
/* 056 */ private int numRows = 0;
/* 057 */ private org.apache.spark.sql.types.StructType keySchema = new 
org.apache.spark.sql.types.StructType().add(((java.lang.String) references[1]), 
org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */ private org.apache.spark.sql.types.StructType valueSchema = new 
org.apache.spark.sql.types.StructType().add(((java.lang.String) references[2]), 
org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */ private Object emptyVBase;
```
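
To see the effect outside of the generated Java, here is a small standalone Scala sketch; `keyName` below is a simplified stand-in for the string the codegen context returns:

```scala
// Simplified illustration of the quoting bug: keyName holds a Java expression into the
// generated class's `references` array, so quoting it makes the field name a literal string.
object QuotedReferenceSketch {
  def main(args: Array[String]): Unit = {
    val keyName = "((java.lang.String) references[1])" // simplified stand-in for the codegen reference

    val broken = s""".add("$keyName", org.apache.spark.sql.types.DataTypes.StringType)"""
    val fixed  = s""".add($keyName, org.apache.spark.sql.types.DataTypes.StringType)"""

    println(broken) // .add("((java.lang.String) references[1])", ...) -- name becomes the literal text
    println(fixed)  // .add(((java.lang.String) references[1]), ...)   -- evaluated when the generated code runs
  }
}
```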

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN 

Closes #19491 from ueshin/issues/SPARK-22273.

(cherry picked from commit e0503a7223410289d01bc4b20da3a451730577da)
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb00037a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb00037a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb00037a

Branch: refs/heads/branch-2.1
Commit: eb00037a70614daaf94a3ae59d3126ba696da3e2
Parents: 920372a
Author: Takuya UESHIN 
Authored: Fri Oct 13 23:24:36 2017 -0700
Committer: gatorsmile 
Committed: Fri Oct 13 23:25:21 2017 -0700

--
 .../sql/execution/aggregate/RowBasedHashMapGenerator.scala   | 8 
 .../sql/execution/aggregate/VectorizedHashMapGenerator.scala | 8 
 2 files changed, 8 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eb00037a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
index 1b6e6d2..ffb0c6c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
@@ -50,10 +50,10 @@ class RowBasedHashMapGenerator(
   val keyName = ctx.addReferenceObj(key.name)
   key.dataType match {
 case d: DecimalType =>
-  s""".add("$keyName", 
org.apache.spark.sql.types.DataTypes.createDecimalType(
+  s""".add($keyName, 
org.apache.spark.sql.types.DataTypes.createDecimalType(
   |${d.precision}, ${d.scale}))""".stripMargin
 case _ =>
-  s""".add("$keyName", 
org.apache.spark.sql.types.DataTypes.${key.dataType})"""
+  s""".add($keyName, 
org.apache.spark.sql.types.DataTypes.${key.dataType})"""
   }
 }.mkString("\n").concat(";")
 
@@ -63,10 +63,10 @@ class RowBasedHashMapGenerator(
   val keyName = ctx.addReferenceObj(key.name)
   key.dataType match {
 case d: DecimalType =>
-  s""".add("$keyName", 
org.apache.spark.sql.types.DataTypes.createDecimalType(
+  s""".add($keyName, 
org.apache.spark.sql.types.DataTypes.createDecimalType(
   |${d.precision}, ${d.scale}))""".stripMargin
 case _ =>
-  s""".add("$keyName", 
org.apache.spark.sql.types.DataTypes.${key.dataType})"""
+  s""".add($keyName, 
org.apache.spark.sql.types.DataTypes.${key.dataType})"""
   }
 }.mkString("\n").concat(";")
 


spark git commit: [SPARK-22273][SQL] Fix key/value schema field names in HashMapGenerators.

2017-10-14 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 30d5c9fd8 -> acbad83ec


[SPARK-22273][SQL] Fix key/value schema field names in HashMapGenerators.

## What changes were proposed in this pull request?

When fixing schema field names using escape characters with 
`addReferenceMinorObj()` at 
[SPARK-18952](https://issues.apache.org/jira/browse/SPARK-18952) (#16361), 
the double-quotes around the names remained and the names became something 
like `"((java.lang.String) references[1])"`.

```java
/* 055 */ private int maxSteps = 2;
/* 056 */ private int numRows = 0;
/* 057 */ private org.apache.spark.sql.types.StructType keySchema = new 
org.apache.spark.sql.types.StructType().add("((java.lang.String) 
references[1])", org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */ private org.apache.spark.sql.types.StructType valueSchema = new 
org.apache.spark.sql.types.StructType().add("((java.lang.String) 
references[2])", org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */ private Object emptyVBase;
```

We should remove the double-quotes to refer to the values in `references` properly:

```java
/* 055 */ private int maxSteps = 2;
/* 056 */ private int numRows = 0;
/* 057 */ private org.apache.spark.sql.types.StructType keySchema = new 
org.apache.spark.sql.types.StructType().add(((java.lang.String) references[1]), 
org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */ private org.apache.spark.sql.types.StructType valueSchema = new 
org.apache.spark.sql.types.StructType().add(((java.lang.String) references[2]), 
org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */ private Object emptyVBase;
```

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN 

Closes #19491 from ueshin/issues/SPARK-22273.

(cherry picked from commit e0503a7223410289d01bc4b20da3a451730577da)
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acbad83e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acbad83e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acbad83e

Branch: refs/heads/branch-2.2
Commit: acbad83ecd7d3e19f0870021a9dd342a85897ae4
Parents: 30d5c9f
Author: Takuya UESHIN 
Authored: Fri Oct 13 23:24:36 2017 -0700
Committer: gatorsmile 
Committed: Fri Oct 13 23:24:49 2017 -0700

--
 .../sql/execution/aggregate/RowBasedHashMapGenerator.scala   | 8 
 .../sql/execution/aggregate/VectorizedHashMapGenerator.scala | 8 
 2 files changed, 8 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/acbad83e/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
index 9316ebc..3718424 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
@@ -50,10 +50,10 @@ class RowBasedHashMapGenerator(
   val keyName = ctx.addReferenceMinorObj(key.name)
   key.dataType match {
 case d: DecimalType =>
-  s""".add("$keyName", 
org.apache.spark.sql.types.DataTypes.createDecimalType(
+  s""".add($keyName, 
org.apache.spark.sql.types.DataTypes.createDecimalType(
   |${d.precision}, ${d.scale}))""".stripMargin
 case _ =>
-  s""".add("$keyName", 
org.apache.spark.sql.types.DataTypes.${key.dataType})"""
+  s""".add($keyName, 
org.apache.spark.sql.types.DataTypes.${key.dataType})"""
   }
 }.mkString("\n").concat(";")
 
@@ -63,10 +63,10 @@ class RowBasedHashMapGenerator(
   val keyName = ctx.addReferenceMinorObj(key.name)
   key.dataType match {
 case d: DecimalType =>
-  s""".add("$keyName", 
org.apache.spark.sql.types.DataTypes.createDecimalType(
+  s""".add($keyName, 
org.apache.spark.sql.types.DataTypes.createDecimalType(
   |${d.precision}, ${d.scale}))""".stripMargin
 case _ =>
-  s""".add("$keyName", 
org.apache.spark.sql.types.DataTypes.${key.dataType})"""
+  s""".add($keyName, 
org.apache.spark.sql.types.DataTypes.${key.dataType})"""
   }
 }.mkString("\n").concat(";")
 


spark git commit: [SPARK-22273][SQL] Fix key/value schema field names in HashMapGenerators.

2017-10-14 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master e3536406e -> e0503a722


[SPARK-22273][SQL] Fix key/value schema field names in HashMapGenerators.

## What changes were proposed in this pull request?

When fixing schema field names using escape characters with 
`addReferenceMinorObj()` at 
[SPARK-18952](https://issues.apache.org/jira/browse/SPARK-18952) (#16361), 
the double-quotes around the names remained and the names became something 
like `"((java.lang.String) references[1])"`.

```java
/* 055 */ private int maxSteps = 2;
/* 056 */ private int numRows = 0;
/* 057 */ private org.apache.spark.sql.types.StructType keySchema = new 
org.apache.spark.sql.types.StructType().add("((java.lang.String) 
references[1])", org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */ private org.apache.spark.sql.types.StructType valueSchema = new 
org.apache.spark.sql.types.StructType().add("((java.lang.String) 
references[2])", org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */ private Object emptyVBase;
```

We should remove the double-quotes to refer to the values in `references` properly:

```java
/* 055 */ private int maxSteps = 2;
/* 056 */ private int numRows = 0;
/* 057 */ private org.apache.spark.sql.types.StructType keySchema = new 
org.apache.spark.sql.types.StructType().add(((java.lang.String) references[1]), 
org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */ private org.apache.spark.sql.types.StructType valueSchema = new 
org.apache.spark.sql.types.StructType().add(((java.lang.String) references[2]), 
org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */ private Object emptyVBase;
```

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN 

Closes #19491 from ueshin/issues/SPARK-22273.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0503a72
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0503a72
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0503a72

Branch: refs/heads/master
Commit: e0503a7223410289d01bc4b20da3a451730577da
Parents: e353640
Author: Takuya UESHIN 
Authored: Fri Oct 13 23:24:36 2017 -0700
Committer: gatorsmile 
Committed: Fri Oct 13 23:24:36 2017 -0700

--
 .../sql/execution/aggregate/RowBasedHashMapGenerator.scala   | 8 
 .../sql/execution/aggregate/VectorizedHashMapGenerator.scala | 8 
 2 files changed, 8 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e0503a72/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
index 9316ebc..3718424 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
@@ -50,10 +50,10 @@ class RowBasedHashMapGenerator(
   val keyName = ctx.addReferenceMinorObj(key.name)
   key.dataType match {
 case d: DecimalType =>
-  s""".add("$keyName", 
org.apache.spark.sql.types.DataTypes.createDecimalType(
+  s""".add($keyName, 
org.apache.spark.sql.types.DataTypes.createDecimalType(
   |${d.precision}, ${d.scale}))""".stripMargin
 case _ =>
-  s""".add("$keyName", 
org.apache.spark.sql.types.DataTypes.${key.dataType})"""
+  s""".add($keyName, 
org.apache.spark.sql.types.DataTypes.${key.dataType})"""
   }
 }.mkString("\n").concat(";")
 
@@ -63,10 +63,10 @@ class RowBasedHashMapGenerator(
   val keyName = ctx.addReferenceMinorObj(key.name)
   key.dataType match {
 case d: DecimalType =>
-  s""".add("$keyName", 
org.apache.spark.sql.types.DataTypes.createDecimalType(
+  s""".add($keyName, 
org.apache.spark.sql.types.DataTypes.createDecimalType(
   |${d.precision}, ${d.scale}))""".stripMargin
 case _ =>
-  s""".add("$keyName", 
org.apache.spark.sql.types.DataTypes.${key.dataType})"""
+  s""".add($keyName, 
org.apache.spark.sql.types.DataTypes.${key.dataType})"""
   }
 }.mkString("\n").concat(";")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e0503a72/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
--
diff --git 

spark git commit: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new file isn't yet visible

2017-10-14 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 06df34d35 -> e3536406e


[SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsTracker metrics 
collection fails if a new file isn't yet visible

## What changes were proposed in this pull request?

Change `BasicWriteTaskStatsTracker.getFileSize()` to catch `FileNotFoundException`, 
log the event, and then return 0 as the file size.

This ensures that if a newly created file isn't visible because the store does not 
always offer create consistency, metric collection does not cause a failure.
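
Condensed into a standalone helper, the pattern introduced here looks roughly like this (a sketch derived from the diff below, not the exact committed code; the method name is illustrative):

```scala
// Sketch of the FileNotFoundException-tolerant size lookup: a missing file is reported
// as None ("size unknown") instead of failing metrics collection.
import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object FileSizeSketch {
  def fileSizeIfVisible(filePath: String, hadoopConf: Configuration): Option[Long] = {
    val path = new Path(filePath)
    val fs = path.getFileSystem(hadoopConf)
    try {
      Some(fs.getFileStatus(path).getLen)
    } catch {
      case _: FileNotFoundException =>
        // Can occur on eventually consistent object stores right after a file is created.
        None
    }
  }
}
```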

## How was this patch tested?

New test suite included, `BasicWriteTaskStatsTrackerSuite`. This not only 
checks the resilience to missing files, but verifies the existing logic as to 
how file statistics are gathered.

Note that in the current implementation:

1. If you call `Tracker.getFinalStats()` more than once, the file size count 
will increase by the size of the last file. This could be fixed by clearing the 
filename field inside `getFinalStats()` itself.

2. If you pass an empty or null string to `Tracker.newFile(path)`, an 
IllegalArgumentException is raised, but only in `getFinalStats()`, rather than 
in `newFile`. There's a test for this behaviour in the new suite, as it 
verifies that only FNFEs get swallowed.

Author: Steve Loughran 

Closes #18979 from steveloughran/cloud/SPARK-21762-missing-files-in-metrics.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3536406
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3536406
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3536406

Branch: refs/heads/master
Commit: e3536406ec6ff65a8b41ba2f2fd40517a760cfd6
Parents: 06df34d
Author: Steve Loughran 
Authored: Fri Oct 13 23:08:17 2017 -0700
Committer: gatorsmile 
Committed: Fri Oct 13 23:08:17 2017 -0700

--
 .../datasources/BasicWriteStatsTracker.scala|  49 -
 .../BasicWriteTaskStatsTrackerSuite.scala   | 220 +++
 .../sql/hive/execution/SQLQuerySuite.scala  |   8 +
 3 files changed, 265 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e3536406/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index b8f7d13..11af0aa 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.FileNotFoundException
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -44,20 +47,32 @@ case class BasicWriteTaskStats(
  * @param hadoopConf
  */
 class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
-  extends WriteTaskStatsTracker {
+  extends WriteTaskStatsTracker with Logging {
 
   private[this] var numPartitions: Int = 0
   private[this] var numFiles: Int = 0
+  private[this] var submittedFiles: Int = 0
   private[this] var numBytes: Long = 0L
   private[this] var numRows: Long = 0L
 
-  private[this] var curFile: String = null
-
+  private[this] var curFile: Option[String] = None
 
-  private def getFileSize(filePath: String): Long = {
+  /**
+   * Get the size of the file expected to have been written by a worker.
+   * @param filePath path to the file
+   * @return the file size or None if the file was not found.
+   */
+  private def getFileSize(filePath: String): Option[Long] = {
     val path = new Path(filePath)
     val fs = path.getFileSystem(hadoopConf)
-    fs.getFileStatus(path).getLen()
+    try {
+      Some(fs.getFileStatus(path).getLen())
+    } catch {
+      case e: FileNotFoundException =>
+        // may arise against eventually consistent object stores
+        logDebug(s"File $path is not yet visible", e)
+        None
+    }
   }
 
 
@@ -70,12 +85,19 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
   }
 
   override def newFile(filePath: String): Unit = {
-    if (numFiles > 0) {
-      // we assume here that we've finished writing to disk the previous file by now
-      numBytes += getFileSize(curFile)
+    statCurrentFile()
+    curFile =