Github user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/1222#discussion_r15478644
--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -54,44 +57,92 @@ private[spark] class EventLoggingListener(
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
   private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
-  private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
-  val logDir = logBaseDir + "/" + name
+  private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir))
+  private lazy val compressionCodec = CompressionCodec.createCodec(sparkConf)
+
+  // Only defined if the file system scheme is not local
+  private var hadoopDataStream: Option[FSDataOutputStream] = None
 
-  protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
-    shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
+  private var writer: Option[PrintWriter] = None
 
   // For testing. Keep track of all JSON serialized events that have been logged.
   private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
 
+  val logPath = {
+    val sb = new StringBuilder()
+      .append(logBaseDir)
+      .append("/")
+      .append(appName.replaceAll("[ :/]", "-").toLowerCase())
+      .append("-")
+      .append(System.currentTimeMillis())
+      .append("-")
+      .append(SparkContext.SPARK_VERSION)
+    if (shouldCompress) {
+      val codec =
+        sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
+      sb.append("-").append(codec)
+    }
+    sb.toString()
+  }
+
   /**
-   * Begin logging events.
-   * If compression is used, log a file that indicates which compression library is used.
+   * Creates the log file in the configured log directory.
+   *
+   * The file name contains some metadata about its contents. It follows the following
+   * format:
+   *
+   * {{{
+   *   {app name}-{timestamp}.{spark version}[.{compression codec}][.inprogress]
+   * }}}
+   *
+   * Where:
+   * - "app name" is a fs-friendly version of the application's name, in lower case
+   * - "timestamp" is a timestamp generated by this logger
+   * - "spark version" is the version of spark that generated the logs
+   * - "compression codec" is an optional string with the name of the compression codec
+   *   used to write the file
+   * - ".inprogress" will be present while the log file is still being written to, and
+   *   removed after the application is finished.
    */
   def start() {
-    logger.start()
-    logInfo("Logging events to %s".format(logDir))
-    if (shouldCompress) {
-      val codec =
-        sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
-      logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
+    if (!fileSystem.isDirectory(new Path(logBaseDir))) {
+      throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.");
     }
-    logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
-    logger.newFile(LOG_PREFIX + logger.fileIndex)
+
+    val workingPath = logPath + IN_PROGRESS
+    val uri = new URI(workingPath)
+    val path = new Path(workingPath)
+    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+    val isDefaultLocal = defaultFs == null || defaultFs == "file"
+
+    /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
+     * Therefore, for local files, use FileOutputStream instead. */
+    val dstream =
+      if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
+        // Second parameter is whether to append
+        new FileOutputStream(uri.getPath)
+      } else {
+        hadoopDataStream = Some(fileSystem.create(path))
+        hadoopDataStream.get
+      }
+
+    fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
+    val bstream = new BufferedOutputStream(dstream, outputBufferSize)
+    val cstream = if (shouldCompress) compressionCodec.compressedOutputStream(bstream) else bstream
+    writer = Some(new PrintWriter(cstream))
+
+    logInfo("Logging events to %s".format(logPath))
   }
 
   /** Log the event as JSON. */
-  private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
+  private def logEvent(event: SparkListenerEvent) {
     val eventJson = JsonProtocol.sparkEventToJson(event)
-    logger.logLine(compact(render(eventJson)))
-    if (flushLogger) {
-      logger.flush()
-    }
--- End diff --
Not sure if I understand why this is removed. With these changes, when do
we flush the logs?
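My understanding of the old behavior is that the coarser-grained events (stage completion, job end, application end) were logged with flushLogger = true, which is what kept the on-disk log reasonably current while the application was still running. If that behavior is still wanted with the new writer, something along these lines would preserve it. This is only a sketch against the fields introduced in this diff (writer, hadoopDataStream); the hflush() call is an assumption based on the Hadoop 2.x Syncable API and would need different handling on Hadoop 1.x:

    /** Log the event as JSON, optionally flushing it to the underlying storage. */
    private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
      val eventJson = JsonProtocol.sparkEventToJson(event)
      // Write one compact JSON line per event through the PrintWriter set up in start().
      writer.foreach(_.println(compact(render(eventJson))))
      if (flushLogger) {
        // Push the buffered (and possibly compressed) bytes out of the JVM...
        writer.foreach(_.flush())
        // ...and, when writing to a non-local file system, ask Hadoop to persist them too
        // (hflush() assumes Hadoop 2.x; Hadoop 1.x only has sync()).
        hadoopDataStream.foreach(_.hflush())
      }
    }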