Repository: spark Updated Branches: refs/heads/branch-1.4 b836bac3f -> 451c8722a
[SPARK-8406] [SQL] Backports SPARK-8406 and PR #6864 to branch-1.4 Author: Cheng Lian <l...@databricks.com> Closes #6932 from liancheng/spark-8406-for-1.4 and squashes the following commits: a0168fe [Cheng Lian] Backports SPARK-8406 and PR #6864 to branch-1.4 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/451c8722 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/451c8722 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/451c8722 Branch: refs/heads/branch-1.4 Commit: 451c8722afea83e8e8f11c438469eea10e5acf4c Parents: b836bac Author: Cheng Lian <l...@databricks.com> Authored: Mon Jun 22 10:04:29 2015 -0700 Committer: Yin Huai <yh...@databricks.com> Committed: Mon Jun 22 10:04:29 2015 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/parquet/newParquet.scala | 41 ++------------ .../org/apache/spark/sql/sources/commands.scala | 59 ++++++++++++++++---- .../spark/sql/hive/orc/OrcFileOperator.scala | 7 ++- .../apache/spark/sql/hive/orc/OrcRelation.scala | 3 +- .../apache/spark/sql/hive/test/TestHive.scala | 2 +- .../spark/sql/hive/orc/OrcSourceSuite.scala | 22 ++++---- .../spark/sql/sources/SimpleTextRelation.scala | 4 +- .../sql/sources/hadoopFsRelationSuites.scala | 39 +++++++++++-- 8 files changed, 110 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 3328e6f..abf9614 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -61,50 +61,21 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext extends OutputWriter { private val recordWriter: RecordWriter[Void, Row] = { - val conf = context.getConfiguration val outputFormat = { - // When appending new Parquet files to an existing Parquet file directory, to avoid - // overwriting existing data files, we need to find out the max task ID encoded in these data - // file names. - // TODO Make this snippet a utility function for other data source developers - val maxExistingTaskId = { - // Note that `path` may point to a temporary location. Here we retrieve the real - // destination path from the configuration - val outputPath = new Path(conf.get("spark.sql.sources.output.path")) - val fs = outputPath.getFileSystem(conf) - - if (fs.exists(outputPath)) { - // Pattern used to match task ID in part file names, e.g.: - // - // part-r-00001.gz.parquet - // ^~~~~ - val partFilePattern = """part-.-(\d{1,}).*""".r - - fs.listStatus(outputPath).map(_.getPath.getName).map { - case partFilePattern(id) => id.toInt - case name if name.startsWith("_") => 0 - case name if name.startsWith(".") => 0 - case name => throw new AnalysisException( - s"Trying to write Parquet files to directory $outputPath, " + - s"but found items with illegal name '$name'.") - }.reduceOption(_ max _).getOrElse(0) - } else { - 0 - } - } - new ParquetOutputFormat[Row]() { // Here we override `getDefaultWorkFile` for two reasons: // - // 1. To allow appending. We need to generate output file name based on the max available - // task ID computed above. + // 1. To allow appending. We need to generate unique output file names to avoid + // overwriting existing files (either exist before the write job, or are just written + // by other tasks within the same write job). // // 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all // partitions in the case of dynamic partitioning. override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1 - new Path(path, f"part-r-$split%05d$extension") + val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID") + val split = context.getTaskAttemptID.getTaskID.getId + new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension") } } } http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index eb4e8f8..9a75dd7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.sources -import java.util.Date +import java.util.{UUID, Date} import scala.collection.mutable @@ -59,6 +59,28 @@ private[sql] case class InsertIntoDataSource( } } +/** + * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending. + * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a + * single write job, and owns a UUID that identifies this job. Each concrete implementation of + * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for + * each task output file. This UUID is passed to executor side via a property named + * `spark.sql.sources.writeJobUUID`. + * + * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]] + * are used to write to normal tables and tables with dynamic partitions. + * + * Basic work flow of this command is: + * + * 1. Driver side setup, including output committer initialization and data source specific + * preparation work for the write job to be issued. + * 2. Issues a write job consists of one or more executor side tasks, each of which writes all + * rows within an RDD partition. + * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any + * exception is thrown during task commitment, also aborts that task. + * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is + * thrown during job commitment, also aborts the job. + */ private[sql] case class InsertIntoHadoopFsRelation( @transient relation: HadoopFsRelation, @transient query: LogicalPlan, @@ -271,6 +293,13 @@ private[sql] abstract class BaseWriterContainer( protected val serializableConf = new SerializableWritable(ContextUtil.getConfiguration(job)) + // This UUID is used to avoid output file name collision between different appending write jobs. + // These jobs may belong to different SparkContext instances. Concrete data source implementations + // may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`). + // The reason why this ID is used to identify a job rather than a single task output file is + // that, speculative tasks must generate the same output file name as the original task. + private val uniqueWriteJobId = UUID.randomUUID() + // This is only used on driver side. @transient private val jobContext: JobContext = job @@ -298,6 +327,11 @@ private[sql] abstract class BaseWriterContainer( setupIDs(0, 0, 0) setupConf() + // This UUID is sent to executor side together with the serialized `Configuration` object within + // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate + // unique task output files. + job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString) + // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor // clones the Configuration object passed in. If we initialize the TaskAttemptContext first, // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext. @@ -425,15 +459,16 @@ private[sql] class DefaultWriterContainer( assert(writer != null, "OutputWriter instance should have been initialized") writer.close() super.commitTask() - } catch { - case cause: Throwable => - super.abortTask() - throw new RuntimeException("Failed to commit task", cause) + } catch { case cause: Throwable => + // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will + // cause `abortTask()` to be invoked. + throw new RuntimeException("Failed to commit task", cause) } } override def abortTask(): Unit = { try { + // It's possible that the task fails before `writer` gets initialized if (writer != null) { writer.close() } @@ -477,21 +512,25 @@ private[sql] class DynamicPartitionWriterContainer( }) } - override def commitTask(): Unit = { - try { + private def clearOutputWriters(): Unit = { + if (outputWriters.nonEmpty) { outputWriters.values.foreach(_.close()) outputWriters.clear() + } + } + + override def commitTask(): Unit = { + try { + clearOutputWriters() super.commitTask() } catch { case cause: Throwable => - super.abortTask() throw new RuntimeException("Failed to commit task", cause) } } override def abortTask(): Unit = { try { - outputWriters.values.foreach(_.close()) - outputWriters.clear() + clearOutputWriters() } finally { super.abortTask() } http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala index 1e51173..e5e92e6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala @@ -33,7 +33,7 @@ private[orc] object OrcFileOperator extends Logging{ val fspath = new Path(pathStr) val fs = fspath.getFileSystem(conf) val orcFiles = listOrcFiles(pathStr, conf) - + logDebug(s"Creating ORC Reader from ${orcFiles.head}") // TODO Need to consider all files when schema evolution is taken into account. OrcFile.createReader(fs, orcFiles.head) } @@ -42,6 +42,7 @@ private[orc] object OrcFileOperator extends Logging{ val reader = getFileReader(path, conf) val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector] val schema = readerInspector.getTypeName + logDebug(s"Reading schema from file $path, got Hive schema string: $schema") HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType] } @@ -52,14 +53,14 @@ private[orc] object OrcFileOperator extends Logging{ def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = { val origPath = new Path(pathStr) val fs = origPath.getFileSystem(conf) - val path = origPath.makeQualified(fs) + val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory) val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath) .filterNot(_.isDir) .map(_.getPath) .filterNot(_.getName.startsWith("_")) .filterNot(_.getName.startsWith(".")) - if (paths == null || paths.size == 0) { + if (paths == null || paths.isEmpty) { throw new IllegalArgumentException( s"orcFileOperator: path $path does not have valid orc files matching the pattern") } http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 4509428..101f2ff 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -104,8 +104,9 @@ private[orc] class OrcOutputWriter( recordWriterInstantiated = true val conf = context.getConfiguration + val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID") val partition = context.getTaskAttemptID.getTaskID.getId - val filename = f"part-r-$partition%05d-${System.currentTimeMillis}%015d.orc" + val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc" new OrcOutputFormat().getRecordWriter( new Path(path, filename).getFileSystem(conf), http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 7c7afc8..e337744 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -49,7 +49,7 @@ import scala.collection.JavaConversions._ object TestHive extends TestHiveContext( new SparkContext( - "local[2]", + "local[32]", "TestSQLContext", new SparkConf() .set("spark.sql.test", "") http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 82e08ca..433ea9b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -44,7 +44,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { import org.apache.spark.sql.hive.test.TestHive.implicits._ sparkContext - .makeRDD(1 to 10) + .makeRDD(1 to 100) .map(i => OrcData(i, s"part-$i")) .toDF() .registerTempTable(s"orc_temp_table") @@ -70,35 +70,35 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { } test("create temporary orc table") { - checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10)) + checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100)) checkAnswer( sql("SELECT * FROM normal_orc_source"), - (1 to 10).map(i => Row(i, s"part-$i"))) + (1 to 100).map(i => Row(i, s"part-$i"))) checkAnswer( sql("SELECT * FROM normal_orc_source where intField > 5"), - (6 to 10).map(i => Row(i, s"part-$i"))) + (6 to 100).map(i => Row(i, s"part-$i"))) checkAnswer( sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"), - (1 to 10).map(i => Row(1, s"part-$i"))) + (1 to 100).map(i => Row(1, s"part-$i"))) } test("create temporary orc table as") { - checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10)) + checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100)) checkAnswer( sql("SELECT * FROM normal_orc_source"), - (1 to 10).map(i => Row(i, s"part-$i"))) + (1 to 100).map(i => Row(i, s"part-$i"))) checkAnswer( sql("SELECT * FROM normal_orc_source WHERE intField > 5"), - (6 to 10).map(i => Row(i, s"part-$i"))) + (6 to 100).map(i => Row(i, s"part-$i"))) checkAnswer( sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"), - (1 to 10).map(i => Row(1, s"part-$i"))) + (1 to 100).map(i => Row(1, s"part-$i"))) } test("appending insert") { @@ -106,7 +106,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { checkAnswer( sql("SELECT * FROM normal_orc_source"), - (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i => + (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i => Seq.fill(2)(Row(i, s"part-$i")) }) } @@ -119,7 +119,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { checkAnswer( sql("SELECT * FROM normal_orc_as_source"), - (6 to 10).map(i => Row(i, s"part-$i"))) + (6 to 100).map(i => Row(i, s"part-$i"))) } } http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 0f959b3..5d7cd16 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -53,9 +53,10 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW numberFormat.setGroupingUsed(false) override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID") val split = context.getTaskAttemptID.getTaskID.getId val name = FileOutputFormat.getOutputName(context) - new Path(outputFile, s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}") + new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId") } } @@ -156,6 +157,7 @@ class CommitFailureTestRelation( context: TaskAttemptContext): OutputWriter = { new SimpleTextOutputWriter(path, context) { override def close(): Unit = { + super.close() sys.error("Intentional task commitment failure for testing purpose.") } } http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 57c0525..5b5ed5a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -22,12 +22,12 @@ import java.io.File import com.google.common.io.Files import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ +import org.apache.spark.{SparkException, SparkFunSuite} abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { override val sqlContext: SQLContext = TestHive @@ -35,7 +35,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { import sqlContext._ import sqlContext.implicits._ - val dataSourceName = classOf[SimpleTextSource].getCanonicalName + val dataSourceName: String val dataSchema = StructType( @@ -470,6 +470,33 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { checkAnswer(table("t"), df.select('b, 'c, 'a).collect()) } } + + // NOTE: This test suite is not super deterministic. On nodes with only relatively few cores + // (4 or even 1), it's hard to reproduce the data loss issue. But on nodes with for example 8 or + // more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this + // requirement. We probably want to move this test case to spark-integration-tests or spark-perf + // later. + test("SPARK-8406: Avoids name collision while writing Parquet files") { + withTempPath { dir => + val path = dir.getCanonicalPath + sqlContext + .range(10000) + .repartition(250) + .write + .mode(SaveMode.Overwrite) + .format(dataSourceName) + .save(path) + + assertResult(10000) { + sqlContext + .read + .format(dataSourceName) + .option("dataSchema", StructType(StructField("id", LongType) :: Nil).json) + .load(path) + .count() + } + } + } } class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { @@ -502,15 +529,17 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { } class CommitFailureTestRelationSuite extends SparkFunSuite with SQLTestUtils { - import TestHive.implicits._ - override val sqlContext = TestHive + // When committing a task, `CommitFailureTestSource` throws an exception for testing purpose. val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName test("SPARK-7684: commitTask() failure should fallback to abortTask()") { withTempPath { file => - val df = (1 to 3).map(i => i -> s"val_$i").toDF("a", "b") + // Here we coalesce partition number to 1 to ensure that only a single task is issued. This + // prevents race condition happened when FileOutputCommitter tries to remove the `_temporary` + // directory while committing/aborting the job. See SPARK-8513 for more details. + val df = sqlContext.range(0, 10).coalesce(1) intercept[SparkException] { df.write.format(dataSourceName).save(file.getCanonicalPath) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org