Github user pwendell commented on a diff in the pull request:
https://github.com/apache/spark/pull/895#discussion_r13530223
--- Diff: core/src/main/scala/org/apache/spark/util/FileAppender.scala ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.{File, FileFilter, FileOutputStream, InputStream}
+import java.text.SimpleDateFormat
+import java.util.Calendar
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.{Logging, SparkConf}
+import RollingFileAppender._
+
+/**
+ * Continuously appends the data from an input stream into the given file.
+ */
+private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
+ extends Logging {
+ @volatile private var outputStream: FileOutputStream = null
+ @volatile private var markedForStop = false // has the appender been asked to stop
+ @volatile private var stopped = false // has the appender stopped
+
+ // Thread that reads the input stream and writes to file
+ private val writingThread = new Thread("File appending thread for " + file) {
+ setDaemon(true)
+ override def run() {
+ Utils.logUncaughtExceptions {
+ appendStreamToFile()
+ }
+ }
+ }
+ writingThread.start()
+
+ /**
+ * Wait for the appender to stop appending, either because the input stream is closed
+ * or because of any error in appending
+ */
+ def awaitTermination() {
+ synchronized {
+ if (!stopped) {
+ wait()
+ }
+ }
+ }
+
+ /** Stop the appender */
+ def stop() {
+ markedForStop = true
+ }
+
+ /** Continuously read chunks from the input stream and append to the file */
+ protected def appendStreamToFile() {
+ try {
+ logDebug("Started appending thread")
+ openFile()
+ val buf = new Array[Byte](bufferSize)
+ var n = 0
+ while (!markedForStop && n != -1) {
+ n = inputStream.read(buf)
+ if (n != -1) {
+ appendToFile(buf, n)
+ }
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Error writing stream to file $file", e)
+ } finally {
+ closeFile()
+ synchronized {
+ stopped = true
+ notifyAll()
+ }
+ }
+ }
+
+ /** Append bytes to the file output stream */
+ protected def appendToFile(bytes: Array[Byte], len: Int) {
+ if (outputStream == null) {
+ openFile()
+ }
+ outputStream.write(bytes, 0, len)
+ }
+
+ /** Open the file output stream */
+ protected def openFile() {
+ outputStream = new FileOutputStream(file, true)
+ logDebug(s"Opened file $file")
+ }
+
+ /** Close the file output stream */
+ protected def closeFile() {
+ outputStream.flush()
+ outputStream.close()
+ logDebug(s"Closed file $file")
+ }
+}
+
+/**
+ * Companion object to [[org.apache.spark.util.FileAppender]] which has helper
+ * functions to choose the right type of FileAppender based on SparkConf configuration.
+ */
+private[spark] object FileAppender extends Logging {
+
+ /** Create the right appender based on Spark configuration */
+ def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
+
+ import RollingFileAppender._
+
+ val rolloverSizeOption = conf.getOption(SIZE_PROPERTY)
+ val rolloverIntervalOption = conf.getOption(INTERVAL_PROPERTY)
+
+ (rolloverIntervalOption, rolloverSizeOption) match {
+
+ case (Some(rolloverInterval), Some(rolloverSize)) =>
+ // if both size and interval have been set
+ logWarning(s"Rollover interval [$rolloverInterval] and size
[$rolloverSize] " +
+ s"both set for executor logs, rolling logs not enabled")
+ new FileAppender(inputStream, file)
+
+ case (Some(rolloverInterval), None) => // if interval has been set
+ val validatedParams: Option[(Long, String)] = rolloverInterval match {
+ case "daily" =>
+ logInfo(s"Rolling executor logs enabled for $file with daily rolling")
+ Some(24 * 60 * 60 * 1000L, s"--yyyy-MM-dd")
+ case "hourly" =>
+ logInfo(s"Rolling executor logs enabled for $file with hourly
rolling")
+ Some(60 * 60 * 1000L, s"--YYYY-MM-dd--HH")
+ case "minutely" =>
+ logInfo(s"Rolling executor logs enabled for $file with rolling
every minute")
+ Some(60 * 1000L, s"--YYYY-MM-dd--HH-mm")
+ case IntParam(seconds) =>
+ logInfo(s"Rolling executor logs enabled for $file with rolling
$seconds seconds")
+ Some(seconds * 1000L, s"--YYYY-MM-dd--HH-mm-ss")
+ case _ =>
+ logWarning(s"Illegal interval for rolling executor logs
[$rolloverInterval], " +
+ s"rolling logs not enabled")
+ None
+ }
+ validatedParams.map {
+ case (interval, pattern) =>
+ new RollingFileAppender(
+ inputStream, file, new TimeBasedRollingPolicy(interval), pattern, conf)
+ }.getOrElse {
+ new FileAppender(inputStream, file)
+ }
+
+ case (None, Some(rolloverSize)) => // if size has been set
+ rolloverSize match {
+ case IntParam(bytes) =>
+ logInfo(s"Rolling executor logs enabled for $file with rolling
every $bytes bytes")
+ new RollingFileAppender(inputStream, file, new SizeBasedRollingPolicy(bytes),
+ s"--yyyy-MM-dd--HH-mm-ss-SSSS", conf)
+ case _ =>
+ logWarning(
+ s"Illegal size for rolling executor logs [$rolloverSize], " +
+ s"rolling logs not enabled")
+ new FileAppender(inputStream, file)
+ }
+
+ case (None, None) => // if neither size nor interval has been set
+ new FileAppender(inputStream, file)
+ }
+ }
+}
+
+/**
+ * Defines the policy based on which [[org.apache.spark.util.RollingFileAppender]] will
+ * generate rolling files.
+ */
+private[spark] trait RollingPolicy {
+
+ /** Whether rollover should be initiated at this moment */
+ def shouldRollover(bytesToBeWritten: Long): Boolean
+
+ /** Notify that rollover has occurred */
+ def rolledOver()
+
+ /** Notify that bytes have been written */
+ def bytesWritten(bytes: Long)
+}
+
+/**
+ * Defines a [[org.apache.spark.util.RollingPolicy]] by which files will be rolled
+ * over at a fixed interval.
+ */
+private[spark] class TimeBasedRollingPolicy(
+ var rolloverIntervalMillis: Long,
+ checkIntervalConstraint: Boolean = true // set to false while testing
+ ) extends RollingPolicy with Logging {
+
+ import TimeBasedRollingPolicy._
+ if (checkIntervalConstraint && rolloverIntervalMillis <
MINIMUM_INTERVAL_SECONDS * 1000L) {
+ logWarning(s"Rolling interval [${rolloverIntervalMillis/1000L}
seconds] is too small. " +
+ s"Setting the interval to the acceptable minimum of
$MINIMUM_INTERVAL_SECONDS seconds.")
+ rolloverIntervalMillis = MINIMUM_INTERVAL_SECONDS * 1000L
+ }
+
+ @volatile private var nextRolloverTime = calculateNextRolloverTime()
+
+ /** Should rollover if current time has exceeded next rollover time */
+ def shouldRollover(bytesToBeWritten: Long): Boolean = {
+ System.currentTimeMillis > nextRolloverTime
+ }
+
+ /** Rollover has occurred, so find the next time to rollover */
+ def rolledOver() {
+ nextRolloverTime = calculateNextRolloverTime()
+ logDebug(s"Current time: ${System.currentTimeMillis}, next rollover
time: " + nextRolloverTime)
+ }
+
+ def bytesWritten(bytes: Long) { } // nothing to do
+
+ private def calculateNextRolloverTime(): Long = {
+ val now = System.currentTimeMillis()
+ val targetTime = (
+ math.ceil(now.toDouble / rolloverIntervalMillis) * rolloverIntervalMillis
+ ).toLong
+ logDebug(s"Next rollover time is $targetTime")
+ targetTime
+ }
+}
+
+private[spark] object TimeBasedRollingPolicy {
+ val MINIMUM_INTERVAL_SECONDS = 60L // 1 minute
+}
+
+/**
+ * Defines a [[org.apache.spark.util.RollingPolicy]] by which files will be rolled
+ * over after reaching a particular size.
+ */
+private[spark] class SizeBasedRollingPolicy(
+ var rolloverSizeBytes: Long,
+ checkSizeConstraint: Boolean = true // set to false while testing
+ ) extends RollingPolicy with Logging {
+
+ import SizeBasedRollingPolicy._
+ if (checkSizeConstraint && rolloverSizeBytes < MINIMUM_SIZE_BYTES) {
+ logWarning(s"Rolling size [$rolloverSizeBytes bytes] is too small. " +
+ s"Setting the size to the acceptable minimum of $MINIMUM_SIZE_BYTES
bytes.")
+ rolloverSizeBytes = MINIMUM_SIZE_BYTES * 1000L
+ }
+
+ @volatile private var bytesWrittenTillNow = 0L
+
+ /** Should rollover if the next set of bytes is going to exceed the size limit */
+ def shouldRollover(bytesToBeWritten: Long): Boolean = {
+ bytesToBeWritten + bytesWrittenTillNow > rolloverSizeBytes
+ }
+
+ /** Rollover has occurred, so reset the counter */
+ def rolledOver() {
+ bytesWrittenTillNow = 0
+ }
+
+ /** Increment the bytes that have been written in the current file */
+ def bytesWritten(bytes: Long) {
+ bytesWrittenTillNow += bytes
+ }
+}
+
+private[spark] object SizeBasedRollingPolicy {
+ val MINIMUM_SIZE_BYTES = RollingFileAppender.DEFAULT_BUFFER_SIZE * 10
+}
+
+
+/**
+ * Continuously appends data from input stream into the given file, and rolls
+ * over the file after the given interval. The rolled over files are named
+ * based on the given pattern.
+ *
+ * @param inputStream Input stream to read data from
+ * @param activeFile File to write data to
+ * @param rollingPolicy Policy based on which files will be rolled over.
+ * @param rollingFilePattern Pattern based on which the rolled over files will be named.
+ * Uses SimpleDateFormat pattern.
+ * @param conf SparkConf that is used to pass on extra configurations
+ * @param bufferSize Optional buffer size. Used mainly for testing.
+ */
+private[spark] class RollingFileAppender(
+ inputStream: InputStream,
+ activeFile: File,
+ val rollingPolicy: RollingPolicy,
+ rollingFilePattern: String,
+ conf: SparkConf,
+ bufferSize: Int = DEFAULT_BUFFER_SIZE
+ ) extends FileAppender(inputStream, activeFile, bufferSize) {
+
+ private val retainCount = conf.getInt(KEEP_LAST_PROPERTY, -1)
+ private val formatter = new SimpleDateFormat(rollingFilePattern)
+
+ private val executor = Executors.newFixedThreadPool(1, new ThreadFactory{
+ def newThread(r: Runnable): Thread = {
+ val t = new Thread(r)
+ t.setDaemon(true)
+ t.setName(s"Threadpool of
${RollingFileAppender.this.getClass.getSimpleName} for $activeFile")
+ t
+ }
+ })
+
+ /** Stop the appender */
+ override def stop() {
+ super.stop()
+ executor.shutdownNow()
+ }
+
+ /** Append bytes to the file, rolling over first if necessary */
+ override protected def appendToFile(bytes: Array[Byte], len: Int) {
+ if (rollingPolicy.shouldRollover(len)) {
+ rollover()
+ rollingPolicy.rolledOver()
+ }
+ super.appendToFile(bytes, len)
+ rollingPolicy.bytesWritten(len)
+ }
+
+ /** Roll over the file by closing the output stream and moving it over */
+ private def rollover() {
+ val rolloverSuffix = formatter.format(Calendar.getInstance.getTime)
+ val rolloverFile = new File(
+ activeFile.getParentFile, activeFile.getName + rolloverSuffix).getAbsoluteFile
+ logDebug("Attempting to rollover at " + System.currentTimeMillis + "
to file " + rolloverFile)
+
+ try {
+ closeFile()
+ if (activeFile.exists) {
+ if (!rolloverFile.exists) {
+ FileUtils.moveFile(activeFile, rolloverFile)
+ logInfo(s"Rolled over $activeFile to $rolloverFile")
+ } else {
+ // In case the rollover file name clashes, make a unique file name.
--- End diff --
What about just appending the millisecond or nanosecond time stamp to the
end of the file name then? That would simplify the rollover suffix.
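
To sketch what I mean (untested, and `chooseRolloverFile` is just a made-up helper name, not something in this patch): when the formatted suffix already exists on disk, fall back to appending the current nanosecond timestamp rather than searching for a free index.

```scala
import java.io.File

// Untested sketch: if the formatted rollover suffix already exists on disk,
// append the current nanosecond timestamp instead of probing for a unique name.
private def chooseRolloverFile(activeFile: File, rolloverSuffix: String): File = {
  val candidate = new File(activeFile.getParentFile, activeFile.getName + rolloverSuffix)
  if (!candidate.exists) {
    candidate.getAbsoluteFile
  } else {
    new File(activeFile.getParentFile,
      activeFile.getName + rolloverSuffix + "--" + System.nanoTime).getAbsoluteFile
  }
}
```

There is still a small window between the exists check and the move, but a nanosecond suffix makes an actual collision vanishingly unlikely.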