Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/895#discussion_r13420545
  
    --- Diff: core/src/main/scala/org/apache/spark/util/FileAppender.scala ---
    @@ -0,0 +1,429 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import java.io.{File, FileFilter, FileOutputStream, InputStream}
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.{Executors, ThreadFactory}
    +
    +import org.apache.commons.io.FileUtils
    +import org.apache.spark.{Logging, SparkConf}
    +
    +/**
    + * Continuously appends the data from an input stream into the given file.
    + */
    +private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
    +  extends Logging {
    +  @volatile private var outputStream: FileOutputStream = null
    +  @volatile private var markedForStop = false     // has the appender been asked to stop
    +  @volatile private var stopped = false           // has the appender stopped
    +
    +  // Thread that reads the input stream and writes to file
    +  private val writingThread = new Thread("File appending thread for " + file) {
    +    setDaemon(true)
    +    override def run() {
    +      Utils.logUncaughtExceptions {
    +        appendStreamToFile()
    +      }
    +    }
    +  }
    +  writingThread.start()
    +
    +  /**
    +   * Wait for the appender to stop appending, either because the input stream is closed
    +   * or because of any error in appending
    +   */
    +  def awaitTermination() {
    +    synchronized {
    +      if (!stopped) {
    +        wait()
    +      }
    +    }
    +  }
    +
    +  /** Stop the appender */
    +  def stop() {
    +    markedForStop = true
    +  }
    +
    +  /** Continuously read chunks from the input stream and append to the file */
    +  protected def appendStreamToFile() {
    +    try {
    +      logDebug("Started appending thread")
    +      openFile()
    +      val buf = new Array[Byte](bufferSize)
    +      var n = 0
    +      while (!markedForStop && n != -1) {
    +        n = inputStream.read(buf)
    +        if (n != -1) {
    +          appendToFile(buf, n)
    +        }
    +      }
    +    } catch {
    +      case e: Exception =>
    +        logError(s"Error writing stream to file $file", e)
    +    } finally {
    +      closeFile()
    +      synchronized {
    +        stopped = true
    +        notifyAll()
    +      }
    +    }
    +  }
    +
    +  /** Append bytes to the file output stream */
    +  protected def appendToFile(bytes: Array[Byte], len: Int) {
    +    if (outputStream == null) {
    +      openFile()
    +    }
    +    outputStream.write(bytes, 0, len)
    +  }
    +
    +  /** Open the file output stream */
    +  protected def openFile() {
    +    outputStream = new FileOutputStream(file, true)
    +    logDebug(s"Opened file $file")
    +  }
    +
    +  /** Close the file output stream */
    +  protected def closeFile() {
    +    outputStream.flush()
    +    outputStream.close()
    +    logDebug(s"Closed file $file")
    +  }
    +}
    +
    +/**
    + * Companion object to [[org.apache.spark.util.FileAppender]] which has helper
    + * functions to choose the right type of FileAppender based on SparkConf configuration.
    + */
    +private[spark] object FileAppender extends Logging {
    +
    +  /** Create the right appender based on Spark configuration */
    +  def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
    +
    +    import RollingFileAppender._
    +
    +    val rolloverEnabled = conf.getBoolean(ENABLE_PROPERTY, false)
    +    logDebug("Log rollover enabled = " + rolloverEnabled)
    +    if (rolloverEnabled) {
    +
    +      val rolloverSizeOption = conf.getOption(SIZE_PROPERTY)
    +      val rolloverIntervalOption = conf.getOption(INTERVAL_PROPERTY)
    +
    +      (rolloverIntervalOption, rolloverSizeOption) match {
    +
    +        case (Some(rolloverInterval), Some(rolloverSize)) =>  // if both size and interval have been set
    +          logWarning(s"Rollover interval [$rolloverInterval] and size 
[$rolloverSize] " +
    +            s"both set for executor logs, rolling logs not enabled")
    +          new FileAppender(inputStream, file)
    +
    +        case (Some(rolloverInterval), None) =>  // if interval has been set
    +          rolloverInterval match {
    +            case "daily" =>
    +              logInfo(s"Rolling executor logs enabled for $file with daily 
rolling")
    +              new DailyRollingFileAppender(inputStream, file, conf)
    +            case "hourly" =>
    +              logInfo(s"Rolling executor logs enabled for $file with 
hourly rolling")
    +              new HourlyRollingFileAppender(inputStream, file, conf)
    +            case "minutely" =>
    +              logInfo(s"Rolling executor logs enabled for $file with 
rolling every minute")
    +              new MinutelyRollingFileAppender(inputStream, file, conf)
    +            case IntParam(millis) =>
    +              logInfo(s"Rolling executor logs enabled for $file with 
rolling every minute")
    +              new RollingFileAppender(inputStream, file, new 
TimeBasedRollingPolicy(millis),
    +                s"--YYYY-MM-dd--HH-mm-ss-SSSS", conf)
    +            case _ =>
    +              logWarning(
    +                s"Illegal interval for rolling executor logs 
[$rolloverInterval], " +
    +                  s"rolling logs not enabled")
    +              new FileAppender(inputStream, file)
    +          }
    +
    +        case (None, Some(rolloverSize)) =>    // if size has been set
    +          rolloverSize match {
    +            case IntParam(bytes) =>
    +              logInfo(s"Rolling executor logs enabled for $file with 
rolling every $bytes bytes")
    +              new RollingFileAppender(inputStream, file, new SizeBasedRollingPolicy(bytes),
    +                s"--YYYY-MM-dd--HH-mm-ss-SSSS", conf)
    +            case _ =>
    +              logWarning(
    +                s"Illegal size for rolling executor logs [$rolloverSize], 
" +
    +                  s"rolling logs not enabled")
    +              new FileAppender(inputStream, file)
    +          }
    +
    +        case (None, None) =>                // if neither size nor interval has been set
    +          logWarning(s"Interval and size for rolling executor logs not 
set, " +
    +            s"rolling logs enabled with daily rolling.")
    +          new DailyRollingFileAppender(inputStream, file, conf)
    +      }
    +    } else {
    +      new FileAppender(inputStream, file)
    +    }
    +  }
    +}
    +
    +/**
    + * Defines the policy based on which [[org.apache.spark.util.RollingFileAppender]] will
    + * generate rolling files.
    + */
    +private[spark] trait RollingPolicy {
    +
    +  /** Whether rollover should be initiated at this moment */
    +  def shouldRollover(bytesToBeWritten: Long): Boolean
    +
    +  /** Notify that rollover has occurred */
    +  def rolledOver()
    +
    +  /** Notify that bytes have been written */
    +  def bytesWritten(bytes: Long)
    +}
    +
    +/**
    + * Defines a [[org.apache.spark.util.RollingPolicy]] by which files will be rolled
    + * over at a fixed interval.
    + */
    +private[spark] class TimeBasedRollingPolicy(val rolloverIntervalMillis: Long)
    +  extends RollingPolicy with Logging {
    +
    +  require(rolloverIntervalMillis >= 100)
    +
    +  @volatile private var nextRolloverTime = calculateNextRolloverTime()
    +
    +  /** Should rollover if current time has exceeded next rollover time */
    +  def shouldRollover(bytesToBeWritten: Long): Boolean = {
    +    System.currentTimeMillis > nextRolloverTime
    +  }
    +
    +  /** Rollover has occurred, so find the next time to rollover */
    +  def rolledOver() {
    +    nextRolloverTime = calculateNextRolloverTime()
    +    logDebug(s"Current time: ${System.currentTimeMillis}, next rollover 
time: " + nextRolloverTime)
    +  }
    +
    +  def bytesWritten(bytes: Long) { }  // nothing to do
    +
    +  private def calculateNextRolloverTime(): Long = {
    +    val now = System.currentTimeMillis()
    +    val targetTime = math.ceil(now.toDouble / rolloverIntervalMillis) * rolloverIntervalMillis
    --- End diff --
    
    @tdas Ah okay, I actually found one potential issue. When rolling on days... I guess this will just always use the UTC day? Might be good to document that.
    
    ```
    scala> import java.util.Date
    import java.util.Date
    
    scala> val now = System.currentTimeMillis()
    now: Long = 1401931771561
    
    scala> val rolloverIntervalMillis = 24 * 3600 * 1000
    rolloverIntervalMillis: Int = 86400000
    
    scala> val targetTime = math.ceil(now.toDouble / rolloverIntervalMillis) * rolloverIntervalMillis
    targetTime: Double = 1.4020128E12
    
    scala> new Date(targetTime.toLong)
    res2: java.util.Date = Thu Jun 05 17:00:00 PDT 2014
    ```
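    
    For reference, here is a minimal sketch (not part of the PR) of what a local-time-based daily rollover could look like, using the `java.util.Calendar` import the file already has. `nextLocalMidnight` is a hypothetical helper, shown only to illustrate the UTC-vs-local distinction, not a suggested change:
    
    ```
    import java.util.Calendar
    
    // Hypothetical alternative: roll over at the next *local* midnight instead of
    // at the next multiple of 24h since the Unix epoch (i.e. 00:00 UTC).
    def nextLocalMidnight(): Long = {
      val cal = Calendar.getInstance()     // JVM default (local) time zone
      cal.set(Calendar.HOUR_OF_DAY, 0)
      cal.set(Calendar.MINUTE, 0)
      cal.set(Calendar.SECOND, 0)
      cal.set(Calendar.MILLISECOND, 0)
      cal.add(Calendar.DAY_OF_MONTH, 1)    // midnight of the following day
      cal.getTimeInMillis
    }
    ```
    
    With the ceiling-based formula above, a "daily" interval always rolls at 00:00 UTC (17:00 PDT in the example), whereas something like this would roll at local midnight.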

