[GitHub] spark pull request #21554: [SPARK-24546][SQL] InsertIntoDataSourceCommand ma...
Github user zheh12 closed the pull request at: https://github.com/apache/spark/pull/21554 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21554: [SPARK-24546][SQL] InsertIntoDataSourceCommand make data...
Github user zheh12 commented on the issue: https://github.com/apache/spark/pull/21554

I know this is the SQL standard. But I wonder, if I use `query.schema`, how will it affect the by-position logic? I think we should let the data source implementation decide whether to resolve columns by position or by name. The kudu-spark implementation, for example, chooses by-name resolution with this mapping:

```
val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({ case (field, sparkIdx) =>
  sparkIdx -> table.getSchema.getColumnIndex(field.name)
})
```

But right now we hand it a wrong schema, so the mapping always comes out as (0,0), (1,1), ...: effectively by-position, even though this code is clearly meant to be by-name. Because a Kudu schema must put the primary key columns first, its column order usually differs from that of the source table. If we create the DataFrame with `query.schema`, by-position resolution still works, and the data source additionally gains the ability to choose by-name or by-position. As it stands, the data source is forced into by-position.

Moreover, as a developer I may choose to implement `InsertableRelation`:

```
trait InsertableRelation {
  def insert(data: DataFrame, overwrite: Boolean): Unit
}
```

and then I can receive a DataFrame with the wrong schema, yet find nothing visibly wrong with the DataFrame itself. @cloud-fan is my understanding correct?
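A plain-Scala sketch (no Kudu or Spark dependencies; column names follow the example tables in this thread) of how the by-name index mapping degenerates when the rows are stamped with the relation's schema instead of the query's:

```scala
object ByNameMapping {
  // Target table puts the primary key first, as Kudu requires.
  val targetColumns: Seq[String] = Seq("b", "a", "c")

  // Build (sparkIdx -> targetIdx) pairs from the incoming schema's field names,
  // mirroring the kudu-spark mapping quoted above.
  def indices(sparkFields: Seq[String]): Array[(Int, Int)] =
    sparkFields.zipWithIndex.map { case (name, sparkIdx) =>
      sparkIdx -> targetColumns.indexOf(name)
    }.toArray

  def main(args: Array[String]): Unit = {
    // With the real query schema (a, b, c) the mapping reorders columns:
    println(indices(Seq("a", "b", "c")).toList) // List((0,1), (1,0), (2,2))
    // With the relation's schema wrongly stamped onto the rows, the names
    // already match the target order, so the mapping collapses to identity,
    // i.e. pure by-position:
    println(indices(Seq("b", "a", "c")).toList) // List((0,0), (1,1), (2,2))
  }
}
```

The second mapping is exactly the (0,0), (1,1) pattern described in the comment: the by-name machinery is still running, but it can no longer observe the real column order.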
[GitHub] spark issue #21554: [SPARK-24546][SQL] InsertIntoDataSourceCommand make data...
Github user zheh12 commented on the issue: https://github.com/apache/spark/pull/21554

cc @cloud-fan @jiangxb1987, please give me some advice.
[GitHub] spark pull request #21554: [SPARK-24546] InsertIntoDataSourceCommand make da...
GitHub user zheh12 opened a pull request: https://github.com/apache/spark/pull/21554

[SPARK-24546] InsertIntoDataSourceCommand make data frame with wrong schema when use kudu.

## What changes were proposed in this pull request?

I have an HDFS table

```
hdfs_table(a int, b int, c int)
```

and a Kudu table

```
kudu_table(b int primary key, a int, c int)
```

I want to insert into kudu_table:

```
insert into kudu_table select * from hdfs_table
```

But the data that lands in Kudu is misordered. I think the reason is this line of code:

```
val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
```

This code performs no check, and it can break the invariant

> the row data must match the schema it is labelled with

When the logical relation, such as Kudu, has a schema in a different column order, we should let the Kudu code handle the conversion, as it already does:

```
val table: KuduTable = syncClient.openTable(tableName)
val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({ case (field, sparkIdx) =>
  sparkIdx -> table.getSchema.getColumnIndex(field.name)
})
```

So I suggest creating the data frame with the query schema and keeping the conversion code outside Spark SQL.

## How was this patch tested?

I tested with spark-2.3 and Kudu.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zheh12/spark SPARK-24546

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21554.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21554

commit bc92fcfbd226468960574c487e8be48bc58bb67d
Author: yangz
Date: 2018-06-13T10:44:46Z

[SPARK-24546] InsertIntoDataSourceCommand make data frame with wrong schema
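A minimal plain-Scala illustration (no Spark required; values are made up) of the invariant the PR description points at: if rows produced in query-column order are re-labelled with the relation's schema, the labels silently attach to the wrong values.

```scala
object SchemaMismatch {
  // hdfs_table(a, b, c) produces rows in this column order:
  val querySchema: Seq[String] = Seq("a", "b", "c")
  val row: Seq[Int] = Seq(1, 2, 3) // a = 1, b = 2, c = 3

  // kudu_table(b, a, c): the primary key column comes first.
  val relationSchema: Seq[String] = Seq("b", "a", "c")

  // Label each value of the row with the given schema's column names.
  def labelled(schema: Seq[String]): Map[String, Int] =
    schema.zip(row).toMap

  def main(args: Array[String]): Unit = {
    println(labelled(querySchema))    // correct: a gets 1, b gets 2, c gets 3
    println(labelled(relationSchema)) // wrong: column b silently receives a's value 1
  }
}
```

This is exactly the misordering observed in Kudu: nothing fails, the data is simply attached to the wrong columns.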
[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...
Github user zheh12 commented on a diff in the pull request: https://github.com/apache/spark/pull/21257#discussion_r189422931

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
```
@@ -163,6 +170,15 @@ class HadoopMapReduceCommitProtocol(
   }

   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
+    // first delete the files marked for deletion
+    for (fs <- pathsToDelete.keys) {
+      for (path <- pathsToDelete(fs)) {
+        if (fs.exists(path)) {
+          fs.delete(path, true)
```
--- End diff --

I think we should not delete the data when the task is aborted. The semantics of `deleteWithJob` should be to delete the data when the job is committed. I have changed the code to handle exceptions.
[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...
Github user zheh12 commented on a diff in the pull request: https://github.com/apache/spark/pull/21257#discussion_r188211717

--- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
```
@@ -120,7 +120,8 @@ abstract class FileCommitProtocol {
   /**
    * Specifies that a file should be deleted with the commit of this job. The default
    * implementation deletes the file immediately.
    */
-  def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
+  def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean,
```
--- End diff --

I will remove the `recursive` parameter.
[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...
Github user zheh12 commented on the issue: https://github.com/apache/spark/pull/21286

I think I may not have described this issue clearly. First of all, the scenario is this: multiple applications simultaneously append data to the same parquet data source table. They run at the same time and share the same output directory:

```
FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
```

`outputSpec` is the output table directory `skip_dir/tab1/`, and `skip_dir/tab1/_temporary` is created as the temporary dir. But once one job commits successfully, it runs cleanupJob:

```
Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
fs.delete(pendingJobAttemptsPath, true);
```

The pendingJobAttemptsPath is `skip_dir/tab1/_temporary`:

```
private Path getPendingJobAttemptsPath() {
  return getPendingJobAttemptsPath(getOutputPath());
}

private static Path getPendingJobAttemptsPath(Path out) {
  return new Path(out, PENDING_DIR_NAME);
}

public static final String PENDING_DIR_NAME = "_temporary";
```

After that job is committed, `skip_dir/tab1/_temporary` is deleted. Then, when the other jobs attempt to commit, an error is reported. Meanwhile, because all applications share the same app attempt id, they all write temporary data to the same temporary dir `skip_dir/tab1/_temporary/0`, so the data committed by the successful application is corrupted as well.
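The path arithmetic above can be sketched in plain Scala (a toy stand-in for Hadoop's `Path`, using the directory names from this comment): every job writing to the same table resolves its pending-attempts directory to the same `_temporary` child, so the first job's cleanup deletes the other jobs' in-flight data.

```scala
object PendingPath {
  // Mirrors FileOutputCommitter.PENDING_DIR_NAME.
  val PendingDirName = "_temporary"

  // Every job derives its pending dir purely from the shared output path.
  def pendingJobAttemptsPath(outputPath: String): String =
    s"$outputPath/$PendingDirName"

  def main(args: Array[String]): Unit = {
    val jobA = pendingJobAttemptsPath("skip_dir/tab1")
    val jobB = pendingJobAttemptsPath("skip_dir/tab1")
    println(jobA)         // skip_dir/tab1/_temporary
    println(jobA == jobB) // true: both jobs share one pending dir
  }
}
```

Because nothing job-specific enters the path, the collision is structural, not a race that better timing could avoid.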
[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...
Github user zheh12 commented on the issue: https://github.com/apache/spark/pull/21286

Thanks @cloud-fan @steveloughran for your reply, I will look into this problem in more detail.
[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...
Github user zheh12 commented on a diff in the pull request: https://github.com/apache/spark/pull/21257#discussion_r187960677

--- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
```
@@ -120,7 +120,8 @@ abstract class FileCommitProtocol {
   /**
    * Specifies that a file should be deleted with the commit of this job. The default
    * implementation deletes the file immediately.
    */
-  def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
+  def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean,
```
--- End diff --

In the current situation we can delete it, but I think it is better to use a default value of true.
[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...
Github user zheh12 commented on a diff in the pull request: https://github.com/apache/spark/pull/21257#discussion_r187959698

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
```
@@ -163,6 +169,12 @@ class HadoopMapReduceCommitProtocol(
   }

   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
+    // first delete the files marked for deletion
+    val committerFs = jobContext.getWorkingDirectory.getFileSystem(jobContext.getConfiguration)
```
--- End diff --

I have changed my code. It now records the paths each `FileSystem` should delete in a map structure, and no longer assumes that they all use the same `FileSystem`.
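A hedged sketch of the "record per-FileSystem, delete at commitJob" idea described above, with a toy `Fs` trait standing in for Hadoop's `FileSystem` (the class names here are illustrative, not Spark's actual API):

```scala
import scala.collection.mutable

trait Fs {
  def delete(path: String, recursive: Boolean): Boolean
}

// Toy FileSystem that records what was deleted, for demonstration.
class RecordingFs extends Fs {
  val deleted = mutable.Buffer.empty[String]
  def delete(path: String, recursive: Boolean): Boolean = {
    deleted += path
    true
  }
}

class DeferredDeleter {
  // Each FileSystem keeps its own set of paths to delete; we never assume
  // that every recorded path belongs to one shared FileSystem.
  private val pathsToDelete = mutable.Map.empty[Fs, mutable.Set[String]]

  // Deferred: just record the path, nothing is removed yet.
  def deleteWithJob(fs: Fs, path: String): Boolean = {
    pathsToDelete.getOrElseUpdate(fs, mutable.Set.empty) += path
    true
  }

  // At job commit, replay the recorded deletions against each FileSystem.
  def commitJob(): Unit = {
    for ((fs, paths) <- pathsToDelete; path <- paths) {
      fs.delete(path, recursive = true)
    }
    pathsToDelete.clear()
  }
}
```

Nothing is touched between `deleteWithJob` and `commitJob`, which is what makes overwriting a path that is also being read from safe: the input files stay readable until the job's output is ready.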
[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...
Github user zheh12 commented on a diff in the pull request: https://github.com/apache/spark/pull/21257#discussion_r187930560

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
```
@@ -163,6 +169,12 @@ class HadoopMapReduceCommitProtocol(
   }

   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
+    // first delete the files marked for deletion
+    val committerFs = jobContext.getWorkingDirectory.getFileSystem(jobContext.getConfiguration)
```
--- End diff --

The staging dir is not always a valid Hadoop path, but the JobContext working dir always is.
[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...
Github user zheh12 commented on a diff in the pull request: https://github.com/apache/spark/pull/21257#discussion_r187929156

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
```
@@ -898,4 +898,12 @@ object DDLUtils {
         "Cannot overwrite a path that is also being read from.")
     }
   }
+
+  def verifyReadPath(query: LogicalPlan, outputPath: Path): Boolean = {
```
--- End diff --

Would `isInReadPath`, `inReadPath`, or `isReadPath` be better?
[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...
Github user zheh12 commented on the issue: https://github.com/apache/spark/pull/21286

I think the Hadoop design does not allow two jobs to share the same output folder. Hadoop has a related patch that can partially solve this problem: you can configure a parameter so that the `_temporary` directory is not cleaned up. But I don't think that is a good solution. [MAPREDUCE-6478. Add an option to skip cleanupJob stage or ignore cleanup failure during commitJob.](https://issues.apache.org/jira/browse/MAPREDUCE-6478?attachmentSortBy=fileName)

For this problem, we had better use a different temporary output directory for each job, and then move the files. However, the current implementation breaks some unit tests. There are two ways to fix it:

1. Add a check for the presence of tempDir in `HadoopMapReduceCommitProtocol.commitJob`, but this requires the caller to set `FileOutputFormat.setOutputPath(job, s".temp-${commiter.getJobId()}")`.
2. Alternatively, enable the tempDir directory for all `HadoopMapReduceCommitProtocol` instances. This hides the tempDir setup from callers, but every job pays for one extra move of its files.

cc @cloud-fan. Which do you think is better? Please give me some advice.
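Option 1 above can be sketched in a few lines of plain Scala (the `.temp-` prefix mirrors the hypothetical name in the comment; this is not Spark's actual layout): keying the temporary directory by job id is enough to make concurrent appends stop colliding.

```scala
object PerJobTempDir {
  // Each job gets its own staging dir under the table directory,
  // instead of the shared "_temporary".
  def tempDir(tableDir: String, jobId: String): String =
    s"$tableDir/.temp-$jobId"

  def main(args: Array[String]): Unit = {
    val a = tempDir("skip_dir/tab1", "job_001")
    val b = tempDir("skip_dir/tab1", "job_002")
    println(a)      // skip_dir/tab1/.temp-job_001
    println(a != b) // true: no shared temporary dir to clean up from under each other
  }
}
```

The cost, as noted in alternative 2, is that committed files must then be moved from the per-job directory into the table directory, one extra rename per job.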
[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...
Github user zheh12 commented on the issue: https://github.com/apache/spark/pull/21286

relates to #21257
[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...
Github user zheh12 commented on the issue: https://github.com/apache/spark/pull/21286

cc @cloud-fan @jiangxb1987 Are there any drawbacks to this idea? Please give some advice when you have time.
[GitHub] spark pull request #21286: [SPARK-24194] HadoopFsRelation cannot overwrite a...
GitHub user zheh12 opened a pull request: https://github.com/apache/spark/pull/21286

[SPARK-24194] HadoopFsRelation cannot overwrite a path that is also b…

## What changes were proposed in this pull request?

When multiple jobs append to the same `HadoopFsRelation` at the same time, there are the following two errors:

1. One job will succeed, but its data will be wrong: more data than expected appears.
2. The other jobs will fail with `java.io.FileNotFoundException: Failed to get file status skip_dir/_temporary/0`.

The main reason for this problem is that multiple jobs use the same `_temporary` directory. So the core idea of this PR is to create a distinct temporary directory, keyed by jobId, for each job inside the output folder, so that conflicts can be avoided.

## How was this patch tested?

I tested manually, but I don't know how to write a unit test for this situation. Please help me.

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zheh12/spark SPARK-24238

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21286.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21286

commit b676a36af110b0b7d7dfc47ab292d09c441f6a0f
Author: yangz
Date: 2018-05-10T01:46:49Z

[SPARK-24194] HadoopFsRelation cannot overwrite a path that is also being read from
[GitHub] spark issue #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwrite a p...
Github user zheh12 commented on the issue: https://github.com/apache/spark/pull/21257

cc @cloud-fan, Jenkins had some errors; please help me retest, thanks.
[GitHub] spark issue #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwrite a p...
Github user zheh12 commented on the issue: https://github.com/apache/spark/pull/21257

Jenkins, retest this please.
[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...
Github user zheh12 commented on a diff in the pull request: https://github.com/apache/spark/pull/21257#discussion_r186936032

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---
```
@@ -207,9 +207,25 @@ case class InsertIntoHadoopFsRelationCommand(
     }
     // first clear the path determined by the static partition keys (e.g. /table/foo=1)
     val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-    if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
-      throw new IOException(s"Unable to clear output " +
-        s"directory $staticPrefixPath prior to writing to it")
+
+    // check whether to delete the dir or just its sub files
+    if (fs.exists(staticPrefixPath)) {
+      // check if it is the table root, and record the files to delete
+      if (staticPartitionPrefix.isEmpty) {
+        val files = fs.listFiles(staticPrefixPath, false)
+        while (files.hasNext) {
+          val file = files.next()
+          if (!committer.deleteWithJob(fs, file.getPath, true)) {
```
--- End diff --

That's a good idea. I have changed my code.
[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...
Github user zheh12 commented on a diff in the pull request: https://github.com/apache/spark/pull/21257#discussion_r186915569

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---
```
@@ -207,9 +207,25 @@ case class InsertIntoHadoopFsRelationCommand(
     }
     // first clear the path determined by the static partition keys (e.g. /table/foo=1)
     val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-    if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
-      throw new IOException(s"Unable to clear output " +
-        s"directory $staticPrefixPath prior to writing to it")
+
+    // check whether to delete the dir or just its sub files
+    if (fs.exists(staticPrefixPath)) {
+      // check if it is the table root, and record the files to delete
+      if (staticPartitionPrefix.isEmpty) {
+        val files = fs.listFiles(staticPrefixPath, false)
+        while (files.hasNext) {
+          val file = files.next()
+          if (!committer.deleteWithJob(fs, file.getPath, true)) {
```
--- End diff --

The key issue is that the data is already in the `output` directory before the job commits, and at that point we can't delete the `output` directory anymore. We overrode `FileCommitProtocol`'s `deleteWithJob` method in `HadoopMapReduceCommitProtocol`: it no longer deletes files immediately, but waits until the entire job is committed. We do delete the recorded files when the job commits, but the temporary output files are generated when the tasks start, and those temporary files live under the `output` directory; the data is then moved out of them into the `output` directory. So after the job starts, there is no safe time to delete the entire `output` directory.
[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...
Github user zheh12 commented on a diff in the pull request: https://github.com/apache/spark/pull/21257#discussion_r186719888

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---
```
@@ -207,9 +207,25 @@ case class InsertIntoHadoopFsRelationCommand(
     }
     // first clear the path determined by the static partition keys (e.g. /table/foo=1)
     val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-    if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
-      throw new IOException(s"Unable to clear output " +
-        s"directory $staticPrefixPath prior to writing to it")
+
+    // check whether to delete the dir or just its sub files
+    if (fs.exists(staticPrefixPath)) {
+      // check if it is the table root, and record the files to delete
+      if (staticPartitionPrefix.isEmpty) {
+        val files = fs.listFiles(staticPrefixPath, false)
+        while (files.hasNext) {
+          val file = files.next()
+          if (!committer.deleteWithJob(fs, file.getPath, true)) {
```
--- End diff --

If I do this, then when the job is committed it will delete the entire `output` directory, and there will be no data left.
[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...
Github user zheh12 commented on a diff in the pull request: https://github.com/apache/spark/pull/21257#discussion_r186609102

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
```
@@ -235,4 +247,20 @@ class HadoopMapReduceCommitProtocol(
       tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
     }
   }
+
+  /**
+   * now just record the file to be delete
+   */
+  override def deleteWithJob(fs: FileSystem,
```
--- End diff --

I have changed this code.
[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...
Github user zheh12 commented on a diff in the pull request: https://github.com/apache/spark/pull/21257#discussion_r186608143

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---
```
@@ -207,9 +207,25 @@ case class InsertIntoHadoopFsRelationCommand(
     }
     // first clear the path determined by the static partition keys (e.g. /table/foo=1)
     val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-    if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
-      throw new IOException(s"Unable to clear output " +
-        s"directory $staticPrefixPath prior to writing to it")
+
+    // check whether to delete the dir or just its sub files
+    if (fs.exists(staticPrefixPath)) {
+      // check if it is the table root, and record the files to delete
+      if (staticPartitionPrefix.isEmpty) {
+        val files = fs.listFiles(staticPrefixPath, false)
+        while (files.hasNext) {
+          val file = files.next()
+          if (!committer.deleteWithJob(fs, file.getPath, true)) {
```
--- End diff --

We chose to postpone deletion. Whether or not `output` is the same as `input`, the `_temporary` directory is now created inside the `output` directory before the deletion happens, so we cannot delete the root directory directly. The original implementation could delete the root directory directly because the deletion happened before the job was created; the root directory was then rebuilt and the `_temporary` directory created afterwards. In the original implementation, the failure of any task in the job results in the loss of the `output` data. I can't figure out how to separate the two situations. Do you have any good ideas?
[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...
Github user zheh12 commented on a diff in the pull request: https://github.com/apache/spark/pull/21257#discussion_r186603145

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---
```
@@ -207,9 +207,25 @@ case class InsertIntoHadoopFsRelationCommand(
     }
     // first clear the path determined by the static partition keys (e.g. /table/foo=1)
     val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-    if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
-      throw new IOException(s"Unable to clear output " +
-        s"directory $staticPrefixPath prior to writing to it")
+
+    // check whether to delete the dir or just its sub files
+    if (fs.exists(staticPrefixPath)) {
+      // check if it is the table root, and record the files to delete
+      if (staticPartitionPrefix.isEmpty) {
+        val files = fs.listFiles(staticPrefixPath, false)
+        while (files.hasNext) {
+          val file = files.next()
+          if (!committer.deleteWithJob(fs, file.getPath, true)) {
```
--- End diff --

1. From the code's point of view, in the current implementation `deleteMatchingPartitions` happens only if `overwrite` is specified.
2. Using `dynamicPartitionOverwrite` will not solve this problem, because it also generates a `.stage` directory under the table root directory. We still need to record all the files we want to delete, and we still cannot directly delete the root directory. Dynamic partition overwrite actually records all the partitions that need to be deleted and then deletes them one by one, while a whole-table `overwrite` deletes all the data in the directory and needs to record all the partition directories to delete, so the implementation ends up being similar to `dynamicPartitionOverwrite`.
[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...
Github user zheh12 commented on a diff in the pull request: https://github.com/apache/spark/pull/21257#discussion_r186599760

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---
```
@@ -207,9 +207,25 @@ case class InsertIntoHadoopFsRelationCommand(
     }
     // first clear the path determined by the static partition keys (e.g. /table/foo=1)
     val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-    if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
-      throw new IOException(s"Unable to clear output " +
-        s"directory $staticPrefixPath prior to writing to it")
+
+    // check whether to delete the dir or just its sub files
+    if (fs.exists(staticPrefixPath)) {
+      // check if it is the table root, and record the files to delete
+      if (staticPartitionPrefix.isEmpty) {
+        val files = fs.listFiles(staticPrefixPath, false)
+        while (files.hasNext) {
+          val file = files.next()
+          if (!committer.deleteWithJob(fs, file.getPath, true)) {
```
--- End diff --

First of all, if it is the root directory of the table, I must record all the files in the directory and wait until the job is committed to delete them, because the job's `_temporary` directory is also inside that directory; I cannot simply delete the whole directory. Second, when we record the files that need to be deleted, we only list the files in the root directory non-recursively, and under normal circumstances the number of entries in the first level of a partitioned table's directory will not be too large. In the end this is certainly slower than deleting the entire directory directly, but under the current implementation we cannot delete the entire table directory.
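The listing strategy described above can be sketched in plain Scala (a stand-in for `fs.listFiles(path, false)`; entry names are illustrative): record every top-level entry for deferred deletion except the job's own `_temporary` directory.

```scala
object RootOverwrite {
  // Given a non-recursive listing of the table root, keep everything
  // except the in-flight "_temporary" dir as deletion candidates.
  def filesToDelete(entries: Seq[String]): Seq[String] =
    entries.filterNot(_ == "_temporary")

  def main(args: Array[String]): Unit = {
    val rootListing = Seq("part-00000", "part-00001", "_temporary", "foo=1")
    println(filesToDelete(rootListing)) // List(part-00000, part-00001, foo=1)
  }
}
```

Because the listing is non-recursive, the cost scales with the number of top-level entries (data files plus first-level partition dirs), not with the total file count of the table.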
[GitHub] spark issue #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwrite a p...
Github user zheh12 commented on the issue: https://github.com/apache/spark/pull/21257

cc @rxin @cloud-fan
[GitHub] spark pull request #21257: [SPARK-24194] HadoopFsRelation cannot overwrite a...
GitHub user zheh12 opened a pull request: https://github.com/apache/spark/pull/21257

[SPARK-24194] HadoopFsRelation cannot overwrite a path that is also b…

## What changes were proposed in this pull request?

When doing an insert overwrite into a parquet table, there is an error check:

```
if (overwrite) DDLUtils.verifyNotReadPath(actualQuery, outputPath)
```

But we can do this for a Hive table. The reason we can't overwrite a **HadoopFsRelation** whose output is the same as its input is that we delete the output path first. I think we can fix this with deferred deletion: just mark the path as to-be-deleted, and delete it after the job commits.

## How was this patch tested?

I changed the tests in **InsertSuite** and **MetastoreDataSourceSuite**. They now test the case where the input and output tables are the same.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zheh12/spark SPARK-24194

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21257.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21257