[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...

2018-05-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21408


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...

2018-05-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21408#discussion_r190440692
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
@@ -311,14 +314,27 @@ object InMemoryFileIndex extends Logging {
         // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
         // which is very slow on some file system (RawLocalFileSystem, which is launch a
         // subprocess and parse the stdout).
-        val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-        val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
-          f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-        if (f.isSymlink) {
-          lfs.setSymlink(f.getSymlink)
+        try {
+          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+          if (f.isSymlink) {
+            lfs.setSymlink(f.getSymlink)
+          }
+          Some(lfs)
+        } catch {
+          case _: FileNotFoundException =>
+            missingFiles += f.getPath.toString
+            None
         }
-        lfs
       }
+
+      if (missingFiles.nonEmpty) {
+        logWarning(s"The paths [${missingFiles.mkString(", ")}] were not found. " +
+          "Were they deleted very recently?")
--- End diff --

Now the error messages should look like:

```
the following files were missing during file scan:
  hdfs://.../rel/00171151/input/hyukjin/part-43011-...
  hdfs://.../rel/00171151/input/hyukjin/part-43012-...
  hdfs://.../rel/00171151/input/hyukjin/part-43013-...
  ...
```
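The collect-then-log pattern discussed above can be sketched in isolation. This is a simplified standalone sketch, not Spark's actual code: it uses a toy `statFile` over a plain `Set` of paths instead of Hadoop's `FileSystem`/`LocatedFileStatus` API, and `MissingFileSketch` and the sample paths are made-up names.

```scala
import java.io.FileNotFoundException
import scala.collection.mutable

// Simplified sketch of the PR's approach: try to stat each file, collect the
// paths that raise FileNotFoundException, and log them all in one warning
// instead of failing the whole scan.
object MissingFileSketch {
  // Pretend "stat": succeeds for known paths, throws for anything else.
  def statFile(path: String, existing: Set[String]): Long =
    if (existing.contains(path)) 42L
    else throw new FileNotFoundException(s"File does not exist: $path")

  def scan(paths: Seq[String], existing: Set[String]): (Seq[Long], Seq[String]) = {
    val missingFiles = mutable.ArrayBuffer.empty[String]
    // flatMap drops the None entries produced for missing files,
    // mirroring the Some(lfs) / None shape in the diff above.
    val statuses = paths.flatMap { p =>
      try Some(statFile(p, existing))
      catch {
        case _: FileNotFoundException =>
          missingFiles += p
          None
      }
    }
    if (missingFiles.nonEmpty) {
      Console.err.println(
        s"the following files were missing during file scan:\n  ${missingFiles.mkString("\n  ")}")
    }
    (statuses, missingFiles.toSeq)
  }
}
```

The key design point is that the scan degrades gracefully: missing files are skipped and reported once at the end, rather than aborting the entire listing on the first `FileNotFoundException`.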


---




[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...

2018-05-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21408#discussion_r190439996
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
@@ -311,14 +314,27 @@ object InMemoryFileIndex extends Logging {
         // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
         // which is very slow on some file system (RawLocalFileSystem, which is launch a
         // subprocess and parse the stdout).
-        val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-        val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
-          f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-        if (f.isSymlink) {
-          lfs.setSymlink(f.getSymlink)
+        try {
+          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+          if (f.isSymlink) {
+            lfs.setSymlink(f.getSymlink)
+          }
+          Some(lfs)
+        } catch {
+          case _: FileNotFoundException =>
+            missingFiles += f.getPath.toString
+            None
         }
-        lfs
       }
+
+      if (missingFiles.nonEmpty) {
+        logWarning(s"The paths [${missingFiles.mkString(", ")}] were not found. " +
+          "Were they deleted very recently?")
--- End diff --

no problem


---




[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...

2018-05-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21408#discussion_r190262119
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
@@ -311,14 +314,27 @@ object InMemoryFileIndex extends Logging {
         // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
         // which is very slow on some file system (RawLocalFileSystem, which is launch a
         // subprocess and parse the stdout).
-        val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-        val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
-          f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-        if (f.isSymlink) {
-          lfs.setSymlink(f.getSymlink)
+        try {
+          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+          if (f.isSymlink) {
+            lfs.setSymlink(f.getSymlink)
+          }
+          Some(lfs)
+        } catch {
+          case _: FileNotFoundException =>
+            missingFiles += f.getPath.toString
+            None
         }
-        lfs
       }
+
+      if (missingFiles.nonEmpty) {
+        logWarning(s"The paths [${missingFiles.mkString(", ")}] were not found. " +
+          "Were they deleted very recently?")
--- End diff --

maybe
```
InMemoryFileIndex: the following files were missing during file scan:
path1
path2
...
```
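For reference, the suggested multiline format falls out of a one-liner with `mkString`. A minimal sketch (the `WarningFormatSketch` name and `format` helper are hypothetical, not Spark code):

```scala
// Render a list of missing paths in the multiline style suggested above:
// a header line followed by one indented path per line.
object WarningFormatSketch {
  def format(missing: Seq[String]): String =
    s"the following files were missing during file scan:\n  ${missing.mkString("\n  ")}"
}
```

For example, `WarningFormatSketch.format(Seq("path1", "path2"))` produces:

```
the following files were missing during file scan:
  path1
  path2
```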


---




[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...

2018-05-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21408#discussion_r190199737
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
@@ -311,14 +314,27 @@ object InMemoryFileIndex extends Logging {
         // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
         // which is very slow on some file system (RawLocalFileSystem, which is launch a
         // subprocess and parse the stdout).
-        val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-        val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
-          f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-        if (f.isSymlink) {
-          lfs.setSymlink(f.getSymlink)
+        try {
--- End diff --

The only diff here is the try/catch.
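In other words, the map body that used to return `lfs` directly is now wrapped so each element becomes an `Option`, and missing entries are dropped. A minimal before/after sketch of just that change (the toy `lookup` table is an assumption for illustration, not Spark's API):

```scala
import java.io.FileNotFoundException

object TryCatchSketch {
  private val table = Map("a" -> 1, "b" -> 2)

  private def lookup(key: String): Int =
    table.getOrElse(key, throw new FileNotFoundException(key))

  // Before: any missing key fails the whole traversal.
  def strict(keys: Seq[String]): Seq[Int] = keys.map(lookup)

  // After: the only diff is the try/catch -- a missing key becomes None
  // and is silently dropped by flatMap.
  def lenient(keys: Seq[String]): Seq[Int] =
    keys.flatMap { k =>
      try Some(lookup(k))
      catch { case _: FileNotFoundException => None }
    }
}
```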


---




[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...

2018-05-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21408#discussion_r190196361
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
@@ -311,14 +314,27 @@ object InMemoryFileIndex extends Logging {
         // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
         // which is very slow on some file system (RawLocalFileSystem, which is launch a
         // subprocess and parse the stdout).
-        val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-        val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
-          f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-        if (f.isSymlink) {
-          lfs.setSymlink(f.getSymlink)
+        try {
+          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+          if (f.isSymlink) {
+            lfs.setSymlink(f.getSymlink)
+          }
+          Some(lfs)
+        } catch {
+          case _: FileNotFoundException =>
+            missingFiles += f.getPath.toString
+            None
         }
-        lfs
       }
+
+      if (missingFiles.nonEmpty) {
+        logWarning(s"The paths [${missingFiles.mkString(", ")}] were not found. " +
+          "Were they deleted very recently?")
--- End diff --

The error message looks like this:

```
InMemoryFileIndex: The paths [hdfs://hdp265-1.openstacklocal:8020/rel/00171151/input/hyukjin/part-43011-fd2d682a-ade1-4b0d-9e52-ab5c5d895cc9-c000.csv, ... ] were not found. Were they deleted very recently?
```



---




[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...

2018-05-23 Thread HyukjinKwon
GitHub user HyukjinKwon opened a pull request:

https://github.com/apache/spark/pull/21408

[SPARK-24364][SS] Prevent InMemoryFileIndex from failing if file path doesn't exist

## What changes were proposed in this pull request?

This PR proposes to follow up https://github.com/apache/spark/pull/15153 and complete SPARK-17599.

A `FileSystem` operation (`fs.getFileBlockLocations`) can still fail if the file path does not exist. For example, see the exception message below:

```
Error occurred while processing: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
...
java.io.FileNotFoundException: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
...
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:249)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:229)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:314)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:297)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles(InMemoryFileIndex.scala:297)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:174)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:173)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles(InMemoryFileIndex.scala:173)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:126)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:67)
at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:161)
at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$tempFileIndex$1(DataSource.scala:152)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:166)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:261)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:196)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:206)
at com.hwx.StreamTest$.main(StreamTest.scala:97)
at com.hwx.StreamTest.main(StreamTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplicat
```

[GitHub] spark pull request #21408: [SPARK-24364][SS] Prevent InMemoryFileIndex from ...

2018-05-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21408#discussion_r190196148
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
@@ -294,9 +294,12 @@ object InMemoryFileIndex extends Logging {
       if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
     }

-    allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
+    val missingFiles = mutable.ArrayBuffer.empty[String]
+    val filteredLeafStatuses = allLeafStatuses.filterNot(
+      status => shouldFilterOut(status.getPath.getName))
--- End diff --

I made this a variable to reduce the diff.


---
