This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 4e80b3a09407 [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log 4e80b3a09407 is described below commit 4e80b3a09407042f7c596963dcb4fc59e68755ab Author: Liang-Chi Hsieh <vii...@gmail.com> AuthorDate: Sat Dec 9 15:20:55 2023 -0800 [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log ### What changes were proposed in this pull request? This patch updates the document of the `CheckpointFileManager.list` method to reflect the fact that it is used to return both files and directories, to reduce confusion. For usage like `HDFSMetadataLog`, where it assumes the file statuses returned by `list` are all files, we add a filter there to avoid a confusing error. ### Why are the changes needed? `HDFSMetadataLog` takes a metadata path as a parameter. When it goes to retrieve all batch metadata, it calls `CheckpointFileManager.list` to get all files under the metadata path. However, currently all implementations of `CheckpointFileManager.list` return all files/directories under the given path. So if there is a directory with the name of a batch number (a long value), the directory will be returned too and cause trouble when `HDFSMetadataLog` goes to read it. Actually, the `CheckpointFileManager.list` method clearly defines that it lists the "files" in a path. That being said, current implementations don't follow the doc. We tried to make `list` method implementations only return files, but some usages (state metadata) of the `list` method already break that assumption and use directories returned by the `list` method. So we simply update the `list` method document to explicitly define that it returns both files/directories. We add a filter in `HDFSMetad [...] ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added test ### Was this patch authored or co-authored using generative AI tooling? 
No Closes #44272 from viirya/fix_metadatalog. Authored-by: Liang-Chi Hsieh <vii...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> (cherry picked from commit 75805f07f5caeb01104a7352b02790d03a043ded) Signed-off-by: Dongjoon Hyun <dh...@apple.com> (cherry picked from commit 28a8b181e96d4ce71e2f9888910214d14a859b7d) Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../sql/execution/streaming/CheckpointFileManager.scala | 4 ++-- .../spark/sql/execution/streaming/HDFSMetadataLog.scala | 2 ++ .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 12 ++++++++++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala index 013efd3c7bae..b2a3b8d73d4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -65,10 +65,10 @@ trait CheckpointFileManager { /** Open a file for reading, or throw exception if it does not exist. */ def open(path: Path): FSDataInputStream - /** List the files in a path that match a filter. */ + /** List the files/directories in a path that match a filter. */ def list(path: Path, filter: PathFilter): Array[FileStatus] - /** List all the files in a path. */ + /** List all the files/directories in a path. 
*/ def list(path: Path): Array[FileStatus] = { list(path, (_: Path) => true) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 2b0172bb9555..9a811db679d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -325,6 +325,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: /** List the available batches on file system. */ protected def listBatches: Array[Long] = { val batchIds = fileManager.list(metadataPath, batchFilesFilter) + // Batches must be files + .filter(f => f.isFile) .map(f => pathToBatchId(f.getPath)) ++ // Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to // elimiate the race condition. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index 980d532dd477..08f245135f58 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -33,6 +33,18 @@ class HDFSMetadataLogSuite extends SharedSparkSession { private implicit def toOption[A](a: A): Option[A] = Option(a) + test("SPARK-46339: Directory with number name should not be treated as metadata log") { + withTempDir { temp => + val dir = new File(temp, "dir") + val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath) + assert(metadataLog.metadataPath.toString.endsWith("/dir")) + + // Create a directory with batch id 0 + new File(dir, "0").mkdir() + assert(metadataLog.getLatest() === None) + } + } + test("HDFSMetadataLog: basic") { withTempDir { temp 
=> val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org