Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/895#discussion_r13468415
  
    --- Diff: core/src/main/scala/org/apache/spark/util/FileAppender.scala ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import java.io.{File, FileFilter, FileOutputStream, InputStream}
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.{Executors, ThreadFactory}
    +
    +import org.apache.commons.io.FileUtils
    +import org.apache.spark.{Logging, SparkConf}
    +import RollingFileAppender._
    +
    +/**
    + * Continuously appends the data from an input stream into the given file.
    + */
    +private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
    +  extends Logging {
    +  @volatile private var outputStream: FileOutputStream = null
    +  @volatile private var markedForStop = false     // has the appender been asked to stop
    +  @volatile private var stopped = false           // has the appender stopped
    +
    +  // Thread that reads the input stream and writes to file
    +  private val writingThread = new Thread("File appending thread for " + file) {
    +    setDaemon(true)
    +    override def run() {
    +      Utils.logUncaughtExceptions {
    +        appendStreamToFile()
    +      }
    +    }
    +  }
    +  writingThread.start()
    +
    +  /**
    +   * Wait for the appender to stop appending, either because the input stream is closed
    +   * or because of any error in appending
    +   */
    +  def awaitTermination() {
    +    synchronized {
    +      if (!stopped) {
    +        wait()
    +      }
    +    }
    +  }
    +
    +  /** Stop the appender */
    +  def stop() {
    +    markedForStop = true
    +  }
    +
    +  /** Continuously read chunks from the input stream and append to the file */
    +  protected def appendStreamToFile() {
    +    try {
    +      logDebug("Started appending thread")
    +      openFile()
    +      val buf = new Array[Byte](bufferSize)
    +      var n = 0
    +      while (!markedForStop && n != -1) {
    +        n = inputStream.read(buf)
    +        if (n != -1) {
    +          appendToFile(buf, n)
    +        }
    +      }
    +    } catch {
    +      case e: Exception =>
    +        logError(s"Error writing stream to file $file", e)
    +    } finally {
    +      closeFile()
    +      synchronized {
    +        stopped = true
    +        notifyAll()
    +      }
    +    }
    +  }
    +
    +  /** Append bytes to the file output stream */
    +  protected def appendToFile(bytes: Array[Byte], len: Int) {
    +    if (outputStream == null) {
    +      openFile()
    +    }
    +    outputStream.write(bytes, 0, len)
    +  }
    +
    +  /** Open the file output stream */
    +  protected def openFile() {
    +    outputStream = new FileOutputStream(file, true)
    +    logDebug(s"Opened file $file")
    +  }
    +
    +  /** Close the file output stream */
    +  protected def closeFile() {
    +    outputStream.flush()
    +    outputStream.close()
    +    logDebug(s"Closed file $file")
    +  }
    +}
    +
    +/**
    + * Companion object to [[org.apache.spark.util.FileAppender]] which has helper
    + * functions to choose the right type of FileAppender based on SparkConf configuration.
    + */
    +private[spark] object FileAppender extends Logging {
    +
    +  /** Create the right appender based on Spark configuration */
    +  def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
    +
    +    import RollingFileAppender._
    +
    +    val rolloverSizeOption = conf.getOption(SIZE_PROPERTY)
    +    val rolloverIntervalOption = conf.getOption(INTERVAL_PROPERTY)
    +
    +    (rolloverIntervalOption, rolloverSizeOption) match {
    +
    +      case (Some(rolloverInterval), Some(rolloverSize)) =>
    +        // if both size and interval have been set
    +        logWarning(s"Rollover interval [$rolloverInterval] and size 
[$rolloverSize] " +
    +          s"both set for executor logs, rolling logs not enabled")
    +        new FileAppender(inputStream, file)
    +
    +      case (Some(rolloverInterval), None) =>  // if interval has been set
    +        val validatedParams: Option[(Long, String)] = rolloverInterval match {
    +          case "daily" =>
    +            logInfo(s"Rolling executor logs enabled for $file with daily 
rolling")
    +            Some(24 * 60 * 60 * 1000L, s"--YYYY-MM-dd")
    +          case "hourly" =>
    +            logInfo(s"Rolling executor logs enabled for $file with hourly 
rolling")
    +            Some(60 * 60 * 1000L, s"--YYYY-MM-dd--HH")
    +          case "minutely" =>
    +            logInfo(s"Rolling executor logs enabled for $file with rolling 
every minute")
    +            Some(60 * 1000L, s"--YYYY-MM-dd--HH-mm")
    +          case IntParam(seconds) =>
    +            logInfo(s"Rolling executor logs enabled for $file with rolling 
$seconds seconds")
    +            Some(seconds * 1000L, s"--YYYY-MM-dd--HH-mm-ss")
    +          case _ =>
    +            logWarning(s"Illegal interval for rolling executor logs 
[$rolloverInterval], " +
    +                s"rolling logs not enabled")
    +            None
    +        }
    +        validatedParams.map {
    +          case (interval, pattern) =>
    +            new RollingFileAppender(
    +              inputStream, file, new TimeBasedRollingPolicy(interval), pattern, conf)
    +        }.getOrElse {
    +          new FileAppender(inputStream, file)
    +        }
    +
    +      case (None, Some(rolloverSize)) =>    // if size has been set
    +        rolloverSize match {
    +          case IntParam(bytes) =>
    +            logInfo(s"Rolling executor logs enabled for $file with rolling 
every $bytes bytes")
    +            new RollingFileAppender(inputStream, file, new SizeBasedRollingPolicy(bytes),
    +              s"--YYYY-MM-dd--HH-mm-ss-SSSS", conf)
    +          case _ =>
    +            logWarning(
    +              s"Illegal size for rolling executor logs [$rolloverSize], " +
    +                s"rolling logs not enabled")
    +            new FileAppender(inputStream, file)
    +        }
    +
    +      case (None, None) =>                // if neither size nor interval has been set
    +        new FileAppender(inputStream, file)
    +    }
    +  }
    +}
    +
    +/**
    + * Defines the policy based on which [[org.apache.spark.util.RollingFileAppender]] will
    + * generate rolling files.
    + */
    +private[spark] trait RollingPolicy {
    +
    +  /** Whether rollover should be initiated at this moment */
    +  def shouldRollover(bytesToBeWritten: Long): Boolean
    +
    +  /** Notify that rollover has occurred */
    +  def rolledOver()
    +
    +  /** Notify that bytes have been written */
    +  def bytesWritten(bytes: Long)
    +}
    +
    +/**
    + * Defines a [[org.apache.spark.util.RollingPolicy]] by which files will be rolled
    + * over at a fixed interval.
    + */
    +private[spark] class TimeBasedRollingPolicy(
    +    var rolloverIntervalMillis: Long,
    +    checkIntervalConstraint: Boolean = true   // set to false while testing
    +  ) extends RollingPolicy with Logging {
    +
    +  import TimeBasedRollingPolicy._
    +  if (checkIntervalConstraint && rolloverIntervalMillis < MINIMUM_INTERVAL_SECONDS * 1000L) {
    +    logWarning(s"Rolling interval [${rolloverIntervalMillis/1000L} 
seconds] is too small. " +
    +      s"Setting the interval to the acceptable minimum of 
$MINIMUM_INTERVAL_SECONDS seconds.")
    +    rolloverIntervalMillis = MINIMUM_INTERVAL_SECONDS * 1000L
    +  }
    +
    +  @volatile private var nextRolloverTime = calculateNextRolloverTime()
    +
    +  /** Should rollover if current time has exceeded next rollover time */
    +  def shouldRollover(bytesToBeWritten: Long): Boolean = {
    +    System.currentTimeMillis > nextRolloverTime
    +  }
    +
    +  /** Rollover has occurred, so find the next time to rollover */
    +  def rolledOver() {
    +    nextRolloverTime = calculateNextRolloverTime()
    +    logDebug(s"Current time: ${System.currentTimeMillis}, next rollover 
time: " + nextRolloverTime)
    +  }
    +
    +  def bytesWritten(bytes: Long) { }  // nothing to do
    +
    +  private def calculateNextRolloverTime(): Long = {
    +    val now = System.currentTimeMillis()
    +    val targetTime = (
    +      math.ceil(now.toDouble / rolloverIntervalMillis) * rolloverIntervalMillis
    +    ).toLong
    +    logDebug(s"Next rollover time is $targetTime")
    +    targetTime
    +  }
    +}
    +
    +private[spark] object TimeBasedRollingPolicy {
    +  val MINIMUM_INTERVAL_SECONDS = 60L  // 1 minute
    +}
    +
    +/**
    + * Defines a [[org.apache.spark.util.RollingPolicy]] by which files will be rolled
    + * over after reaching a particular size.
    + */
    +private[spark] class SizeBasedRollingPolicy(
    +    var rolloverSizeBytes: Long,
    +    checkSizeConstraint: Boolean = true     // set to false while testing
    +  ) extends RollingPolicy with Logging {
    +
    +  import SizeBasedRollingPolicy._
    +  if (checkSizeConstraint && rolloverSizeBytes < MINIMUM_SIZE_BYTES) {
    +    logWarning(s"Rolling size [$rolloverSizeBytes bytes] is too small. " +
    +      s"Setting the size to the acceptable minimum of $MINIMUM_SIZE_BYTES 
bytes.")
    +    rolloverSizeBytes = MINIMUM_SIZE_BYTES * 1000L
    --- End diff --
    
    Why multiply by 1000? This seems inconsistent with the error message.
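    
    If the intent is just to clamp the size to the documented minimum (mirroring the interval case above), I'd expect the assignment to drop the multiplier. Only a sketch of what I mean, reusing the surrounding block:
    
        if (checkSizeConstraint && rolloverSizeBytes < MINIMUM_SIZE_BYTES) {
          logWarning(s"Rolling size [$rolloverSizeBytes bytes] is too small. " +
            s"Setting the size to the acceptable minimum of $MINIMUM_SIZE_BYTES bytes.")
          rolloverSizeBytes = MINIMUM_SIZE_BYTES  // clamp directly to the minimum, no extra factor
        }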

