tgravescs commented on a change in pull request #30714:
URL: https://github.com/apache/spark/pull/30714#discussion_r566154624



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
##########
@@ -66,14 +67,65 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
   private def getFileSize(filePath: String): Option[Long] = {
     val path = new Path(filePath)
     val fs = path.getFileSystem(hadoopConf)
+    getFileSize(fs, path)
+  }
+
+  /**
+   * Get the size of the file expected to have been written by a worker.
+   * This supports the XAttr in HADOOP-17414 when the "magic committer" adds
+   * a custom HTTP header to the a zero byte marker.
+   * If the output file as returned by getFileStatus > 0 then the length if
+   * returned. For zero-byte files, the (optional) Hadoop FS API getXAttr() is
+   * invoked. If a parseable, non-negative length can be retrieved, this
+   * is returned instead of the length.
+   * @return the file size or None if the file was not found.
+   */
+  private [datasources] def getFileSize(fs: FileSystem, path: Path): 
Option[Long] = {
+    // the normal file status probe.
     try {
-      Some(fs.getFileStatus(path).getLen())
+      val len = fs.getFileStatus(path).getLen
+      if (len > 0) {
+        return Some(len)
+      }
     } catch {
       case e: FileNotFoundException =>
-        // may arise against eventually consistent object stores
+        // may arise against eventually consistent object stores.
         logDebug(s"File $path is not yet visible", e)
-        None
+        return None
+    }
+
+    // Output File Size is 0. Look to see if it has an attribute
+    // declaring a future-file-length.
+    // Failure of API call, parsing, invalid value all return the
+    // 0 byte length.
+
+    var len = 0L
+    try {
+      val attr = fs.getXAttr(path, BasicWriteJobStatsTracker.FILE_LENGTH_XATTR)
+      if (attr != null && attr.nonEmpty) {
+        val str = new String(attr, StandardCharsets.UTF_8)
+        logInfo(s"File Length statistics for $path retrieved from XAttr: $str")
+        // a non-empty header was found. parse to a long via the java class
+        val l = java.lang.Long.parseLong(str)
+        if (l > 0) {
+          len = l
+        } else {
+          logDebug("Ignoring negative value in Xattr file length")
+        }
+      }
+    } catch {
+      case e: NumberFormatException =>
+        // warn but don't dump the whole stack
+        logInfo(s"Failed to parse" +

Review comment:
       do we want this warn instead or perhaps just extend saying file stats 
may be wrong?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to