Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1222#discussion_r22138965
  
    --- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
    @@ -123,130 +168,161 @@ private[spark] class EventLoggingListener(
         logEvent(event, flushLogger = true)
       override def onApplicationEnd(event: SparkListenerApplicationEnd) =
         logEvent(event, flushLogger = true)
    +
       // No-op because logging every update would be overkill
       override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate) { }
     
       /**
    -   * Stop logging events.
    -   * In addition, create an empty special file to indicate application 
completion.
    +   * Stop logging events. The event log file will be renamed so that it 
loses the
    +   * ".inprogress" suffix.
        */
       def stop() = {
    -    logger.newFile(APPLICATION_COMPLETE)
    -    logger.stop()
    +    writer.foreach(_.close())
    +
    +    val target = new Path(logPath)
    +    if (fileSystem.exists(target)) {
    +      if (shouldOverwrite) {
    +        logWarning(s"Event log $target already exists. Overwriting...")
    +        fileSystem.delete(target, true)
    +      } else {
    +        throw new IOException("Target log file already exists 
(%s)".format(logPath))
    +      }
    +    }
    +    fileSystem.rename(new Path(logPath + IN_PROGRESS), target)
       }
    +
     }
     
     private[spark] object EventLoggingListener extends Logging {
    +  // Suffix applied to the names of files still being written by 
applications.
    +  val IN_PROGRESS = ".inprogress"
       val DEFAULT_LOG_DIR = "/tmp/spark-events"
    -  val LOG_PREFIX = "EVENT_LOG_"
    -  val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
    -  val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
    -  val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
    -  val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 
8).toShort)
     
    -  // A cache for compression codecs to avoid creating the same codec many 
times
    -  private val codecMap = new mutable.HashMap[String, CompressionCodec]
    +  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
     
    -  def isEventLogFile(fileName: String): Boolean = {
    -    fileName.startsWith(LOG_PREFIX)
    -  }
    +  // Marker for the end of header data in a log file. After this marker, 
log data, potentially
    +  // compressed, will be found.
    +  private val HEADER_END_MARKER = "=== LOG_HEADER_END ==="
     
    -  def isSparkVersionFile(fileName: String): Boolean = {
    -    fileName.startsWith(SPARK_VERSION_PREFIX)
    -  }
    +  // To avoid corrupted files causing the heap to fill up. Value is 
arbitrary.
    +  private val MAX_HEADER_LINE_LENGTH = 4096
     
    -  def isCompressionCodecFile(fileName: String): Boolean = {
    -    fileName.startsWith(COMPRESSION_CODEC_PREFIX)
    -  }
    +  // A cache for compression codecs to avoid creating the same codec many 
times
    +  private val codecMap = new mutable.HashMap[String, CompressionCodec]
     
    -  def isApplicationCompleteFile(fileName: String): Boolean = {
    -    fileName == APPLICATION_COMPLETE
    -  }
    +  /**
    +   * Write metadata about the event log to the given stream.
    +   *
    +   * The header is a serialized version of a map, except it does not use 
Java serialization to
    +   * avoid incompatibilities between different JDKs. It writes one map 
entry per line, in
    +   * "key=value" format.
    +   *
    +   * The very last entry in the header is the `HEADER_END_MARKER` marker, 
so that the parsing code
    +   * can know when to stop.
    +   *
    +   * The format needs to be kept in sync with the openEventLog() method 
below. Also, it cannot
    +   * change in new Spark versions without some other way of detecting the 
change (like some
    +   * metadata encoded in the file name).
    +   *
    +   * @param logStream Raw output stream to the even log file.
    +   * @param compressionCodec Optional compression codec to use.
    +   * @return A stream where to write event log data. This may be a wrapper 
around the original
    +   *         stream (for example, when compression is enabled).
    +   */
    +  def initEventLog(
    +      logStream: OutputStream,
    +      compressionCodec: Option[CompressionCodec]): OutputStream = {
    +    val meta = mutable.HashMap(("version" -> SPARK_VERSION))
    +    compressionCodec.foreach { codec =>
    +      meta += ("compressionCodec" -> codec.getClass().getName())
    +    }
     
    -  def parseSparkVersion(fileName: String): String = {
    -    if (isSparkVersionFile(fileName)) {
    -      fileName.replaceAll(SPARK_VERSION_PREFIX, "")
    -    } else ""
    -  }
    +    def write(entry: String) = {
    +      val bytes = entry.getBytes(Charsets.UTF_8)
    +      if (bytes.length > MAX_HEADER_LINE_LENGTH) {
    +        throw new IOException(s"Header entry too long: ${entry}")
    +      }
    +      logStream.write(bytes, 0, bytes.length)
    +    }
     
    -  def parseCompressionCodec(fileName: String): String = {
    -    if (isCompressionCodecFile(fileName)) {
    -      fileName.replaceAll(COMPRESSION_CODEC_PREFIX, "")
    -    } else ""
    +    meta.foreach { case (k, v) => write(s"$k=$v\n") }
    +    write(s"$HEADER_END_MARKER\n")
    +    
compressionCodec.map(_.compressedOutputStream(logStream)).getOrElse(logStream)
       }
     
       /**
    -   * Return a file-system-safe path to the log directory for the given 
application.
    +   * Return a file-system-safe path to the log file for the given 
application.
        *
    -   * @param logBaseDir A base directory for the path to the log directory 
for given application.
    +   * @param logBaseDir Directory where the log file will be written.
        * @param appId A unique app ID.
        * @return A path which consists of file-system-safe characters.
        */
    -  def getLogDirPath(logBaseDir: String, appId: String): String = {
    +  def getLogPath(logBaseDir: String, appId: String): String = {
         val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", 
"_").toLowerCase
         Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
       }
     
       /**
    -   * Parse the event logging information associated with the logs in the 
given directory.
    +   * Opens an event log file and returns an input stream to the event data.
        *
    -   * Specifically, this looks for event log files, the Spark version file, 
the compression
    -   * codec file (if event logs are compressed), and the application 
completion file (if the
    -   * application has run to completion).
    +   * @return 2-tuple (event input stream, Spark version of event data)
        */
    -  def parseLoggingInfo(logDir: Path, fileSystem: FileSystem): 
EventLoggingInfo = {
    +  def openEventLog(log: Path, fs: FileSystem): (InputStream, String) = {
    +    // It's not clear whether FileSystem.open() throws 
FileNotFoundException or just plain
    +    // IOException when a file does not exist, so try our best to throw a 
proper exception.
    +    if (!fs.exists(log)) {
    +      throw new FileNotFoundException(s"File $log does not exist.")
    +    }
    +
    +    val in = new BufferedInputStream(fs.open(log))
    +    def readLine() = {
    +      val bytes = new ByteArrayOutputStream()
    +      var next = in.read()
    +      var count = 0
    +      while (next != '\n') {
    +        if (next == -1) {
    +          throw new IOException("Unexpected end of file.")
    +        }
    +        bytes.write(next)
    +        count = count + 1
    +        if (count > MAX_HEADER_LINE_LENGTH) {
    +          throw new IOException("Maximum header line length exceeeded.")
    --- End diff --
    
    Typo in the error message above: "exceeeded" should be "exceeded".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to