vanzin commented on a change in pull request #25670: [SPARK-28869][CORE] Roll 
over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325365024
 
 

 ##########
 File path: 
core/src/main/scala/org/apache/spark/scheduler/EventLogFileReaders.scala
 ##########
 @@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{BufferedInputStream, InputStream}
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import scala.collection.mutable.Map
+
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.hdfs.DFSInputStream
+
+import org.apache.spark.SparkConf
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.scheduler.EventLogFileWriter.codecName
+import org.apache.spark.util.Utils
+
+/** The base class of readers which read the information of event log file(s). */
+abstract class EventLogFileReader(
+    protected val fileSystem: FileSystem,
+    val rootPath: Path) {
+
+  protected def fileSizeForDFS(path: Path): Option[Long] = {
+    Utils.tryWithResource(fileSystem.open(path)) { in =>
+      in.getWrappedStream match {
+        case dfsIn: DFSInputStream => Some(dfsIn.getFileLength)
+        case _ => None
+      }
+    }
+  }
+
+  protected def addFileAsZipEntry(
+      zipStream: ZipOutputStream,
+      path: Path,
+      entryName: String): Unit = {
+    Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { 
inputStream =>
+      zipStream.putNextEntry(new ZipEntry(entryName))
+      ByteStreams.copy(inputStream, zipStream)
+      zipStream.closeEntry()
+    }
+  }
+
+  // ================ methods to be overridden ================
+
+  /** Returns the last sequence of event log files, or None for a single event log file. */
+  def lastSequence: Option[Long]
+
+  /**
+   * Returns the size of the file for the last sequence of event log files. For a single
+   * event log file, returns the size of that file.
+   */
+  def fileSizeForLastSequence: Long
+
+  /** Returns whether the application is completed. */
+  def completed: Boolean
+
+  /**
+   * Returns the size of file for the last sequence of event log files, only 
when
+   * underlying input stream is DFSInputStream. Otherwise returns None.
+   */
+  def fileSizeForLastSequenceForDFS: Option[Long]
+
+  /** Returns the modification time for the last sequence of event log files. 
*/
+  def modificationTime: Long
+
+  /**
+   * This method compresses the files passed in, and writes the compressed 
data out into the
+   * ZipOutputStream passed in. Each file is written as a new ZipEntry with 
its name being
+   * the name of the file being compressed.
+   */
+  def zipEventLogFiles(zipStream: ZipOutputStream): Unit
+
+  /** Returns all available event log files. */
+  def listEventLogFiles: Seq[FileStatus]
+
+  /** Returns the short compression name if being used. None if it's 
uncompressed. */
+  def compression: Option[String]
+
+  /** Returns the size of all event log files. */
+  def allSize: Long
+}
+
+object EventLogFileReader {
+  // A cache for compression codecs to avoid creating the same codec many times
+  private val codecMap = Map.empty[String, CompressionCodec]
+
+  def getEventLogReader(
+      fs: FileSystem,
+      path: Path,
+      lastSequence: Option[Long]): EventLogFileReader = {
+    lastSequence match {
+      case Some(_) => new RollingEventLogFilesFileReader(fs, path)
+      case None => new SingleFileEventLogFileReader(fs, path)
+    }
+  }
+
+  def getEventLogReader(fs: FileSystem, path: Path): 
Option[EventLogFileReader] = {
 
 Review comment:
   This should call the overloaded method or vice-versa, since they're 
basically doing the same thing.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to