spark git commit: [SPARK-22238] Fix plan resolution bug caused by EnsureStatefulOpPartitioning
Repository: spark
Updated Branches:
  refs/heads/master 014dc8471 -> e8547ffb4

[SPARK-22238] Fix plan resolution bug caused by EnsureStatefulOpPartitioning

## What changes were proposed in this pull request?

In EnsureStatefulOpPartitioning, we check that the inputRDD to a SparkPlan has the expected partitioning for Streaming Stateful Operators. The problem is that we are not allowed to access this information during planning. The reason we added that check was that CoalesceExec could actually create RDDs with 0 partitions. We should fix it such that when CoalesceExec reports a SinglePartition, there is in fact an inputRDD of 1 partition rather than 0 partitions.

## How was this patch tested?

Regression test in StreamingQuerySuite

Author: Burak Yavuz

Closes #19467 from brkyvz/stateful-op.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8547ffb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8547ffb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8547ffb

Branch: refs/heads/master
Commit: e8547ffb49071525c06876c856cecc0d4731b918
Parents: 014dc84
Author: Burak Yavuz
Authored: Sat Oct 14 17:39:15 2017 -0700
Committer: Tathagata Das
Committed: Sat Oct 14 17:39:15 2017 -0700

--
 .../catalyst/plans/physical/partitioning.scala  |  15 +-
 .../sql/execution/basicPhysicalOperators.scala  |  27 +++-
 .../execution/exchange/EnsureRequirements.scala |   5 +-
 .../streaming/FlatMapGroupsWithStateExec.scala  |   2 +-
 .../streaming/IncrementalExecution.scala        |  39 ++
 .../execution/streaming/statefulOperators.scala |  11 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   |   2 +
 .../spark/sql/execution/PlannerSuite.scala      |  17 +++
 .../streaming/state/StateStoreRDDSuite.scala    |   2 +-
 .../SymmetricHashJoinStateManagerSuite.scala    |   2 +-
 .../spark/sql/streaming/DeduplicateSuite.scala  |  11 +-
 .../EnsureStatefulOpPartitioningSuite.scala     | 138 ---
 .../streaming/FlatMapGroupsWithStateSuite.scala |   6 +-
 .../sql/streaming/StatefulOperatorTest.scala    |  49 +++
 .../streaming/StreamingAggregationSuite.scala   |   8 +-
 .../sql/streaming/StreamingJoinSuite.scala      |   2 +-
 .../sql/streaming/StreamingQuerySuite.scala     |  13 ++
 17 files changed, 160 insertions(+), 189 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e8547ffb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 51d78dd..e57c842 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -49,7 +49,9 @@ case object AllTuples extends Distribution
  * can mean such tuples are either co-located in the same partition or they will be contiguous
  * within a single partition.
  */
-case class ClusteredDistribution(clustering: Seq[Expression]) extends Distribution {
+case class ClusteredDistribution(
+    clustering: Seq[Expression],
+    numPartitions: Option[Int] = None) extends Distribution {
   require(
     clustering != Nil,
     "The clustering expressions of a ClusteredDistribution should not be Nil. " +
@@ -221,6 +223,7 @@ case object SinglePartition extends Partitioning {

   override def satisfies(required: Distribution): Boolean = required match {
     case _: BroadcastDistribution => false
+    case ClusteredDistribution(_, desiredPartitions) => desiredPartitions.forall(_ == 1)
     case _ => true
   }

@@ -243,8 +246,9 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)

   override def satisfies(required: Distribution): Boolean = required match {
     case UnspecifiedDistribution => true
-    case ClusteredDistribution(requiredClustering) =>
-      expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+    case ClusteredDistribution(requiredClustering, desiredPartitions) =>
+      expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) &&
+        desiredPartitions.forall(_ == numPartitions) // if desiredPartitions = None, returns true
     case _ => false
   }

@@ -289,8 +293,9 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
     case OrderedDistribution(requiredOrdering) =>
       val minSize = Seq(requiredOrdering.size, ordering.size).min
       requiredOrdering.take(minSize) ==
spark git commit: [SPARK-22233][CORE] Allow user to filter out empty split in HadoopRDD
Repository: spark
Updated Branches:
  refs/heads/master e0503a722 -> 014dc8471

[SPARK-22233][CORE] Allow user to filter out empty split in HadoopRDD

## What changes were proposed in this pull request?

Add a flag spark.files.ignoreEmptySplits. When true, methods that use HadoopRDD and NewHadoopRDD, such as SparkContext.textFiles, will not create a partition for input splits that are empty.

Author: liulijia

Closes #19464 from liutang123/SPARK-22233.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/014dc847
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/014dc847
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/014dc847

Branch: refs/heads/master
Commit: 014dc8471200518d63005eed531777d30d8a6639
Parents: e0503a7
Author: liulijia
Authored: Sat Oct 14 17:37:33 2017 +0900
Committer: hyukjinkwon
Committed: Sat Oct 14 17:37:33 2017 +0900

--
 .../apache/spark/internal/config/package.scala  |  6 ++
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 12 ++-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 13 ++-
 .../test/scala/org/apache/spark/FileSuite.scala | 95 ++-
 4 files changed, 112 insertions(+), 14 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/014dc847/core/src/main/scala/org/apache/spark/internal/config/package.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 19336f8..ce013d6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -270,6 +270,12 @@ package object config {
     .longConf
     .createWithDefault(4 * 1024 * 1024)

+  private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
+    .doc("If true, methods that use HadoopRDD and NewHadoopRDD such as " +
+      "SparkContext.textFiles will not create a partition for input splits that are empty.")
+    .booleanConf
+    .createWithDefault(false)
+
   private[spark] val SECRET_REDACTION_PATTERN = ConfigBuilder("spark.redaction.regex")
     .doc("Regex to decide which Spark configuration properties and environment variables in " +

http://git-wip-us.apache.org/repos/asf/spark/blob/014dc847/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 23b3442..1f33c0a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -35,7 +35,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
+import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
 import org.apache.spark.storage.StorageLevel
@@ -134,6 +134,8 @@ class HadoopRDD[K, V](

   private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)

+  private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)
+
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
@@ -195,8 +197,12 @@ class HadoopRDD[K, V](
     val jobConf = getJobConf()
     // add the credentials here as this can be called before SparkContext initialized
     SparkHadoopUtil.get.addCredentials(jobConf)
-    val inputFormat = getInputFormat(jobConf)
-    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
+    val inputSplits = if (ignoreEmptySplits) {
+      allInputSplits.filter(_.getLength > 0)
+    } else {
+      allInputSplits
+    }
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
       array(i) = new HadoopPartition(id, i, inputSplits(i))

http://git-wip-us.apache.org/repos/asf/spark/blob/014dc847/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
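As a usage illustration of the new flag, a minimal driver program might enable it as below; the input path is a hypothetical placeholder:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object IgnoreEmptySplitsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ignore-empty-splits")
      .setMaster("local[2]")
      // The flag added by this commit; it defaults to false, preserving old behaviour.
      .set("spark.files.ignoreEmptySplits", "true")
    val sc = new SparkContext(conf)

    // With the flag on, zero-length input splits (e.g. empty part files) are
    // filtered out in getPartitions, so no tasks are scheduled for them.
    val rdd = sc.textFile("/tmp/input-dir-with-empty-files") // hypothetical path
    println(s"partitions = ${rdd.getNumPartitions}")

    sc.stop()
  }
}
```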
spark git commit: [SPARK-22273][SQL] Fix key/value schema field names in HashMapGenerators.
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 920372a19 -> eb00037a7

[SPARK-22273][SQL] Fix key/value schema field names in HashMapGenerators.

## What changes were proposed in this pull request?

When fixing schema field names using escape characters with `addReferenceMinorObj()` at [SPARK-18952](https://issues.apache.org/jira/browse/SPARK-18952) (#16361), the double-quotes around the names remained, and the names became something like `"((java.lang.String) references[1])"`.

```java
/* 055 */   private int maxSteps = 2;
/* 056 */   private int numRows = 0;
/* 057 */   private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add("((java.lang.String) references[1])", org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */   private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add("((java.lang.String) references[2])", org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */   private Object emptyVBase;
```

We should remove the double-quotes to refer to the values in `references` properly:

```java
/* 055 */   private int maxSteps = 2;
/* 056 */   private int numRows = 0;
/* 057 */   private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[1]), org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */   private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[2]), org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */   private Object emptyVBase;
```

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN

Closes #19491 from ueshin/issues/SPARK-22273.

(cherry picked from commit e0503a7223410289d01bc4b20da3a451730577da)
Signed-off-by: gatorsmile

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb00037a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb00037a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb00037a

Branch: refs/heads/branch-2.1
Commit: eb00037a70614daaf94a3ae59d3126ba696da3e2
Parents: 920372a
Author: Takuya UESHIN
Authored: Fri Oct 13 23:24:36 2017 -0700
Committer: gatorsmile
Committed: Fri Oct 13 23:25:21 2017 -0700

--
 .../sql/execution/aggregate/RowBasedHashMapGenerator.scala   | 8
 .../sql/execution/aggregate/VectorizedHashMapGenerator.scala | 8
 2 files changed, 8 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/eb00037a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
index 1b6e6d2..ffb0c6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
@@ -50,10 +50,10 @@ class RowBasedHashMapGenerator(
       val keyName = ctx.addReferenceObj(key.name)
       key.dataType match {
         case d: DecimalType =>
-          s""".add("$keyName", org.apache.spark.sql.types.DataTypes.createDecimalType(
+          s""".add($keyName, org.apache.spark.sql.types.DataTypes.createDecimalType(
             |${d.precision}, ${d.scale}))""".stripMargin
         case _ =>
-          s""".add("$keyName", org.apache.spark.sql.types.DataTypes.${key.dataType})"""
+          s""".add($keyName, org.apache.spark.sql.types.DataTypes.${key.dataType})"""
       }
     }.mkString("\n").concat(";")

@@ -63,10 +63,10 @@ class RowBasedHashMapGenerator(
       val keyName = ctx.addReferenceObj(key.name)
       key.dataType match {
         case d: DecimalType =>
-          s""".add("$keyName", org.apache.spark.sql.types.DataTypes.createDecimalType(
+          s""".add($keyName, org.apache.spark.sql.types.DataTypes.createDecimalType(
             |${d.precision}, ${d.scale}))""".stripMargin
         case _ =>
-          s""".add("$keyName", org.apache.spark.sql.types.DataTypes.${key.dataType})"""
+          s""".add($keyName, org.apache.spark.sql.types.DataTypes.${key.dataType})"""
       }
     }.mkString("\n").concat(";")
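To see why the quoting matters: `keyName` here is not a plain field name but a Java expression string produced by the reference-object helper. A small standalone sketch of the difference (illustrative values, not the actual generator code):

```scala
// Sketch of the bug class: keyName holds a Java *expression*, not a literal name.
val keyName = "((java.lang.String) references[1])"

// Before the fix: quoting the interpolation turns the expression into a string
// literal, so the generated schema field is literally named
// "((java.lang.String) references[1])".
val buggy = s""".add("$keyName", org.apache.spark.sql.types.DataTypes.StringType)"""

// After the fix: without quotes, the generated Java evaluates the expression,
// so the field gets the real name stored in references[1] at runtime.
val fixed = s""".add($keyName, org.apache.spark.sql.types.DataTypes.StringType)"""

println(buggy) // .add("((java.lang.String) references[1])", ...StringType)
println(fixed) // .add(((java.lang.String) references[1]), ...StringType)
```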
spark git commit: [SPARK-22273][SQL] Fix key/value schema field names in HashMapGenerators.
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 30d5c9fd8 -> acbad83ec

[SPARK-22273][SQL] Fix key/value schema field names in HashMapGenerators.

## What changes were proposed in this pull request?

When fixing schema field names using escape characters with `addReferenceMinorObj()` at [SPARK-18952](https://issues.apache.org/jira/browse/SPARK-18952) (#16361), the double-quotes around the names remained, and the names became something like `"((java.lang.String) references[1])"`.

```java
/* 055 */   private int maxSteps = 2;
/* 056 */   private int numRows = 0;
/* 057 */   private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add("((java.lang.String) references[1])", org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */   private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add("((java.lang.String) references[2])", org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */   private Object emptyVBase;
```

We should remove the double-quotes to refer to the values in `references` properly:

```java
/* 055 */   private int maxSteps = 2;
/* 056 */   private int numRows = 0;
/* 057 */   private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[1]), org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */   private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[2]), org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */   private Object emptyVBase;
```

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN

Closes #19491 from ueshin/issues/SPARK-22273.

(cherry picked from commit e0503a7223410289d01bc4b20da3a451730577da)
Signed-off-by: gatorsmile

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acbad83e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acbad83e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acbad83e

Branch: refs/heads/branch-2.2
Commit: acbad83ecd7d3e19f0870021a9dd342a85897ae4
Parents: 30d5c9f
Author: Takuya UESHIN
Authored: Fri Oct 13 23:24:36 2017 -0700
Committer: gatorsmile
Committed: Fri Oct 13 23:24:49 2017 -0700

--
 .../sql/execution/aggregate/RowBasedHashMapGenerator.scala   | 8
 .../sql/execution/aggregate/VectorizedHashMapGenerator.scala | 8
 2 files changed, 8 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/acbad83e/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
index 9316ebc..3718424 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
@@ -50,10 +50,10 @@ class RowBasedHashMapGenerator(
       val keyName = ctx.addReferenceMinorObj(key.name)
       key.dataType match {
         case d: DecimalType =>
-          s""".add("$keyName", org.apache.spark.sql.types.DataTypes.createDecimalType(
+          s""".add($keyName, org.apache.spark.sql.types.DataTypes.createDecimalType(
             |${d.precision}, ${d.scale}))""".stripMargin
         case _ =>
-          s""".add("$keyName", org.apache.spark.sql.types.DataTypes.${key.dataType})"""
+          s""".add($keyName, org.apache.spark.sql.types.DataTypes.${key.dataType})"""
       }
     }.mkString("\n").concat(";")

@@ -63,10 +63,10 @@ class RowBasedHashMapGenerator(
       val keyName = ctx.addReferenceMinorObj(key.name)
       key.dataType match {
         case d: DecimalType =>
-          s""".add("$keyName", org.apache.spark.sql.types.DataTypes.createDecimalType(
+          s""".add($keyName, org.apache.spark.sql.types.DataTypes.createDecimalType(
             |${d.precision}, ${d.scale}))""".stripMargin
         case _ =>
-          s""".add("$keyName", org.apache.spark.sql.types.DataTypes.${key.dataType})"""
+          s""".add($keyName, org.apache.spark.sql.types.DataTypes.${key.dataType})"""
       }
     }.mkString("\n").concat(";")
spark git commit: [SPARK-22273][SQL] Fix key/value schema field names in HashMapGenerators.
Repository: spark
Updated Branches:
  refs/heads/master e3536406e -> e0503a722

[SPARK-22273][SQL] Fix key/value schema field names in HashMapGenerators.

## What changes were proposed in this pull request?

When fixing schema field names using escape characters with `addReferenceMinorObj()` at [SPARK-18952](https://issues.apache.org/jira/browse/SPARK-18952) (#16361), the double-quotes around the names remained, and the names became something like `"((java.lang.String) references[1])"`.

```java
/* 055 */   private int maxSteps = 2;
/* 056 */   private int numRows = 0;
/* 057 */   private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add("((java.lang.String) references[1])", org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */   private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add("((java.lang.String) references[2])", org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */   private Object emptyVBase;
```

We should remove the double-quotes to refer to the values in `references` properly:

```java
/* 055 */   private int maxSteps = 2;
/* 056 */   private int numRows = 0;
/* 057 */   private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[1]), org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */   private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[2]), org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */   private Object emptyVBase;
```

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN

Closes #19491 from ueshin/issues/SPARK-22273.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0503a72
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0503a72
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0503a72

Branch: refs/heads/master
Commit: e0503a7223410289d01bc4b20da3a451730577da
Parents: e353640
Author: Takuya UESHIN
Authored: Fri Oct 13 23:24:36 2017 -0700
Committer: gatorsmile
Committed: Fri Oct 13 23:24:36 2017 -0700

--
 .../sql/execution/aggregate/RowBasedHashMapGenerator.scala   | 8
 .../sql/execution/aggregate/VectorizedHashMapGenerator.scala | 8
 2 files changed, 8 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e0503a72/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
index 9316ebc..3718424 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
@@ -50,10 +50,10 @@ class RowBasedHashMapGenerator(
       val keyName = ctx.addReferenceMinorObj(key.name)
       key.dataType match {
         case d: DecimalType =>
-          s""".add("$keyName", org.apache.spark.sql.types.DataTypes.createDecimalType(
+          s""".add($keyName, org.apache.spark.sql.types.DataTypes.createDecimalType(
             |${d.precision}, ${d.scale}))""".stripMargin
         case _ =>
-          s""".add("$keyName", org.apache.spark.sql.types.DataTypes.${key.dataType})"""
+          s""".add($keyName, org.apache.spark.sql.types.DataTypes.${key.dataType})"""
       }
     }.mkString("\n").concat(";")

@@ -63,10 +63,10 @@ class RowBasedHashMapGenerator(
       val keyName = ctx.addReferenceMinorObj(key.name)
       key.dataType match {
         case d: DecimalType =>
-          s""".add("$keyName", org.apache.spark.sql.types.DataTypes.createDecimalType(
+          s""".add($keyName, org.apache.spark.sql.types.DataTypes.createDecimalType(
             |${d.precision}, ${d.scale}))""".stripMargin
         case _ =>
-          s""".add("$keyName", org.apache.spark.sql.types.DataTypes.${key.dataType})"""
+          s""".add($keyName, org.apache.spark.sql.types.DataTypes.${key.dataType})"""
       }
     }.mkString("\n").concat(";")

http://git-wip-us.apache.org/repos/asf/spark/blob/e0503a72/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
--
diff --git
spark git commit: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new file isn't yet visible
Repository: spark
Updated Branches:
  refs/heads/master 06df34d35 -> e3536406e

[SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new file isn't yet visible

## What changes were proposed in this pull request?

`BasicWriteTaskStatsTracker.getFileSize()` is changed to catch `FileNotFoundException`, log at info, and then return 0 as the file size. This ensures that if a newly created file isn't visible because the store doesn't always have create consistency, metric collection doesn't cause the failure.

## How was this patch tested?

New test suite included, `BasicWriteTaskStatsTrackerSuite`. This not only checks the resilience to missing files, but verifies the existing logic as to how file statistics are gathered.

Note that in the current implementation:

1. If you call `Tracker.getFinalStats()` more than once, the file size count will increase by the size of the last file. This could be fixed by clearing the filename field inside `getFinalStats()` itself.
2. If you pass an empty or null string to `Tracker.newFile(path)`, an IllegalArgumentException is raised, but only in `getFinalStats()`, rather than in `newFile()`. There's a test for this behaviour in the new suite, as it verifies that only FNFEs get swallowed.

Author: Steve Loughran

Closes #18979 from steveloughran/cloud/SPARK-21762-missing-files-in-metrics.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3536406
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3536406
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3536406

Branch: refs/heads/master
Commit: e3536406ec6ff65a8b41ba2f2fd40517a760cfd6
Parents: 06df34d
Author: Steve Loughran
Authored: Fri Oct 13 23:08:17 2017 -0700
Committer: gatorsmile
Committed: Fri Oct 13 23:08:17 2017 -0700

--
 .../datasources/BasicWriteStatsTracker.scala    |  49 -
 .../BasicWriteTaskStatsTrackerSuite.scala       | 220 +++
 .../sql/hive/execution/SQLQuerySuite.scala      |   8 +
 3 files changed, 265 insertions(+), 12 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e3536406/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index b8f7d13..11af0aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -17,10 +17,13 @@
 package org.apache.spark.sql.execution.datasources

+import java.io.FileNotFoundException
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path

 import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -44,20 +47,32 @@ case class BasicWriteTaskStats(
  * @param hadoopConf
  */
 class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
-  extends WriteTaskStatsTracker {
+  extends WriteTaskStatsTracker with Logging {

   private[this] var numPartitions: Int = 0
   private[this] var numFiles: Int = 0
+  private[this] var submittedFiles: Int = 0
   private[this] var numBytes: Long = 0L
   private[this] var numRows: Long = 0L

-  private[this] var curFile: String = null
-
+  private[this] var curFile: Option[String] = None

-  private def getFileSize(filePath: String): Long = {
+  /**
+   * Get the size of the file expected to have been written by a worker.
+   * @param filePath path to the file
+   * @return the file size or None if the file was not found.
+   */
+  private def getFileSize(filePath: String): Option[Long] = {
     val path = new Path(filePath)
     val fs = path.getFileSystem(hadoopConf)
-    fs.getFileStatus(path).getLen()
+    try {
+      Some(fs.getFileStatus(path).getLen())
+    } catch {
+      case e: FileNotFoundException =>
+        // may arise against eventually consistent object stores
+        logDebug(s"File $path is not yet visible", e)
+        None
+    }
   }
@@ -70,12 +85,19 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
   }

   override def newFile(filePath: String): Unit = {
-    if (numFiles > 0) {
-      // we assume here that we've finished writing to disk the previous file by now
-      numBytes += getFileSize(curFile)
+    statCurrentFile()
+    curFile =
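The essential pattern in the diff above is converting the file-size probe into an `Option`, so a not-yet-visible file degrades to a skipped measurement rather than a task failure. A minimal sketch of that pattern against the Hadoop `FileSystem` API (the helper name mirrors the diff; the surrounding tracker class is omitted):

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// A missing file becomes None instead of an exception, so metrics
// collection cannot fail the write task.
def getFileSize(hadoopConf: Configuration, filePath: String): Option[Long] = {
  val path = new Path(filePath)
  val fs = path.getFileSystem(hadoopConf)
  try {
    Some(fs.getFileStatus(path).getLen())
  } catch {
    case _: FileNotFoundException =>
      // May arise against eventually consistent object stores.
      None
  }
}

// Callers fold the Option into the running byte count:
//   getFileSize(conf, file).foreach(size => numBytes += size)
```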