Github user pwendell commented on a diff in the pull request:
https://github.com/apache/spark/pull/895#discussion_r13420545
--- Diff: core/src/main/scala/org/apache/spark/util/FileAppender.scala ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.{File, FileFilter, FileOutputStream, InputStream}
+import java.text.SimpleDateFormat
+import java.util.Calendar
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.{Logging, SparkConf}
+
+/**
+ * Continuously appends the data from an input stream into the given file.
+ */
+private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
+ extends Logging {
+ @volatile private var outputStream: FileOutputStream = null
+ @volatile private var markedForStop = false // has the appender been asked to stop
+ @volatile private var stopped = false // has the appender stopped
+
+ // Thread that reads the input stream and writes to file
+ private val writingThread = new Thread("File appending thread for " + file) {
+ setDaemon(true)
+ override def run() {
+ Utils.logUncaughtExceptions {
+ appendStreamToFile()
+ }
+ }
+ }
+ writingThread.start()
+
+ /**
+ * Wait for the appender to stop appending, either because the input stream is closed
+ * or because of an error in appending
+ */
+ def awaitTermination() {
+ synchronized {
+ if (!stopped) {
+ wait()
+ }
+ }
+ }
+
+ /** Stop the appender */
+ def stop() {
+ markedForStop = true
+ }
+
+ /** Continuously read chunks from the input stream and append to the file */
+ protected def appendStreamToFile() {
+ try {
+ logDebug("Started appending thread")
+ openFile()
+ val buf = new Array[Byte](bufferSize)
+ var n = 0
+ while (!markedForStop && n != -1) {
+ n = inputStream.read(buf)
+ if (n != -1) {
+ appendToFile(buf, n)
+ }
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Error writing stream to file $file", e)
+ } finally {
+ closeFile()
+ synchronized {
+ stopped = true
+ notifyAll()
+ }
+ }
+ }
+
+ /** Append bytes to the file output stream */
+ protected def appendToFile(bytes: Array[Byte], len: Int) {
+ if (outputStream == null) {
+ openFile()
+ }
+ outputStream.write(bytes, 0, len)
+ }
+
+ /** Open the file output stream */
+ protected def openFile() {
+ outputStream = new FileOutputStream(file, true)
+ logDebug(s"Opened file $file")
+ }
+
+ /** Close the file output stream */
+ protected def closeFile() {
+ outputStream.flush()
+ outputStream.close()
+ logDebug(s"Closed file $file")
+ }
+}
+
+/**
+ * Companion object to [[org.apache.spark.util.FileAppender]] which has helper
+ * functions to choose the right type of FileAppender based on SparkConf configuration.
+ */
+private[spark] object FileAppender extends Logging {
+
+ /** Create the right appender based on Spark configuration */
+ def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
+
+ import RollingFileAppender._
+
+ val rolloverEnabled = conf.getBoolean(ENABLE_PROPERTY, false)
+ logDebug("Log rollover enabled = " + rolloverEnabled)
+ if (rolloverEnabled) {
+
+ val rolloverSizeOption = conf.getOption(SIZE_PROPERTY)
+ val rolloverIntervalOption = conf.getOption(INTERVAL_PROPERTY)
+
+ (rolloverIntervalOption, rolloverSizeOption) match {
+
+ case (Some(rolloverInterval), Some(rolloverSize)) => // if both size and interval have been set
+ logWarning(s"Rollover interval [$rolloverInterval] and size [$rolloverSize] " +
+ s"both set for executor logs, rolling logs not enabled")
+ new FileAppender(inputStream, file)
+
+ case (Some(rolloverInterval), None) => // if interval has been set
+ rolloverInterval match {
+ case "daily" =>
+ logInfo(s"Rolling executor logs enabled for $file with daily
rolling")
+ new DailyRollingFileAppender(inputStream, file, conf)
+ case "hourly" =>
+ logInfo(s"Rolling executor logs enabled for $file with
hourly rolling")
+ new HourlyRollingFileAppender(inputStream, file, conf)
+ case "minutely" =>
+ logInfo(s"Rolling executor logs enabled for $file with
rolling every minute")
+ new MinutelyRollingFileAppender(inputStream, file, conf)
+ case IntParam(millis) =>
+ logInfo(s"Rolling executor logs enabled for $file with
rolling every minute")
+ new RollingFileAppender(inputStream, file, new
TimeBasedRollingPolicy(millis),
+ s"--YYYY-MM-dd--HH-mm-ss-SSSS", conf)
+ case _ =>
+ logWarning(s"Illegal interval for rolling executor logs [$rolloverInterval], " +
+ s"rolling logs not enabled")
+ new FileAppender(inputStream, file)
+ }
+
+ case (None, Some(rolloverSize)) => // if size has been set
+ rolloverSize match {
+ case IntParam(bytes) =>
+ logInfo(s"Rolling executor logs enabled for $file with
rolling every $bytes bytes")
+ new RollingFileAppender(inputStream, file, new
SizeBasedRollingPolicy(bytes),
+ s"--YYYY-MM-dd--HH-mm-ss-SSSS", conf)
+ case _ =>
+ logWarning(s"Illegal size for rolling executor logs [$rolloverSize], " +
+ s"rolling logs not enabled")
+ new FileAppender(inputStream, file)
+ }
+
+ case (None, None) => // if neither size nor interval has been set
+ logWarning(s"Interval and size for rolling executor logs not set, " +
+ s"rolling logs enabled with daily rolling.")
+ new DailyRollingFileAppender(inputStream, file, conf)
+ }
+ } else {
+ new FileAppender(inputStream, file)
+ }
+ }
+}
+
+/**
+ * Defines the policy based on which [[org.apache.spark.util.RollingFileAppender]] will
+ * generate rolling files.
+ */
+private[spark] trait RollingPolicy {
+
+ /** Whether rollover should be initiated at this moment */
+ def shouldRollover(bytesToBeWritten: Long): Boolean
+
+ /** Notify that rollover has occurred */
+ def rolledOver()
+
+ /** Notify that bytes have been written */
+ def bytesWritten(bytes: Long)
+}
+
+/**
+ * Defines a [[org.apache.spark.util.RollingPolicy]] by which files will be rolled
+ * over at a fixed interval.
+ */
+private[spark] class TimeBasedRollingPolicy(val rolloverIntervalMillis: Long)
+ extends RollingPolicy with Logging {
+
+ require(rolloverIntervalMillis >= 100)
+
+ @volatile private var nextRolloverTime = calculateNextRolloverTime()
+
+ /** Should rollover if current time has exceeded next rollover time */
+ def shouldRollover(bytesToBeWritten: Long): Boolean = {
+ System.currentTimeMillis > nextRolloverTime
+ }
+
+ /** Rollover has occurred, so find the next time to rollover */
+ def rolledOver() {
+ nextRolloverTime = calculateNextRolloverTime()
+ logDebug(s"Current time: ${System.currentTimeMillis}, next rollover
time: " + nextRolloverTime)
+ }
+
+ def bytesWritten(bytes: Long) { } // nothing to do
+
+ private def calculateNextRolloverTime(): Long = {
+ val now = System.currentTimeMillis()
+ val targetTime = math.ceil(now.toDouble / rolloverIntervalMillis) * rolloverIntervalMillis
--- End diff --
@tdas Ah okay, I actually found one potential issue. When rolling on days...
I guess this will just always use the UTC day? Might be good to document that.
```
scala> val now = System.currentTimeMillis()
now: Long = 1401931771561
scala> val rolloverIntervalMillis = 24 * 3600 * 1000
rolloverIntervalMillis: Int = 86400000
scala> val targetTime = math.ceil(now.toDouble / rolloverIntervalMillis) * rolloverIntervalMillis
targetTime: Double = 1.4020128E12
scala> new Date(targetTime.toLong)
res2: java.util.Date = Thu Jun 05 17:00:00 PDT 2014
```
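If local-time alignment were ever wanted instead, a minimal sketch (purely illustrative, not part of this PR) could compute the next local midnight with the `java.util.Calendar` already imported in this file; the `nextLocalMidnight` helper below is a hypothetical name:
```
import java.util.{Calendar, Date}

// Hypothetical helper: compute the next local midnight instead of the next
// UTC day boundary that the ceil-on-epoch-millis computation produces.
def nextLocalMidnight(nowMillis: Long): Long = {
  val cal = Calendar.getInstance()   // JVM default (local) time zone
  cal.setTimeInMillis(nowMillis)
  cal.set(Calendar.HOUR_OF_DAY, 0)
  cal.set(Calendar.MINUTE, 0)
  cal.set(Calendar.SECOND, 0)
  cal.set(Calendar.MILLISECOND, 0)
  cal.add(Calendar.DAY_OF_MONTH, 1)  // roll forward to the next midnight
  cal.getTimeInMillis
}

// Run in the same PDT zone as the session above, this yields local midnight
// (Thu Jun 05 00:00:00 PDT 2014) rather than 17:00 PDT, i.e. 00:00 UTC.
println(new Date(nextLocalMidnight(1401931771561L)))
```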