HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r325443715
########## File path: core/src/main/scala/org/apache/spark/scheduler/EventLogFileReaders.scala ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io.{BufferedInputStream, InputStream} +import java.util.zip.{ZipEntry, ZipOutputStream} + +import scala.collection.mutable.Map + +import com.google.common.io.ByteStreams +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.hdfs.DFSInputStream + +import org.apache.spark.SparkConf +import org.apache.spark.io.CompressionCodec +import org.apache.spark.scheduler.EventLogFileWriter.codecName +import org.apache.spark.util.Utils + +/** The base class of reader which will read the information of event log file(s). 
*/ +abstract class EventLogFileReader( + protected val fileSystem: FileSystem, + val rootPath: Path) { + + protected def fileSizeForDFS(path: Path): Option[Long] = { + Utils.tryWithResource(fileSystem.open(path)) { in => + in.getWrappedStream match { + case dfsIn: DFSInputStream => Some(dfsIn.getFileLength) + case _ => None + } + } + } + + protected def addFileAsZipEntry( + zipStream: ZipOutputStream, + path: Path, + entryName: String): Unit = { + Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { inputStream => + zipStream.putNextEntry(new ZipEntry(entryName)) + ByteStreams.copy(inputStream, zipStream) + zipStream.closeEntry() + } + } + + // ================ methods to be override ================ + + /** Returns the last sequence of event log files. None for single event log file. */ + def lastSequence: Option[Long] + + /** + * Returns the size of file for the last sequence of event log files. Returns its size for + * single event log file. + */ + def fileSizeForLastSequence: Long + + /** Returns whether the application is completed. */ + def completed: Boolean + + /** + * Returns the size of file for the last sequence of event log files, only when + * underlying input stream is DFSInputStream. Otherwise returns None. + */ + def fileSizeForLastSequenceForDFS: Option[Long] + + /** Returns the modification time for the last sequence of event log files. */ + def modificationTime: Long + + /** + * This method compresses the files passed in, and writes the compressed data out into the + * ZipOutputStream passed in. Each file is written as a new ZipEntry with its name being + * the name of the file being compressed. + */ + def zipEventLogFiles(zipStream: ZipOutputStream): Unit + + /** Returns all available event log files. */ + def listEventLogFiles: Seq[FileStatus] + + /** Returns the short compression name if being used. None if it's uncompressed. */ + def compression: Option[String] + + /** Returns the size of all event log files. 
*/ + def allSize: Long +} + +object EventLogFileReader { + // A cache for compression codecs to avoid creating the same codec many times + private val codecMap = Map.empty[String, CompressionCodec] + + def getEventLogReader( + fs: FileSystem, + path: Path, + lastSequence: Option[Long]): EventLogFileReader = { + lastSequence match { + case Some(_) => new RollingEventLogFilesFileReader(fs, path) + case None => new SingleFileEventLogFileReader(fs, path) + } + } + + def getEventLogReader(fs: FileSystem, path: Path): Option[EventLogFileReader] = { + val status = fs.getFileStatus(path) + if (isSingleEventLog(status)) { + Some(new SingleFileEventLogFileReader(fs, path)) + } else if (isRollingEventLogs(status)) { + Some(new RollingEventLogFilesFileReader(fs, path)) + } else { + None + } + } + + def getEventLogReader(fs: FileSystem, status: FileStatus): Option[EventLogFileReader] = { + if (isSingleEventLog(status)) { + Some(new SingleFileEventLogFileReader(fs, status.getPath)) + } else if (isRollingEventLogs(status)) { + Some(new RollingEventLogFilesFileReader(fs, status.getPath)) + } else { + None + } + } + + /** + * Opens an event log file and returns an input stream that contains the event data. + * + * @return input stream that holds one JSON record per line. + */ + def openEventLog(log: Path, fs: FileSystem): InputStream = { + val in = new BufferedInputStream(fs.open(log)) + try { + val codec = codecName(log).map { c => + codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) + } + codec.map(_.compressedContinuousInputStream(in)).getOrElse(in) + } catch { + case e: Throwable => + in.close() + throw e + } + } + + private def isSingleEventLog(status: FileStatus): Boolean = { + !status.isDirectory && + // FsHistoryProvider used to generate a hidden file which can't be read. Accidentally + // reading a garbage file is safe, but we would log an error which can be scary to + // the end-user. 
+ !status.getPath.getName.startsWith(".") + } + + private def isRollingEventLogs(status: FileStatus): Boolean = { + status.isDirectory && RollingEventLogFilesWriter.isEventLogDir(status) + } +} + +/** The reader which will read the information of single event log file. */ +class SingleFileEventLogFileReader( + fs: FileSystem, + path: Path) extends EventLogFileReader(fs, path) { + // TODO: get stats with constructor and only call if it's needed? + private lazy val stats = fileSystem.getFileStatus(rootPath) Review comment: Hmm... I guess the term tends to be used for both cases, but I agree it could cause confusion. I’ll rename it to `status`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
