Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/5792#discussion_r31365223
--- Diff:
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -219,6 +222,52 @@ private[history] class FsHistoryProvider(conf:
SparkConf, clock: Clock)
}
}
+ override def writeEventLogs(
+ appId: String,
+ attemptId: Option[String],
+ outputStream: OutputStream): Unit = {
+
+ applications.get(appId) match {
+ case Some(appInfo) =>
+ val dirsToClear = new mutable.ArrayBuffer[File]()
+ try {
+ // If no attempt is specified, or there is no attemptId for
attempts, return all attempts
+ val pathsToZip = appInfo.attempts.filter { attempt =>
+ attempt.attemptId.isEmpty || attemptId.isEmpty ||
attempt.attemptId.get == attemptId.get
+ }.map { attempt =>
+ val logPath = new Path(logDir, attempt.logPath)
+ if (isLegacyLogDirectory(fs.getFileStatus(logPath))) {
+ val localDir = Utils.createTempDir()
+ Utils.chmod700(localDir)
+ dirsToClear += localDir
+ val outputFile = new File(localDir, logPath.getName)
+ val outputStream = new FileOutputStream(outputFile)
+ val files = fs.listFiles(logPath, false)
+ val paths = new mutable.ArrayBuffer[Path]()
+ while (files.hasNext) {
+ paths += files.next().getPath
+ }
+ Utils.zipFilesToStream(paths, hadoopConf, outputStream)
+ new Path(outputFile.toURI)
--- End diff --
I'm a little confused about how this works when you have multiple attempts and
legacy log files. Could you add a test for that case?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]