[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16481

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r95905076

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -413,10 +413,85 @@ case class DataSource(
       relation
     }
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
-  def write(
-      mode: SaveMode,
-      data: DataFrame): BaseRelation = {
+  /**
+   * Writes the given [[DataFrame]] out in this [[FileFormat]].
+   */
+  private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
+    // Don't glob path for the write path. The contracts here are:
+    //  1. Only one output path can be specified on the write path;
+    //  2. Output path must be a legal HDFS style file system path;
+    //  3. It's OK that the output path doesn't exist yet;
+    val allPaths = paths ++ caseInsensitiveOptions.get("path")
+    val outputPath = if (allPaths.length == 1) {
+      val path = new Path(allPaths.head)
+      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    } else {
+      throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+        s"got: ${allPaths.mkString(", ")}")
+    }
+
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
+
+    // If we are appending to a table that already exists, make sure the partitioning matches
+    // up. If we fail to load the table for whatever reason, ignore the check.
+    if (mode == SaveMode.Append) {
+      val existingPartitionColumns = Try {
+        getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
+      }.getOrElse(Seq.empty[String])
+      // TODO: Case sensitivity.
+      val sameColumns =
+        existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
+      if (existingPartitionColumns.nonEmpty && !sameColumns) {
+        throw new AnalysisException(
+          s"""Requested partitioning does not match existing partitioning.
+             |Existing partitioning columns:
+             |  ${existingPartitionColumns.mkString(", ")}
+             |Requested partitioning columns:
+             |  ${partitionColumns.mkString(", ")}
+             |""".stripMargin)
+      }
+    }
+
+    // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
+    // not need to have the query as child, to avoid to analyze an optimized query,
+    // because InsertIntoHadoopFsRelationCommand will be optimized first.
+    val partitionAttributes = partitionColumns.map { name =>
+      val plan = data.logicalPlan
+      plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+        throw new AnalysisException(
+          s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
+      }.asInstanceOf[Attribute]
+    }
+    val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
+      sparkSession.table(tableIdent).queryExecution.analyzed.collect {
+        case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
+      }.head
+    }
+    // For partitioned relation r, r.schema's column ordering can be different from the column
+    // ordering of data.logicalPlan (partition columns are all moved after data column). This
+    // will be adjusted within InsertIntoHadoopFsRelation.
+    val plan =
+      InsertIntoHadoopFsRelationCommand(
+        outputPath = outputPath,
+        staticPartitions = Map.empty,
+        partitionColumns = partitionAttributes,
+        bucketSpec = bucketSpec,
+        fileFormat = format,
+        options = options,
+        query = data.logicalPlan,
+        mode = mode,
+        catalogTable = catalogTable,
+        fileIndex = fileIndex)
+    sparkSession.sessionState.executePlan(plan).toRdd
--- End diff --

To the reviewers: the code in `writeInFileFormat` itself has not changed.
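The two validations at the top of `writeInFileFormat` can be tried without Spark. The first requires exactly one output path. The second requires matching partition columns when appending. What follows is a minimal sketch in plain Scala under stated assumptions:

- `WriteChecks`, `resolveSingleOutputPath`, and `checkAppendPartitioning` are hypothetical names, not Spark APIs.
- `IllegalStateException` stands in for Spark's `AnalysisException`.
- Path qualification through Hadoop's `FileSystem` is left out.

As in the diff, partition column names are compared case-insensitively, and the check is skipped when no existing partitioning is known.

```scala
import java.util.Locale

// Hypothetical helper object; not part of Spark.
object WriteChecks {
  // Mirrors the write-path contract in the diff: the explicit paths plus an
  // optional "path" option must contain exactly one entry.
  def resolveSingleOutputPath(paths: Seq[String], pathOption: Option[String]): String = {
    val allPaths = paths ++ pathOption
    if (allPaths.length == 1) {
      allPaths.head
    } else {
      throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
        s"got: ${allPaths.mkString(", ")}")
    }
  }

  // Mirrors the SaveMode.Append check: if existing partition columns are known,
  // they must equal the requested ones, ignoring case (Locale.ROOT is used here
  // so the comparison does not depend on the JVM's default locale).
  def checkAppendPartitioning(existing: Seq[String], requested: Seq[String]): Unit = {
    val sameColumns =
      existing.map(_.toLowerCase(Locale.ROOT)) == requested.map(_.toLowerCase(Locale.ROOT))
    if (existing.nonEmpty && !sameColumns) {
      throw new IllegalStateException(
        s"""Requested partitioning does not match existing partitioning.
           |Existing partitioning columns:
           |  ${existing.mkString(", ")}
           |Requested partitioning columns:
           |  ${requested.mkString(", ")}
           |""".stripMargin)
    }
  }
}
```

For example, `checkAppendPartitioning(Seq("Year"), Seq("year"))` passes, while `checkAppendPartitioning(Seq("year"), Seq("month"))` throws.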
[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r95724792

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -413,17 +413,22 @@ case class DataSource(
       relation
     }
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   *
+   * @param isForWriteOnly Whether to just write the data without returning a [[BaseRelation]].
+   */
   def write(
--- End diff --

Sure. Will do it.
[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r95715805

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -413,17 +413,22 @@ case class DataSource(
       relation
     }
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   *
+   * @param isForWriteOnly Whether to just write the data without returning a [[BaseRelation]].
+   */
   def write(
--- End diff --

Let's create a new `write` method that returns `Unit`, and rename this `write` to `writeAndRead`, which should eventually be removed.
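cloud-fan's suggestion separates "write only" from "write, then build a relation". The sketch below models that split with a hypothetical in-memory `ToySource`, not Spark's `DataSource`. `ToyRelation`, `ToyWriter.save`, and the `scans` counter are likewise illustrative assumptions. It shows the point of the change: a `save()`-style caller that only needs the `Unit`-returning `write` never lists the output. `writeAndRead`, by contrast, pays for one listing to construct its result.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a BaseRelation: just the files it covers.
final case class ToyRelation(files: Seq[String])

// Hypothetical in-memory data source; not Spark's DataSource.
final class ToySource {
  private val storedFiles = mutable.ArrayBuffer.empty[String]
  private var listings = 0

  // How many times the output has been listed (the "scan" being avoided).
  def scans: Int = listings

  // Write-only path: persists one file per row and returns Unit,
  // so nothing is read back.
  def write(rows: Seq[String]): Unit =
    rows.foreach(_ => storedFiles += s"part-${storedFiles.size}")

  // Legacy-style path: writes, then lists the output to build a relation.
  def writeAndRead(rows: Seq[String]): ToyRelation = {
    write(rows)
    ToyRelation(listFiles())
  }

  private def listFiles(): Seq[String] = {
    listings += 1
    storedFiles.toList
  }
}

// A save()-style caller only needs the side effect of the write.
object ToyWriter {
  def save(source: ToySource, rows: Seq[String]): Unit = source.write(rows)
}
```

Keeping `writeAndRead` as a separate, clearly named method makes the extra read visible at each call site, which fits the plan to remove it eventually.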
[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...
Github user cenyuhai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r95538748

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -413,17 +413,22 @@ case class DataSource(
       relation
     }
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   *
+   * @param isForWriteOnly Whether to just write the data without returning a [[BaseRelation]].
+   */
   def write(
       mode: SaveMode,
-      data: DataFrame): BaseRelation = {
+      data: DataFrame,
+      isForWriteOnly: Boolean = false): Option[BaseRelation] = {
     if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
     providingClass.newInstance() match {
       case dataSource: CreatableRelationProvider =>
-        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
+        Some(dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data))
--- End diff --

Maybe we can add a parameter here to let the user choose true or false.
[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r95299609

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -413,17 +413,22 @@ case class DataSource(
       relation
     }
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   *
+   * @param isForWriteOnly Whether to just write the data without returning a [[BaseRelation]].
+   */
   def write(
       mode: SaveMode,
-      data: DataFrame): BaseRelation = {
+      data: DataFrame,
+      isForWriteOnly: Boolean = false): Option[BaseRelation] = {
     if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
     providingClass.newInstance() match {
       case dataSource: CreatableRelationProvider =>
-        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
+        Some(dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data))
--- End diff --

It would be really weird if `CreatableRelationProvider.createRelation` could return a relation with a different schema from the written `data`. Is it safe to assume the schema won't change?

cc @marmbrus @yhuai @liancheng
[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r95070345

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -494,8 +500,13 @@ case class DataSource(
             catalogTable = catalogTable,
             fileIndex = fileIndex)
         sparkSession.sessionState.executePlan(plan).toRdd
-        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
-        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+        if (isForWriteOnly) {
+          // Exit earlier and return null
--- End diff --

Sure
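The two lines removed in this hunk built the returned relation from the schema of the DataFrame that had just been written. That schema was relaxed to nullable rather than re-inferred from the output files. A hypothetical flat-schema sketch of that idea follows. `ColumnSpec`, `TableSchema`, and `SchemaReuse` are illustrative types, not Spark's `StructType`. Spark's own `asNullable` also relaxes nested types, which this toy ignores.

```scala
// Hypothetical flat schema model; not Spark's StructType.
final case class ColumnSpec(name: String, dataType: String, nullable: Boolean)

final case class TableSchema(columns: Seq[ColumnSpec]) {
  // Relax every column to nullable, mirroring the intent of `asNullable`
  // (this toy version has no nested types to recurse into).
  def asNullable: TableSchema = TableSchema(columns.map(_.copy(nullable = true)))
}

object SchemaReuse {
  // Instead of listing the written files and inferring a schema from them,
  // derive the relation's schema directly from the data that was written.
  def schemaAfterWrite(written: TableSchema): TableSchema = written.asNullable
}
```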
[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r95065596

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -494,8 +500,13 @@ case class DataSource(
             catalogTable = catalogTable,
             fileIndex = fileIndex)
         sparkSession.sessionState.executePlan(plan).toRdd
-        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
-        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+        if (isForWriteOnly) {
+          // Exit earlier and return null
--- End diff --

I'd remove "and return null".
[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r94886105

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala ---
@@ -88,17 +83,12 @@ class PartitionedTablePerfStatsSuite
   }
   private def setupPartitionedDatasourceTable(
-      tableName: String, dir: File, scale: Int,
-      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
+      tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
-    if (clearMetricsBeforeCreate) {
--- End diff --

Nice.
[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r94886317

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -494,8 +500,13 @@ case class DataSource(
             catalogTable = catalogTable,
             fileIndex = fileIndex)
         sparkSession.sessionState.executePlan(plan).toRdd
-        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
-        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+        if (isForWriteOnly) {
+          // Exit earlier and return null
+          null
--- End diff --

Maybe we can change it to return an `Option`?
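ericl's suggestion replaces the `null` sentinel with an `Option`. It matches the `Option[BaseRelation]` return type that appears in the later revision quoted above. A minimal sketch of that shape follows. `FakeRelation` and `OptionalResult.writeMaybeRead` are hypothetical stand-ins, not Spark code, and the actual write is elided.

```scala
// Hypothetical stand-in for a BaseRelation.
final case class FakeRelation(schemaColumns: Seq[String])

object OptionalResult {
  // With isForWriteOnly = true the caller gets None instead of null, so the
  // "no relation" case is part of the return type and must be handled.
  def writeMaybeRead(columns: Seq[String], isForWriteOnly: Boolean = false): Option[FakeRelation] = {
    // ... the actual write would happen here ...
    if (isForWriteOnly) None
    else Some(FakeRelation(columns))
  }
}
```

With `Option`, the write-only case shows up in the type, so callers handle it explicitly instead of risking a `NullPointerException`. cloud-fan's later review comment goes one step further: it drops the flag by splitting the method into a `Unit`-returning `write` and a separate `writeAndRead`.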
[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r94879343

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -494,8 +500,13 @@ case class DataSource(
             catalogTable = catalogTable,
             fileIndex = fileIndex)
         sparkSession.sessionState.executePlan(plan).toRdd
-        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
-        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+        if (isForWriteOnly) {
+          // Exit earlier and return null
+          null
--- End diff --

I do not know whether returning `null` is OK here. This is based on a [similar early-exit solution](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L178) used in `getOrInferFileFormatSchema`.
[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r94879140

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala ---
@@ -416,12 +405,8 @@ class PartitionedTablePerfStatsSuite
           })
           executorPool.shutdown()
           executorPool.awaitTermination(30, TimeUnit.SECONDS)
-          // check the cache hit, we use the metric of METRIC_FILES_DISCOVERED and
-          // METRIC_PARALLEL_LISTING_JOB_COUNT to check this, while the lock take effect,
-          // only one thread can really do the build, so the listing job count is 2, the other
-          // one is cache.load func. Also METRIC_FILES_DISCOVERED is $partition_num * 2
--- End diff --

This comment is not accurate. The extra counts are from the save API call in `setupPartitionedHiveTable`.
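gatorsmile's explanation can be checked with simple arithmetic. Suppose `save()` lists the output it just wrote and a later query lists it again. A metric counting discovered files then reaches twice the number of partition files, the `$partition_num * 2` in the removed comment. The toy counter below only models that reasoning. It assumes one data file per partition and is hypothetical: it does not use or reproduce Spark's metrics API.

```scala
// Hypothetical model of a "files discovered" metric; not Spark's metrics API.
final class DiscoveryMetrics {
  private var discovered = 0
  def filesDiscovered: Int = discovered
  def recordListing(numFiles: Int): Unit = discovered += numFiles
}

object SaveThenQuery {
  // Simulates one partitioned-table setup followed by one query over it,
  // assuming one data file per partition. `saveReadsBack` models the old
  // save() behavior of listing the output right after writing it.
  def run(numPartitions: Int, saveReadsBack: Boolean): Int = {
    val metrics = new DiscoveryMetrics
    // save(): optionally lists every file it just wrote.
    if (saveReadsBack) metrics.recordListing(numPartitions)
    // Query: the table's file index lists every partition file once.
    metrics.recordListing(numPartitions)
    metrics.filesDiscovered
  }
}
```

Under these assumptions, `SaveThenQuery.run(5, saveReadsBack = true)` gives 10 and `SaveThenQuery.run(5, saveReadsBack = false)` gives 5.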
[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...
GitHub user gatorsmile opened a pull request:

https://github.com/apache/spark/pull/16481

[SPARK-19092] [SQL] Save() API of DataFrameWriter should not scan all the saved files

### What changes were proposed in this pull request?
`DataFrameWriter`'s [save() API](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L207) performs an unnecessary full filesystem scan of the saved files. Since save() is the most basic, core API in `DataFrameWriter`, we should avoid this scan.

The related PR: https://github.com/apache/spark/pull/16090

### How was this patch tested?
Updated the existing test cases.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gatorsmile/spark saveFileScan

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16481.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16481

commit 5d38f09f47a767a342a0a8219c63efa2943b5d1f
Author: gatorsmile
Date: 2017-01-05T23:53:55Z

    fix.
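To make "a full filesystem scan of the saved files" concrete: reading the output back after a write means listing every directory under the output path. The sketch below is a hypothetical illustration that uses only `java.io.File`. It is not Spark's file-index code, which goes through Hadoop's `FileSystem` API. `OutputScan` and `ScanResult` are illustrative names. The sketch shows that the cost grows with the number of partition directories and files, and a write-only `save()` can skip that work.

```scala
import java.io.File

// Hypothetical illustration; not Spark's file-listing implementation.
object OutputScan {
  final case class ScanResult(dataFiles: Seq[File], directoriesListed: Int)

  // Recursively lists everything under `dir`: the kind of full scan needed
  // to rebuild a relation (e.g. to discover partition directories) after
  // the write has already finished. Each directory costs one listing call.
  def scan(dir: File): ScanResult = {
    // listFiles() returns null if `dir` is not a readable directory.
    val children = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
    val (subDirs, files) = children.partition(_.isDirectory)
    val nested = subDirs.map(scan)
    ScanResult(
      files ++ nested.flatMap(_.dataFiles),
      1 + nested.map(_.directoriesListed).sum)
  }
}
```

For an output directory with three `partCol=N` subdirectories, each holding one file, the scan lists four directories and finds three data files.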