GitHub user bOOm-X commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18253#discussion_r130359427
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
    @@ -131,94 +132,35 @@ private[spark] class EventLoggingListener(
       }
     
       /** Log the event as JSON. */
    -  private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
    +  private def logEvent(event: SparkListenerEvent) {
         val eventJson = JsonProtocol.sparkEventToJson(event)
         // scalastyle:off println
         writer.foreach(_.println(compact(render(eventJson))))
         // scalastyle:on println
    -    if (flushLogger) {
    -      writer.foreach(_.flush())
    -      hadoopDataStream.foreach(_.hflush())
    -    }
         if (testing) {
           loggedEvents += eventJson
    +      flush()
         }
       }
     
    -  // Events that do not trigger a flush
    -  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event)
    -
    -  override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event)
    -
    -  override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = logEvent(event)
    -
    -  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = logEvent(event)
    -
    -  override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
    -    logEvent(redactEvent(event))
    -  }
    -
    -  // Events that trigger a flush
    -  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    -    logEvent(event, flushLogger = true)
    +  private def flush(): Unit = {
    +    writer.foreach(_.flush())
    +    hadoopDataStream.foreach(_.hflush())
       }
     
    -  override def onJobStart(event: SparkListenerJobStart): Unit = logEvent(event, flushLogger = true)
    -
    -  override def onJobEnd(event: SparkListenerJobEnd): Unit = logEvent(event, flushLogger = true)
    -
    -  override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
    -    logEvent(event, flushLogger = true)
    -  }
    -
    -  override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = {
    -    logEvent(event, flushLogger = true)
    -  }
    -
    -  override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
    -    logEvent(event, flushLogger = true)
    -  }
    -
    -  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    -    logEvent(event, flushLogger = true)
    -  }
    -
    -  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
    -    logEvent(event, flushLogger = true)
    -  }
    -  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
    -    logEvent(event, flushLogger = true)
    -  }
    -
    -  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
    -    logEvent(event, flushLogger = true)
    -  }
    -
    -  override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
    -    logEvent(event, flushLogger = true)
    -  }
    -
    -  override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
    -    logEvent(event, flushLogger = true)
    -  }
    -
    -  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
    -    logEvent(event, flushLogger = true)
    -  }
    -
    -  override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
    -    logEvent(event, flushLogger = true)
    -  }
    -
    -  // No-op because logging every update would be overkill
    -  override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}
    -
    -  // No-op because logging every update would be overkill
    -  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
    -
    -  override def onOtherEvent(event: SparkListenerEvent): Unit = {
    +  def log(event: SparkListenerEvent): Unit = {
         if (event.logEvent) {
    -      logEvent(event, flushLogger = true)
    +      val toLog = event match {
    +        case update: SparkListenerEnvironmentUpdate =>
    +          redactEvent(update)
    +        case _ => event
    +      }
    +      logEvent(toLog)
    +      nbMessageProcessed = nbMessageProcessed + 1
    +      if (nbMessageProcessed == FLUSH_FREQUENCY) {
    --- End diff --
    
    Done
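
As a side note on the change under review: the diff replaces the per-event
flushLogger flag with a counter (nbMessageProcessed) that calls flush() once
every FLUSH_FREQUENCY events. Below is a minimal, self-contained Scala sketch
of that batching pattern; FlushingLogger, flushFrequency, and the string
events are hypothetical stand-ins for illustration, not the PR's actual
EventLoggingListener code.

    import java.io.{ByteArrayOutputStream, PrintWriter}

    // Sketch of counter-based flushing: write every event, flush per batch.
    class FlushingLogger(writer: PrintWriter, flushFrequency: Int) {
      private var eventsSinceFlush = 0

      def log(eventJson: String): Unit = {
        writer.println(eventJson)
        eventsSinceFlush += 1
        // Flush once per batch instead of once per event, cutting the number
        // of (potentially expensive) flush calls by a factor of flushFrequency.
        if (eventsSinceFlush == flushFrequency) {
          flush()
          eventsSinceFlush = 0
        }
      }

      def flush(): Unit = writer.flush()
    }

    object FlushingLoggerDemo extends App {
      val buffer = new ByteArrayOutputStream()
      // new PrintWriter(OutputStream) buffers internally, so unflushed lines
      // stay out of `buffer` until flush() runs.
      val logger = new FlushingLogger(new PrintWriter(buffer), flushFrequency = 3)
      (1 to 7).foreach(i => logger.log(s"""{"Event":"demo","id":$i}"""))
      // Events 1-6 went out in two batches of 3; event 7 is still buffered.
      println(s"lines visible before final flush: ${buffer.toString.count(_ == '\n')}")
      logger.flush() // explicit final flush so trailing events are not lost
      println(s"lines visible after final flush:  ${buffer.toString.count(_ == '\n')}")
    }

The trade-off the demo makes visible: events written after the last full
batch stay buffered until something performs a final flush, so the listener
still needs a closing flush (e.g. when the application ends) to avoid losing
the tail of the log.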

