[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21408
[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21408#discussion_r190440692

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---

```
@@ -311,14 +314,27 @@ object InMemoryFileIndex extends Logging {
           // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
           // which is very slow on some file system (RawLocalFileSystem, which is launch a
           // subprocess and parse the stdout).
-          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
-            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-          if (f.isSymlink) {
-            lfs.setSymlink(f.getSymlink)
+          try {
+            val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+            val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+              f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+            if (f.isSymlink) {
+              lfs.setSymlink(f.getSymlink)
+            }
+            Some(lfs)
+          } catch {
+            case _: FileNotFoundException =>
+              missingFiles += f.getPath.toString
+              None
           }
-          lfs
         }
+
+        if (missingFiles.nonEmpty) {
+          logWarning(s"The paths [${missingFiles.mkString(", ")}] were not found. " +
+            "Were they deleted very recently?")
```

--- End diff --

Now the error messages should look like:

```
the following files were missing during file scan:
  hdfs://.../rel/00171151/input/hyukjin/part-43011-...
  hdfs://.../rel/00171151/input/hyukjin/part-43012-...
  hdfs://.../rel/00171151/input/hyukjin/part-43013-...
  ...
```
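For reference, a short, standalone Scala snippet showing how a `mkString`-based message produces the one-path-per-line output quoted above. This is a hypothetical demo, not the PR's code: the object name and the placeholder paths are invented, and it prints instead of going through Spark's `logWarning`.

```scala
// Demo of the adopted warning format: a header line, then one missing path
// per line, indented two spaces via mkString's separator.
object WarningFormatDemo extends App {
  val missingFiles = Seq(
    "hdfs://.../rel/00171151/input/hyukjin/part-43011-...",
    "hdfs://.../rel/00171151/input/hyukjin/part-43012-...")

  val msg = "the following files were missing during file scan:\n  " +
    missingFiles.mkString("\n  ")

  // In InMemoryFileIndex this string is passed to logWarning.
  println(msg)
}
```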
[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21408#discussion_r190439996

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---

```
@@ -311,14 +314,27 @@ object InMemoryFileIndex extends Logging {
           // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
           // which is very slow on some file system (RawLocalFileSystem, which is launch a
           // subprocess and parse the stdout).
-          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
-            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-          if (f.isSymlink) {
-            lfs.setSymlink(f.getSymlink)
+          try {
+            val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+            val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+              f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+            if (f.isSymlink) {
+              lfs.setSymlink(f.getSymlink)
+            }
+            Some(lfs)
+          } catch {
+            case _: FileNotFoundException =>
+              missingFiles += f.getPath.toString
+              None
           }
-          lfs
         }
+
+        if (missingFiles.nonEmpty) {
+          logWarning(s"The paths [${missingFiles.mkString(", ")}] were not found. " +
+            "Were they deleted very recently?")
```

--- End diff --

no problem
[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21408#discussion_r190262119

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---

```
@@ -311,14 +314,27 @@ object InMemoryFileIndex extends Logging {
           // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
           // which is very slow on some file system (RawLocalFileSystem, which is launch a
           // subprocess and parse the stdout).
-          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
-            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-          if (f.isSymlink) {
-            lfs.setSymlink(f.getSymlink)
+          try {
+            val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+            val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+              f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+            if (f.isSymlink) {
+              lfs.setSymlink(f.getSymlink)
+            }
+            Some(lfs)
+          } catch {
+            case _: FileNotFoundException =>
+              missingFiles += f.getPath.toString
+              None
           }
-          lfs
         }
+
+        if (missingFiles.nonEmpty) {
+          logWarning(s"The paths [${missingFiles.mkString(", ")}] were not found. " +
+            "Were they deleted very recently?")
```

--- End diff --

maybe

```
InMemoryFileIndex: the following files were missing during file scan:
  path1
  path2
  ...
```
[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21408#discussion_r190199737

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---

```
@@ -311,14 +314,27 @@ object InMemoryFileIndex extends Logging {
           // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
           // which is very slow on some file system (RawLocalFileSystem, which is launch a
           // subprocess and parse the stdout).
-          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
-            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-          if (f.isSymlink) {
-            lfs.setSymlink(f.getSymlink)
+          try {
```

--- End diff --

The only diff here is the try/catch.
[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21408#discussion_r190196361

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---

```
@@ -311,14 +314,27 @@ object InMemoryFileIndex extends Logging {
           // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
           // which is very slow on some file system (RawLocalFileSystem, which is launch a
           // subprocess and parse the stdout).
-          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
-            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-          if (f.isSymlink) {
-            lfs.setSymlink(f.getSymlink)
+          try {
+            val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+            val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+              f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+            if (f.isSymlink) {
+              lfs.setSymlink(f.getSymlink)
+            }
+            Some(lfs)
+          } catch {
+            case _: FileNotFoundException =>
+              missingFiles += f.getPath.toString
+              None
           }
-          lfs
         }
+
+        if (missingFiles.nonEmpty) {
+          logWarning(s"The paths [${missingFiles.mkString(", ")}] were not found. " +
+            "Were they deleted very recently?")
```

--- End diff --

The error message looks like this:

```
InMemoryFileIndex: The paths [hdfs://hdp265-1.openstacklocal:8020/rel/00171151/input/hyukjin/part-43011-fd2d682a-ade1-4b0d-9e52-ab5c5d895cc9-c000.csv, ...] were not found. Were they deleted very recently?
```
[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...
GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/21408

[SPARK-24364][SS] Prevent InMemoryFileIndex from failing if file path doesn't exist

## What changes were proposed in this pull request?

This PR proposes to follow up https://github.com/apache/spark/pull/15153 and complete SPARK-17599. The `FileSystem` operation (`fs.getFileBlockLocations`) can still fail if the file path does not exist. For example, see the exception message below:

```
Error occurred while processing: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
...
java.io.FileNotFoundException: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
...
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:249)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:229)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:314)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:297)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles(InMemoryFileIndex.scala:297)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:174)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:173)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles(InMemoryFileIndex.scala:173)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:126)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:67)
	at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:161)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$tempFileIndex$1(DataSource.scala:152)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:166)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:261)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94)
	at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:196)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:206)
	at com.hwx.StreamTest$.main(StreamTest.scala:97)
	at com.hwx.StreamTest.main(StreamTest.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplicat
```
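The fix, in essence, tolerates files that disappear between the directory listing and the block-location lookup. Below is a minimal, self-contained Scala sketch of that pattern, not the PR's exact code: the object and method names are invented, it prints instead of using Spark's `logWarning`, and the real change in `InMemoryFileIndex.listLeafFiles` builds `LocatedFileStatus` through a longer constructor to avoid a slow `FileStatus.getPermission()` call.

```scala
import java.io.FileNotFoundException

import scala.collection.mutable

import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus}

// Hypothetical sketch of the pattern this PR applies: resolve block locations
// per file, and treat a FileNotFoundException (the file was deleted between
// listing and lookup) as "skip this file" rather than failing the whole scan.
object MissingFileTolerantScan {
  def resolveLeafFiles(fs: FileSystem, statuses: Seq[FileStatus]): Seq[LocatedFileStatus] = {
    val missingFiles = mutable.ArrayBuffer.empty[String]
    val resolved = statuses.flatMap { f =>
      try {
        // getFileBlockLocations throws FileNotFoundException on HDFS when the
        // file no longer exists -- see the stack trace above.
        val locations = fs.getFileBlockLocations(f, 0, f.getLen)
        Some(new LocatedFileStatus(f, locations))
      } catch {
        case _: FileNotFoundException =>
          missingFiles += f.getPath.toString
          None // drop the vanished file from the index
      }
    }
    if (missingFiles.nonEmpty) {
      // Surface the skipped paths so users can tell the scan was incomplete.
      println(s"the following files were missing during file scan:\n  ${missingFiles.mkString("\n  ")}")
    }
    resolved
  }
}
```

Returning `Option` and flattening keeps the scan total: a missing file becomes `None` rather than an exception that aborts the whole index refresh.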
[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21408#discussion_r190196148

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---

```
@@ -294,9 +294,12 @@ object InMemoryFileIndex extends Logging {
       if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
     }

-    allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
+    val missingFiles = mutable.ArrayBuffer.empty[String]
+    val filteredLeafStatuses = allLeafStatuses.filterNot(
+      status => shouldFilterOut(status.getPath.getName))
```

--- End diff --

I made this a val to reduce the diff.