Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1222#discussion_r18535605
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
    @@ -17,74 +17,43 @@
     
     package org.apache.spark.scheduler
     
    -import java.io.{BufferedInputStream, InputStream}
    +import java.io.{InputStream, IOException}
     
     import scala.io.Source
     
    -import org.apache.hadoop.fs.{Path, FileSystem}
     import org.json4s.jackson.JsonMethods._
     
     import org.apache.spark.Logging
    -import org.apache.spark.io.CompressionCodec
     import org.apache.spark.util.JsonProtocol
     
     /**
    - * A SparkListenerBus that replays logged events from persisted storage.
    - *
    - * This assumes the given paths are valid log files, where each line can be deserialized into
    - * exactly one SparkListenerEvent.
    + * A SparkListenerBus that can be used to replay events from serialized event data.
      */
    -private[spark] class ReplayListenerBus(
    -    logPaths: Seq[Path],
    -    fileSystem: FileSystem,
    -    compressionCodec: Option[CompressionCodec])
    -  extends SparkListenerBus with Logging {
    -
    -  private var replayed = false
    -
    -  if (logPaths.length == 0) {
    -    logWarning("Log path provided contains no log files.")
    -  }
    +private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
     
       /**
    -   * Replay each event in the order maintained in the given logs.
    -   * This should only be called exactly once.
    +   * Replay each event in the order maintained in the given stream.
    +   * This method can be called multiple times, but the listener behavior is undefined after any
    +   * error is thrown by this method.
    +   *
    +   * @param logData Stream containing event log data.
    +   * @param version Spark version that generated the events.
        */
    -  def replay() {
    -    assert(!replayed, "ReplayListenerBus cannot replay events more than once")
    -    logPaths.foreach { path =>
    -      // Keep track of input streams at all levels to close them later
    -      // This is necessary because an exception can occur in between stream initializations
    -      var fileStream: Option[InputStream] = None
    -      var bufferedStream: Option[InputStream] = None
    -      var compressStream: Option[InputStream] = None
    -      var currentLine = "<not started>"
    -      try {
    -        fileStream = Some(fileSystem.open(path))
    -        bufferedStream = Some(new BufferedInputStream(fileStream.get))
    -        compressStream = Some(wrapForCompression(bufferedStream.get))
    -
    -        // Parse each line as an event and post the event to all attached listeners
    -        val lines = Source.fromInputStream(compressStream.get).getLines()
    -        lines.foreach { line =>
    -          currentLine = line
    -          postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
    -        }
    -      } catch {
    -        case e: Exception =>
    -          logError("Exception in parsing Spark event log %s".format(path), 
e)
    -          logError("Malformed line: %s\n".format(currentLine))
    -      } finally {
    -        fileStream.foreach(_.close())
    -        bufferedStream.foreach(_.close())
    -        compressStream.foreach(_.close())
    +  def replay(logData: InputStream, version: String) {
    --- End diff ---
    
    Not currently. But I think it's a good idea to have it, because it's metadata that would otherwise be lost. At some point we may introduce a change that requires new logs to be processed in a slightly different way, and the version is how you'd make that choice.
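
    To illustrate, here's roughly the kind of branching I have in mind. This is a sketch only: the `"1.2.0"` cutoff is hypothetical and not part of this patch, `postToAll` and `logError` come from `SparkListenerBus` and `Logging` as in the class above, and the imports are the ones already in the diff.

    ```scala
    def replay(logData: InputStream, version: String) {
      var currentLine = "<not started>"
      try {
        Source.fromInputStream(logData).getLines().foreach { line =>
          currentLine = line
          // A future format change could key off the logged version here,
          // e.g. if (version < "1.2.0") { /* apply a compatibility shim */ }
          postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
        }
      } catch {
        case e: Exception =>
          logError(s"Exception parsing Spark event log (version $version)", e)
          logError(s"Malformed line: $currentLine")
      }
    }
    ```

    Whether that check ends up as a string comparison or a proper version parse is a detail; the point is that without the parameter, the information isn't available to branch on at all.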

