GitHub user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/6935#discussion_r47551842
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -385,24 +448,48 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
- if (newAttempts.isEmpty) {
- return
+ updateApplicationsWithNewAttempts(newAttempts)
+ }
+
+ /**
+ * Merge in all new attempts with those in [[applications]], updating the [[applications]]
+ * field afterwards. It _must not_ be executed concurrently, else attempt information may
+ * be lost.
+ * @param newAttempts a possibly empty list of new attempts
+ */
+ private def updateApplicationsWithNewAttempts(newAttempts: Iterable[FsApplicationAttemptInfo])
+ : Unit = {
+ if (newAttempts.nonEmpty) {
+ applications = mergeAttempts(newAttempts, applications)
}
+ }
+
+ /**
+ * Build a map containing all apps that contain new attempts. The app information in this map
+ * contains both the new app attempt, and those that were already loaded in the existing apps
+ * map. If an attempt has been updated, it replaces the old attempt in the list.
+ * The ordering is maintained
+ * @param newAttempts new attempt list
+ * @param current the current attempt list
+ * @return the updated list
+ */
+ private def mergeAttempts(newAttempts: Iterable[FsApplicationAttemptInfo],
--- End diff --
nit: first arg on its own line
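
For illustration, a minimal sketch of the layout the nit asks for, assuming the common Spark Scala convention that a parameter list too long for one line puts every parameter, the first included, on its own line with a four-space indent. The diff above is cut off before the rest of the signature, so the stand-in types `AttemptInfo` and `AppInfo`, the type of `current`, the return type, and the method body are all hypothetical and only there to make the sketch compile.

```scala
// Hypothetical stand-ins for the history-server types, so the sketch is self-contained.
case class AttemptInfo(appId: String, attemptId: String)
case class AppInfo(id: String, attempts: List[AttemptInfo])

object SignatureLayout {
  // The first parameter starts on its own line instead of trailing
  // the opening parenthesis; later parameters line up under it.
  def mergeAttempts(
      newAttempts: Iterable[AttemptInfo],
      current: Map[String, AppInfo]): Map[String, AppInfo] = {
    // Placeholder body (an assumption, not the PR's logic): a new attempt
    // replaces any existing attempt with the same ID and is put first.
    newAttempts.foldLeft(current) { (apps, attempt) =>
      val existing = apps.get(attempt.appId).map(_.attempts).getOrElse(Nil)
      val kept = existing.filterNot(_.attemptId == attempt.attemptId)
      apps.updated(attempt.appId, AppInfo(attempt.appId, attempt :: kept))
    }
  }
}
```

Only the shape of the parameter list matters here; the actual types and merge logic are whatever the PR defines.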