vanzin commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325356209
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##########
 @@ -435,27 +435,26 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
       logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
 
       val updated = Option(fs.listStatus(new 
Path(logDir))).map(_.toSeq).getOrElse(Nil)
-        .filter { entry =>
-          !entry.isDirectory() &&
-            // FsHistoryProvider used to generate a hidden file which can't be 
read.  Accidentally
-            // reading a garbage file is safe, but we would log an error which 
can be scary to
-            // the end-user.
-            !entry.getPath().getName().startsWith(".") &&
-            !isBlacklisted(entry.getPath)
-        }
-        .filter { entry =>
+        .filter(entry => !isBlacklisted(entry.getPath))
+        .flatMap(entry => EventLogFileReader.getEventLogReader(fs, entry))
+        .filter { reader =>
           try {
-            val info = listing.read(classOf[LogInfo], 
entry.getPath().toString())
+            val info = listing.read(classOf[LogInfo], 
reader.rootPath.toString())
 
             if (info.appId.isDefined) {
               // If the SHS view has a valid application, update the time the 
file was last seen so
               // that the entry is not deleted from the SHS listing. Also 
update the file size, in
               // case the code below decides we don't need to parse the log.
-              listing.write(info.copy(lastProcessed = newLastScanTime, 
fileSize = entry.getLen()))
+              listing.write(info.copy(lastProcessed = newLastScanTime,
+                fileSize = reader.fileSizeForLastSequence, lastSequence = 
reader.lastSequence,
+                isComplete = reader.completed))
             }
 
-            if (shouldReloadLog(info, entry)) {
-              if (info.appId.isDefined && fastInProgressParsing) {
+            if (shouldReloadLog(info, reader)) {
+              // ignore fastInProgressParsing when the status of application 
is changed from
+              // in-progress to completed, which is needed for rolling event 
log.
+              if (info.appId.isDefined && (info.isComplete == 
reader.completed) &&
+                fastInProgressParsing) {
 
 Review comment:
   indent more

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to