[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r378049590

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##
@@ -179,6 +179,21 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.rolling.enabled")
+      .doc("Whether rolling over event log files is enabled. If set to true, it cuts down " +
+        "each event log file to the configured size.")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLING_MAX_FILE_SIZE =
+    ConfigBuilder("spark.eventLog.rolling.maxFileSize")
+      .doc("The max size of event log file to be rolled over.")

Review comment:
@HyukjinKwon Thanks for starting the thread on the dev mailing list. I'm following the thread and will come back once we reach some sort of consensus.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r377862998

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##
@@ -179,6 +179,21 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.rolling.enabled")
+      .doc("Whether rolling over event log files is enabled. If set to true, it cuts down " +
+        "each event log file to the configured size.")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLING_MAX_FILE_SIZE =
+    ConfigBuilder("spark.eventLog.rolling.maxFileSize")
+      .doc("The max size of event log file to be rolled over.")

Review comment:
Sorry, it looks like we'll have to agree to disagree then. No one has the privilege to make someone do work under their own authorship that they disagree with; that would end up putting the wrong authorship on the commit.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r373350420

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##
@@ -179,6 +179,21 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.rolling.enabled")
+      .doc("Whether rolling over event log files is enabled. If set to true, it cuts down " +
+        "each event log file to the configured size.")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLING_MAX_FILE_SIZE =
+    ConfigBuilder("spark.eventLog.rolling.maxFileSize")
+      .doc("The max size of event log file to be rolled over.")

Review comment:
I think there are counterexamples among the Spark configurations, which rely on the convention that once a `.enabled` configuration exists, the others in the group take effect only when it is enabled. Even if we only look at the SHS configurations, `spark.history.fs.cleaner.*`, `spark.history.kerberos.*`, and `spark.history.ui.acls.*` fall into this case.
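
The convention described in the comment above can be sketched in plain Scala, independent of Spark's `ConfigBuilder`. This is only an illustration under assumptions, not Spark's implementation: `EnabledGateSketch` and `effectiveMaxFileSize` are hypothetical names, and only the two configuration keys and the `false` default for `.enabled` are taken from the quoted diff.

```scala
// Hypothetical model of a config group gated by an `.enabled` flag.
// Only the two keys and the `false` default come from the quoted diff.
object EnabledGateSketch {
  val EnabledKey = "spark.eventLog.rolling.enabled"
  val MaxFileSizeKey = "spark.eventLog.rolling.maxFileSize"

  // Returns the max file size only when rolling is enabled; otherwise the
  // setting is ignored, whatever value it holds.
  def effectiveMaxFileSize(conf: Map[String, String]): Option[String] = {
    val enabled = conf.get(EnabledKey).exists(_.toBoolean) // absent means false
    if (enabled) conf.get(MaxFileSizeKey) else None
  }

  def main(args: Array[String]): Unit = {
    val onlySize = Map(MaxFileSizeKey -> "10m")
    val both = onlySize + (EnabledKey -> "true")
    println(effectiveMaxFileSize(onlySize)) // None
    println(effectiveMaxFileSize(both))     // Some(10m)
  }
}
```

Under this reading, documenting `maxFileSize` without repeating "only when rolling is enabled" matches how the other gated groups behave.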
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r335249806 ## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io._ +import java.net.URI +import java.nio.charset.StandardCharsets + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[history] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + // Only defined if the file system scheme is not local + protected var hadoopDataStream: Option[FSDataOutputStream] = None + protected var writer: Option[PrintWriter] = None + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path)(fnSetupWriter: OutputStream => PrintWriter): Unit = { +if (shouldOverwrite && fileSystem.delete(path, true)) { + logWarning(s"Event log $path already exists. 
Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +// The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). +// Therefore, for local files, use FileOutputStream instead. +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some( + SparkHadoopUtil.createFile(fileSystem, path, sparkConf.get(EVENT_LOG_ALLOW_EC))) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedContinuousOutputStream(dstream)) +.getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + writer = Some(fnSetupWriter(bstream)) +} catch { + case e: Exception => +
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r334662511

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##
@@ -179,6 +179,19 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.logRolling.enabled")
+      .doc("Whether rolling over event log files is enabled. If set to true, it cuts down " +
+        "each event log file to the configured size.")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE =
+    ConfigBuilder("spark.eventLog.logRolling.maxFileSize")
+      .doc("The max size of event log file to be rolled over.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("128m")

Review comment:
Thanks for sharing the info! Here is a possible case for wanting a smaller max file size: assuming we go with the plan, we would want to set a max number of event log files to retain, to address the case where the storage quota is limited. (I believe I saw this requirement, a limited storage quota, somewhere earlier but can't remember where.) In that case, "max file size" and "max number of event log files" work together to forecast the size of the event log directory for the app (snapshot files should also be considered, so in reality it will be bigger than that), and to control that value finely we may want a smaller max file size.
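
The forecast mentioned in the comment above is a simple product, sketched here in plain Scala. This is an assumption-laden illustration: `EventLogDirForecast`, `forecastBytes`, and the "max files to retain" parameter are hypothetical (the quoted diff defines no such setting), and the result is only a lower bound because snapshot files are not counted.

```scala
// Hypothetical sketch: lower-bound forecast of an app's event log directory size.
// `maxFilesToRetain` is an assumed setting; it does not appear in the quoted diff.
object EventLogDirForecast {
  // Rolled files only; snapshot files would add to this figure.
  def forecastBytes(maxFileSizeBytes: Long, maxFilesToRetain: Int): Long =
    maxFileSizeBytes * maxFilesToRetain

  def main(args: Array[String]): Unit = {
    val mib = 1024L * 1024L
    // With the "128m" default from the diff and 10 retained files:
    println(forecastBytes(128 * mib, 10)) // 1342177280
    // A smaller max file size gives finer control over the total:
    println(forecastBytes(16 * mib, 10))  // 167772160
  }
}
```

With a fixed quota, each retained file changes the total by one full max file size, which is why a smaller value allows finer tuning.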
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r334658321

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##
@@ -179,6 +179,19 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.logRolling.enabled")
+      .doc("Whether rolling over event log files is enabled. If set to true, it cuts down " +
+        "each event log file to the configured size.")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE =

Review comment:
I'll apply the former first, as we may need to hear another voice before changing the config name.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r334657834

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##
@@ -179,6 +179,19 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.logRolling.enabled")

Review comment:
That's a good suggestion, as using "log" twice does sound redundant. Since @vanzin left a review comment on this, it would be nice to hear his thoughts as well before making the change.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r332764132

## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ##
@@ -699,12 +689,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       ((!appCompleted && fastInProgressParsing) || reparseChunkSize > 0)
     val bus = new ReplayListenerBus()
-    val listener = new AppListingListener(fileStatus, clock, shouldHalt)
+    val listener = new AppListingListener(reader, clock, shouldHalt)
     bus.addListener(listener)
     logInfo(s"Parsing $logPath for listing data...")
-    Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
-      bus.replay(in, logPath.toString, !appCompleted, eventsFilter)
+    val logFiles = reader.listEventLogFiles
+    logFiles.foreach { file =>
+      Utils.tryWithResource(EventLogFileReader.openEventLog(file.getPath, fs)) { in =>
+        bus.replay(in, file.getPath.toString, !appCompleted, eventsFilter)

Review comment:
Nice find! Will address.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r332746837

## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala ##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{BufferedInputStream, InputStream}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.hdfs.DFSInputStream
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.EventLogFileWriter.codecName
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/** The base class of reader which will read the information of event log file(s). */
+abstract class EventLogFileReader(
+    protected val fileSystem: FileSystem,
+    val rootPath: Path) {
+
+  protected def fileSizeForDFS(path: Path): Option[Long] = {
+    Utils.tryWithResource(fileSystem.open(path)) { in =>
+      in.getWrappedStream match {
+        case dfsIn: DFSInputStream => Some(dfsIn.getFileLength)
+        case _ => None
+      }
+    }
+  }
+
+  protected def addFileAsZipEntry(
+      zipStream: ZipOutputStream,
+      path: Path,
+      entryName: String): Unit = {
+    Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { inputStream =>
+      zipStream.putNextEntry(new ZipEntry(entryName))
+      ByteStreams.copy(inputStream, zipStream)
+      zipStream.closeEntry()
+    }
+  }
+
+  /** Returns the last index of event log files. None for single event log file. */
+  def lastIndex: Option[Long]
+
+  /**
+   * Returns the size of file for the last index of event log files. Returns its size for
+   * single event log file.
+   */
+  def fileSizeForLastIndex: Long
+
+  /** Returns whether the application is completed. */
+  def completed: Boolean
+
+  /**
+   * Returns the size of file for the last index of event log files, only when
+   * underlying input stream is DFSInputStream. Otherwise returns None.
+   */
+  def fileSizeForLastIndexForDFS: Option[Long]
+
+  /** Returns the modification time for the last index of event log files. */

Review comment:
Same here.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r332746767

## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala ##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{BufferedInputStream, InputStream}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.hdfs.DFSInputStream
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.EventLogFileWriter.codecName
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/** The base class of reader which will read the information of event log file(s). */
+abstract class EventLogFileReader(
+    protected val fileSystem: FileSystem,
+    val rootPath: Path) {
+
+  protected def fileSizeForDFS(path: Path): Option[Long] = {
+    Utils.tryWithResource(fileSystem.open(path)) { in =>
+      in.getWrappedStream match {
+        case dfsIn: DFSInputStream => Some(dfsIn.getFileLength)
+        case _ => None
+      }
+    }
+  }
+
+  protected def addFileAsZipEntry(
+      zipStream: ZipOutputStream,
+      path: Path,
+      entryName: String): Unit = {
+    Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { inputStream =>
+      zipStream.putNextEntry(new ZipEntry(entryName))
+      ByteStreams.copy(inputStream, zipStream)
+      zipStream.closeEntry()
+    }
+  }
+
+  /** Returns the last index of event log files. None for single event log file. */
+  def lastIndex: Option[Long]
+
+  /**
+   * Returns the size of file for the last index of event log files. Returns its size for
+   * single event log file.
+   */
+  def fileSizeForLastIndex: Long
+
+  /** Returns whether the application is completed. */
+  def completed: Boolean
+
+  /**
+   * Returns the size of file for the last index of event log files, only when
+   * underlying input stream is DFSInputStream. Otherwise returns None.
+   */
+  def fileSizeForLastIndexForDFS: Option[Long]
+
+  /** Returns the modification time for the last index of event log files. */

Review comment:
It seems better to document how "the last index" applies to a single event log file, as it's not easy to apply this suggestion to `fileSizeForLastIndexForDFS`.
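
One way to read "how the last index applies to a single event log file" is sketched below in plain Scala. This is a simplified stand-in, not the PR's code: the `Reader` trait keeps only two of the members quoted in the diff, file system access is replaced by in-memory sizes, and `SingleFileReader` and `RollingReader` are hypothetical names.

```scala
// Simplified stand-in for the abstract reader in the quoted diff.
// File system access is omitted; sizes are supplied directly (an assumption).
object LastIndexSketch {
  trait Reader {
    /** Last index of the event log files; None for a single event log file. */
    def lastIndex: Option[Long]
    /** Size of the file at the last index; for a single file, the file itself. */
    def fileSizeForLastIndex: Long
  }

  // Single event log file: there is no index, so "the last index" file is the log file.
  final case class SingleFileReader(fileSize: Long) extends Reader {
    def lastIndex: Option[Long] = None
    def fileSizeForLastIndex: Long = fileSize
  }

  // Rolling event log files: hypothetical map from file index to file size.
  final case class RollingReader(sizesByIndex: Map[Long, Long]) extends Reader {
    require(sizesByIndex.nonEmpty, "at least one event log file is expected")
    private def last: Long = sizesByIndex.keys.max
    def lastIndex: Option[Long] = Some(last)
    def fileSizeForLastIndex: Long = sizesByIndex(last)
  }

  def main(args: Array[String]): Unit = {
    println(SingleFileReader(42L).lastIndex)                       // None
    println(RollingReader(Map(1L -> 10L, 2L -> 20L)).lastIndex)    // Some(2)
  }
}
```

Stating the single-file rule once on the base class would cover every `...ForLastIndex...` member, including the DFS variant.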
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r332741144 ## File path: core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala ## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File} +import java.net.URI +import java.nio.charset.StandardCharsets +import java.util.zip.{ZipInputStream, ZipOutputStream} + +import com.google.common.io.{ByteStreams, Files} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper._ +import org.apache.spark.deploy.history.RollingEventLogFilesWriter._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + + +abstract class EventLogFileReadersSuite extends SparkFunSuite with LocalSparkContext + with BeforeAndAfter with Logging { + + protected val fileSystem = Utils.getHadoopFileSystem("/", SparkHadoopUtil.get.conf) + protected var testDir: File = _ + protected var testDirPath: Path = _ + + before { +testDir = Utils.createTempDir(namePrefix = s"event log") +testDirPath = new Path(testDir.getAbsolutePath()) + } + + after { +Utils.deleteRecursively(testDir) + } + + test("Retrieve EventLogFileReader correctly") { +def assertInstanceOfEventLogReader( +expectedClazz: Option[Class[_ <: EventLogFileReader]], +actual: Option[EventLogFileReader]): Unit = { + if (expectedClazz.isEmpty) { +assert(actual.isEmpty, s"Expected no EventLogFileReader instance but was " + + s"${actual.map(_.getClass).getOrElse("")}") + } else { +assert(actual.isDefined, s"Expected an EventLogFileReader instance but was empty") +assert(expectedClazz.get.isAssignableFrom(actual.get.getClass), + s"Expected ${expectedClazz.get} but was ${actual.get.getClass}") + } +} + +def testCreateEventLogReaderWithPath( +path: Path, +isFile: Boolean, +expectedClazz: Option[Class[_ <: EventLogFileReader]]): Unit 
= { + if (isFile) { +Utils.tryWithResource(fileSystem.create(path)) { is => + is.writeInt(10) +} + } else { +fileSystem.mkdirs(path) + } + + val reader = EventLogFileReader(fileSystem, path) + assertInstanceOfEventLogReader(expectedClazz, reader) + val reader2 = EventLogFileReader(fileSystem, +fileSystem.getFileStatus(path)) + assertInstanceOfEventLogReader(expectedClazz, reader2) +} + +// path with no last index - single event log +val reader1 = EventLogFileReader(fileSystem, new Path(testDirPath, "aaa"), + None) + assertInstanceOfEventLogReader(Some(classOf[SingleFileEventLogFileReader]), Some(reader1)) + +// path with last index - rolling event log +val reader2 = EventLogFileReader(fileSystem, + new Path(testDirPath, s"${EVENT_LOG_DIR_NAME_PREFIX}aaa"), Some(3)) + assertInstanceOfEventLogReader(Some(classOf[RollingEventLogFilesFileReader]), Some(reader2)) + +// path - file (both path and FileStatus) +val eventLogFile = new Path(testDirPath, "bbb") +testCreateEventLogReaderWithPath(eventLogFile, isFile = true, + Some(classOf[SingleFileEventLogFileReader])) + +// path - file starting with "." +val invalidEventLogFile = new Path(testDirPath, ".bbb") +testCreateEventLogReaderWithPath(invalidEventLogFile, isFile = true, None) + +// path - directory with "eventlog_v2_" prefix +val eventLogDir = new Path(testDirPath, s"${EVENT_LOG_DIR_NAME_PREFIX}ccc") +testCreateEventLogReaderWithPath(eventLogDir, isFile = false, + Some(classOf[RollingEventLogFilesFileReader])) + +// path - directory with no "eventlog_v2_" prefix +val invalidEventLogDir =
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331724862

## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ##
@@ -1165,12 +1161,15 @@ private[history] case class LogInfo(
     logType: LogType.Value,
     appId: Option[String],
     attemptId: Option[String],
-    fileSize: Long)
+    fileSize: Long,
+    lastIndex: Option[Long],

Review comment:
Good point. In other tests where we modified something and were worried about compatibility, we stored the old data and loaded from it, so I was thinking of doing the same. Your suggestion would be much simpler. Thanks!
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331326938

## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ##
@@ -1165,12 +1161,15 @@ private[history] case class LogInfo(
     logType: LogType.Value,
     appId: Option[String],
     attemptId: Option[String],
-    fileSize: Long)
+    fileSize: Long,
+    lastIndex: Option[Long],

Review comment:
Just realized that FsHistoryProvider already takes some care of `logType` being `null`. I wouldn't expect a LogType.Value to be null and would feel safer correcting the value, but given that FsHistoryProvider now accesses logType correctly, that's OK. I'd prefer to add a UT that checks the field values of LogInfo when reading data from Spark 2.4.4, to make sure we're not missing anything, but maybe that's just me.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331314726
## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ##
@@ -1165,12 +1161,15 @@ private[history] case class LogInfo(
 logType: LogType.Value,
 appId: Option[String],
 attemptId: Option[String],
-fileSize: Long)
+fileSize: Long,
+lastIndex: Option[Long],
Review comment: I've crafted some test code to experiment with this:

```
// Event log and local store directories of an existing Spark 2.4.4 installation.
val newLogDir = new File("/spark-dist/releases/spark-2.4.4-bin-hadoop2.7/eventlogs")
val localStoreDir = new File("/spark-dist/releases/spark-2.4.4-bin-hadoop2.7/history-dir")

// Work on a copy of the local store under the test directory.
val newLocalStoreDir = new File(testDir, "localstore")
newLocalStoreDir.mkdirs()
FileUtils.copyDirectory(localStoreDir, newLocalStoreDir)

val conf = createTestConf()
conf.set(HISTORY_LOG_DIR, newLogDir.getAbsolutePath)
conf.set(LOCAL_STORE_DIR, newLocalStoreDir.getAbsolutePath)
val provider = new FsHistoryProvider(conf)
provider.checkForLogs()
```

Here the log directory points directly at the eventlogs directory of the Spark 2.4.4 installation, and the local store is a copy of its history local store directory. The log path must stay the same because the listing metadata contains the path, and Spark ignores the content of the local store if the metadata doesn't match. If we want to include this in a new UT, some modification might be needed. We tend to add a UT when we want to ensure that a new change is compatible with an old version, so it seems worth adding.

I set a breakpoint in `FsHistoryProvider.checkForLogs` and observed that `lastIndex` is set to `None` (so it seems to be safe), but `logType` is set to `null`, which we don't expect. Would it be better to increase `CURRENT_LISTING_VERSION` to deal with this, or to manually set it to `EventLogs` when it's null? It may be better to deal with this in a new PR, though.
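The second option mentioned in the comment above (treating a `null` `logType` as `EventLogs`) could look roughly like the sketch below. This is only an illustration under stated assumptions: `LogType` and `LogInfo` here are simplified stand-ins rather than the real definitions in `FsHistoryProvider.scala`, and `LogInfoCompat.normalize` is a hypothetical helper name that does not appear in the PR.

```scala
// Simplified stand-ins for the real types; assumptions for illustration only.
object LogType extends Enumeration {
  val DriverLogs, EventLogs = Value
}

case class LogInfo(
    logPath: String,
    logType: LogType.Value, // may be null when the entry was written by an older version
    fileSize: Long,
    lastIndex: Option[Long])

object LogInfoCompat {
  // Hypothetical helper: fall back to EventLogs when the stored entry has no log type.
  def normalize(info: LogInfo): LogInfo =
    if (info.logType == null) info.copy(logType = LogType.EventLogs) else info
}
```

The alternative raised in the comment, increasing `CURRENT_LISTING_VERSION`, avoids this kind of null handling, but by the comment's own description Spark then ignores the existing local store content, so the listing would presumably have to be rebuilt from the event logs.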
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r331248214 ## File path: core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala ## @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.{File, FileOutputStream, IOException} +import java.net.URI +import java.nio.charset.StandardCharsets + +import scala.collection.mutable +import scala.io.Source + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper._ +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + + +abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkContext + with BeforeAndAfter { + + protected val fileSystem = Utils.getHadoopFileSystem("/", +SparkHadoopUtil.get.newConfiguration(new SparkConf())) + protected var testDir: File = _ + protected var testDirPath: Path = _ + + before { +testDir = Utils.createTempDir(namePrefix = s"event log") +testDir.deleteOnExit() +testDirPath = new Path(testDir.getAbsolutePath()) + } + + after { +Utils.deleteRecursively(testDir) + } + + test("create EventLogFileWriter with enable/disable rolling") { +def buildWriterAndVerify(conf: SparkConf, expectedClazz: Class[_]): Unit = { + val writer = EventLogFileWriter( +getUniqueApplicationId, None, testDirPath.toUri, conf, +SparkHadoopUtil.get.newConfiguration(conf)) + val writerClazz = writer.getClass + assert(expectedClazz === writerClazz, +s"default file writer should be $expectedClazz, but $writerClazz") +} + +val conf = new SparkConf +conf.set(EVENT_LOG_ENABLED, true) +conf.set(EVENT_LOG_DIR, testDir.toString) + +// default config +buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter]) + +conf.set(EVENT_LOG_ENABLE_ROLLING, true) +buildWriterAndVerify(conf, classOf[RollingEventLogFilesWriter]) + +conf.set(EVENT_LOG_ENABLE_ROLLING, false) +buildWriterAndVerify(conf, 
classOf[SingleEventLogFileWriter]) + } + + val allCodecs = Seq(None) ++ +CompressionCodec.ALL_COMPRESSION_CODECS.map(c => Some(CompressionCodec.getShortName(c))) + + allCodecs.foreach { codecShortName => +test(s"initialize, write, stop - with codec $codecShortName") { + val appId = getUniqueApplicationId + val attemptId = None + + val conf = getLoggingConf(testDirPath, codecShortName) + val writer = createWriter(appId, attemptId, testDirPath.toUri, conf, +SparkHadoopUtil.get.newConfiguration(conf)) + + writer.start() + + // snappy stream throws exception on empty stream, so we should provide some data to test. + val dummyData = Seq("dummy1", "dummy2", "dummy3") + dummyData.foreach(writer.writeEvent(_, flushLogger = true)) + + writer.stop() + + verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, codecShortName, dummyData) +} + } + + test("spark.eventLog.compression.codec overrides spark.io.compression.codec") { +val conf = new SparkConf +conf.set(EVENT_LOG_COMPRESS, true) +val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + +val appId = "test" +val appAttemptId = None + +// The default value is `spark.io.compression.codec`. +val writer = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf) +assert(writer.compressionCodecName.contains("lz4")) + +// `spark.eventLog.compression.codec` overrides `spark.io.compression.codec`. +conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd") +val writer2 = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf) +assert(writer2.compressionCodecName.contains("zstd")) + } + + protected def readLinesFromEventLogFile(log: Path, fs:
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r331247520 ## File path: core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala ## @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.{File, FileOutputStream, IOException} +import java.net.URI +import java.nio.charset.StandardCharsets + +import scala.collection.mutable +import scala.io.Source + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper._ +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + + +abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkContext + with BeforeAndAfter { + + protected val fileSystem = Utils.getHadoopFileSystem("/", +SparkHadoopUtil.get.newConfiguration(new SparkConf())) + protected var testDir: File = _ + protected var testDirPath: Path = _ + + before { +testDir = Utils.createTempDir(namePrefix = s"event log") +testDir.deleteOnExit() +testDirPath = new Path(testDir.getAbsolutePath()) + } + + after { +Utils.deleteRecursively(testDir) + } + + test("create EventLogFileWriter with enable/disable rolling") { +def buildWriterAndVerify(conf: SparkConf, expectedClazz: Class[_]): Unit = { + val writer = EventLogFileWriter( +getUniqueApplicationId, None, testDirPath.toUri, conf, +SparkHadoopUtil.get.newConfiguration(conf)) + val writerClazz = writer.getClass + assert(expectedClazz === writerClazz, +s"default file writer should be $expectedClazz, but $writerClazz") +} + +val conf = new SparkConf +conf.set(EVENT_LOG_ENABLED, true) +conf.set(EVENT_LOG_DIR, testDir.toString) + +// default config +buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter]) + +conf.set(EVENT_LOG_ENABLE_ROLLING, true) +buildWriterAndVerify(conf, classOf[RollingEventLogFilesWriter]) + +conf.set(EVENT_LOG_ENABLE_ROLLING, false) +buildWriterAndVerify(conf, 
classOf[SingleEventLogFileWriter]) + } + + val allCodecs = Seq(None) ++ +CompressionCodec.ALL_COMPRESSION_CODECS.map(c => Some(CompressionCodec.getShortName(c))) + + allCodecs.foreach { codecShortName => +test(s"initialize, write, stop - with codec $codecShortName") { + val appId = getUniqueApplicationId + val attemptId = None + + val conf = getLoggingConf(testDirPath, codecShortName) + val writer = createWriter(appId, attemptId, testDirPath.toUri, conf, +SparkHadoopUtil.get.newConfiguration(conf)) + + writer.start() + + // snappy stream throws exception on empty stream, so we should provide some data to test. + val dummyData = Seq("dummy1", "dummy2", "dummy3") + dummyData.foreach(writer.writeEvent(_, flushLogger = true)) + + writer.stop() + + verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, codecShortName, dummyData) +} + } + + test("spark.eventLog.compression.codec overrides spark.io.compression.codec") { +val conf = new SparkConf +conf.set(EVENT_LOG_COMPRESS, true) +val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + +val appId = "test" +val appAttemptId = None + +// The default value is `spark.io.compression.codec`. +val writer = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf) +assert(writer.compressionCodecName.contains("lz4")) + +// `spark.eventLog.compression.codec` overrides `spark.io.compression.codec`. +conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd") +val writer2 = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf) +assert(writer2.compressionCodecName.contains("zstd")) + } + + protected def readLinesFromEventLogFile(log: Path, fs:
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r331245739 ## File path: core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala ## @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File} +import java.net.URI +import java.nio.charset.StandardCharsets +import java.util.zip.{ZipInputStream, ZipOutputStream} + +import scala.collection.mutable + +import com.google.common.io.{ByteStreams, Files} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + + +abstract class EventLogFileReadersSuite extends SparkFunSuite with LocalSparkContext + with BeforeAndAfter with Logging { + + protected val fileSystem = Utils.getHadoopFileSystem("/", +SparkHadoopUtil.get.conf) + protected var testDir: File = _ + protected var testDirPath: Path = _ + + before { +testDir = Utils.createTempDir(namePrefix = s"event log") +testDirPath = new Path(testDir.getAbsolutePath()) + } + + after { +Utils.deleteRecursively(testDir) + } + + test("Retrieve EventLogFileReader correctly") { +def assertInstanceOfEventLogReader( +expectedClazz: Option[Class[_ <: EventLogFileReader]], +actual: Option[EventLogFileReader]): Unit = { + if (expectedClazz.isEmpty) { +assert(actual.isEmpty, s"Expected no EventLogFileReader instance but was " + + s"${actual.map(_.getClass).getOrElse("")}") + } else { +assert(actual.isDefined, s"Expected an EventLogFileReader instance but was empty") +assert(expectedClazz.get.isAssignableFrom(actual.get.getClass), + s"Expected ${expectedClazz.get} but was ${actual.get.getClass}") + } +} + +def testForPathWithoutSeq( +path: Path, +isFile: Boolean, +expectedClazz: Option[Class[_ <: EventLogFileReader]]): Unit = { + if (isFile) { 
+Utils.tryWithResource(fileSystem.create(path)) { is => + is.writeInt(10) +} + } else { +fileSystem.mkdirs(path) + } + + val reader = EventLogFileReader(fileSystem, path) + assertInstanceOfEventLogReader(expectedClazz, reader) + val reader2 = EventLogFileReader(fileSystem, +fileSystem.getFileStatus(path)) + assertInstanceOfEventLogReader(expectedClazz, reader)
Review comment: Nice finding! Will fix.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r331245566 ## File path: core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala ## @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File} +import java.net.URI +import java.nio.charset.StandardCharsets +import java.util.zip.{ZipInputStream, ZipOutputStream} + +import scala.collection.mutable + +import com.google.common.io.{ByteStreams, Files} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + + +abstract class EventLogFileReadersSuite extends SparkFunSuite with LocalSparkContext + with BeforeAndAfter with Logging { + + protected val fileSystem = Utils.getHadoopFileSystem("/", +SparkHadoopUtil.get.conf) + protected var testDir: File = _ + protected var testDirPath: Path = _ + + before { +testDir = Utils.createTempDir(namePrefix = s"event log") +testDirPath = new Path(testDir.getAbsolutePath()) + } + + after { +Utils.deleteRecursively(testDir) + } + + test("Retrieve EventLogFileReader correctly") { +def assertInstanceOfEventLogReader( +expectedClazz: Option[Class[_ <: EventLogFileReader]], +actual: Option[EventLogFileReader]): Unit = { + if (expectedClazz.isEmpty) { +assert(actual.isEmpty, s"Expected no EventLogFileReader instance but was " + + s"${actual.map(_.getClass).getOrElse("")}") + } else { +assert(actual.isDefined, s"Expected an EventLogFileReader instance but was empty") +assert(expectedClazz.get.isAssignableFrom(actual.get.getClass), + s"Expected ${expectedClazz.get} but was ${actual.get.getClass}") + } +} + +def testForPathWithoutSeq( +path: Path, +isFile: Boolean, +expectedClazz: Option[Class[_ <: EventLogFileReader]]): Unit = { + if (isFile) { 
+Utils.tryWithResource(fileSystem.create(path)) { is => + is.writeInt(10) +} + } else { +fileSystem.mkdirs(path) + } + + val reader = EventLogFileReader(fileSystem, path) + assertInstanceOfEventLogReader(expectedClazz, reader) + val reader2 = EventLogFileReader(fileSystem, +fileSystem.getFileStatus(path)) + assertInstanceOfEventLogReader(expectedClazz, reader) +} + +// path with no last index - single event log +val reader1 = EventLogFileReader(fileSystem, new Path(testDirPath, "aaa"), + None) + assertInstanceOfEventLogReader(Some(classOf[SingleFileEventLogFileReader]), Some(reader1)) + +// path with last index - rolling event log +val reader2 = EventLogFileReader(fileSystem, + new Path(testDirPath, "eventlog_v2_aaa"), Some(3)) + assertInstanceOfEventLogReader(Some(classOf[RollingEventLogFilesFileReader]), Some(reader2))
Review comment: It also tests the case where the last index is given as a parameter, so the comment is correct.
> Also would be good if we can reuse EVENT_LOG_DIR_NAME_PREFIX instead of eventlog_v2_ here.
Will change.
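The two cases in the quoted test (no last index gives a single-file reader, a last index gives a rolling reader) amount to a dispatch on `Option[Long]`. A minimal sketch of that idea follows; `ReaderKind`, `SingleFile`, `RollingFiles` and `ReaderSelection.select` are hypothetical names standing in for the real reader classes, and the actual factory in the PR may check more than this.

```scala
// Hypothetical stand-ins for the reader classes named in the diff.
sealed trait ReaderKind
case object SingleFile extends ReaderKind
case object RollingFiles extends ReaderKind

object ReaderSelection {
  // Only rolled event logs carry a last index, so its presence selects the reader.
  def select(lastIndex: Option[Long]): ReaderKind = lastIndex match {
    case Some(_) => RollingFiles
    case None => SingleFile
  }
}
```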
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331244512
## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##
@@ -179,6 +179,18 @@ package object config {
 private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
 ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
+ private[spark] val EVENT_LOG_ENABLE_ROLLING =
+ConfigBuilder("spark.eventLog.logRolling.enabled")
+ .doc("Whether rolling over event log files is enabled.")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE =
+ConfigBuilder("spark.eventLog.logRolling.maxFileSize")
+ .doc("The max size of event log file to be rolled over.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("128m")
Review comment: I've checked against configuration.md, and the only thing missing is one sentence (`If set to true, it cuts down each event log file to be configured size.`) in the conf above. Are you referring to another one? Otherwise I'll just add this sentence to the doc.
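To make the intent of the two settings in the diff concrete, here is a toy model of size-based rollover. It is not the PR's `RollingEventLogFilesWriter`: `RollingCounter` is a hypothetical class, the starting index of 1 and the rule "roll before a write that would exceed the limit" are assumptions chosen for the sketch, and the `128m` default is read as 128 MiB.

```scala
// A toy model of size-based rollover; an assumption-laden sketch, not Spark's writer.
final class RollingCounter(maxFileSize: Long) {
  require(maxFileSize > 0, "maxFileSize must be positive")

  private var index = 1L
  private var bytesInCurrentFile = 0L

  // Records an event of `size` bytes and returns the index of the file it lands in.
  def write(size: Long): Long = {
    // Roll over before the write if it would push a non-empty file past the limit.
    if (bytesInCurrentFile > 0 && bytesInCurrentFile + size > maxFileSize) {
      index += 1
      bytesInCurrentFile = 0L
    }
    bytesInCurrentFile += size
    index
  }
}

object RollingCounter {
  // "128m" from the diff's default, read here as 128 MiB.
  val DefaultMaxFileSize: Long = 128L * 1024 * 1024
}
```

With a limit of 10 bytes, writes of 6, 6, 4 and 1 bytes would land in files 1, 2, 2 and 3 under this rule.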
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331243103
## File path: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ##
@@ -17,38 +17,33 @@
 package org.apache.spark.scheduler
-import java.io._
 import java.net.URI
-import java.nio.charset.StandardCharsets
-import scala.collection.mutable.{ArrayBuffer, Map}
+import scala.collection.mutable
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
-import org.apache.hadoop.fs.permission.FsPermission
 import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 import org.apache.spark.{SPARK_VERSION, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogFileWriter
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{JsonProtocol, Utils}
 /**
 * A SparkListener that logs events to persistent storage.
 *
 * Event logging is specified by the following configurable parameters:
 * spark.eventLog.enabled - Whether event logging is enabled.
- * spark.eventLog.logBlockUpdates.enabled - Whether to log block updates
- * spark.eventLog.compress - Whether to compress logged events
- * spark.eventLog.compression.codec - The codec to compress logged events
- * spark.eventLog.overwrite - Whether to overwrite any existing files.
 * spark.eventLog.dir - Path to the directory in which events are logged.
- * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
+ * spark.eventLog.logBlockUpdates.enabled - Whether to log block updates
 * spark.eventLog.logStageExecutorMetrics.enabled - Whether to log stage executor metrics
+ *
+ * Event log file writer maintains its own parameters: refer the javadoc of [[EventLogFileWriter]]
Review comment: I'll just change it to `doc`. It shouldn't be a big deal.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r331242313 ## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala ## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io._ +import java.net.URI +import java.nio.charset.StandardCharsets + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[history] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + // Only defined if the file system scheme is not local + protected var hadoopDataStream: Option[FSDataOutputStream] = None + protected var writer: Option[PrintWriter] = None + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path)(fnSetupWriter: OutputStream => PrintWriter): Unit = { +if (shouldOverwrite && fileSystem.delete(path, true)) { + logWarning(s"Event log $path already exists. 
Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some( + SparkHadoopUtil.createFile(fileSystem, path, sparkConf.get(EVENT_LOG_ALLOW_EC))) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + writer = Some(fnSetupWriter(bstream)) +} catch { + case e: Exception => +
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r331241393 ## File path: core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala ## @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File} +import java.net.URI +import java.nio.charset.StandardCharsets +import java.util.zip.{ZipInputStream, ZipOutputStream} + +import scala.collection.mutable + +import com.google.common.io.{ByteStreams, Files} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + + +abstract class EventLogFileReadersSuite extends SparkFunSuite with LocalSparkContext + with BeforeAndAfter with Logging { + + protected val fileSystem = Utils.getHadoopFileSystem("/", +SparkHadoopUtil.get.conf) + protected var testDir: File = _ + protected var testDirPath: Path = _ + + before { +testDir = Utils.createTempDir(namePrefix = s"event log") +testDirPath = new Path(testDir.getAbsolutePath()) + } + + after { +Utils.deleteRecursively(testDir) + } + + test("Retrieve EventLogFileReader correctly") { +def assertInstanceOfEventLogReader( +expectedClazz: Option[Class[_ <: EventLogFileReader]], +actual: Option[EventLogFileReader]): Unit = { + if (expectedClazz.isEmpty) { +assert(actual.isEmpty, s"Expected no EventLogFileReader instance but was " + + s"${actual.map(_.getClass).getOrElse("")}") + } else { +assert(actual.isDefined, s"Expected an EventLogFileReader instance but was empty") +assert(expectedClazz.get.isAssignableFrom(actual.get.getClass), + s"Expected ${expectedClazz.get} but was ${actual.get.getClass}") + } +} + +def testForPathWithoutSeq(
Review comment: I agree its name is a bit weird. I'll try to rename it.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r330847692
## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ##
@@ -1165,12 +1161,15 @@ private[history] case class LogInfo(
 logType: LogType.Value,
 appId: Option[String],
 attemptId: Option[String],
-fileSize: Long)
+fileSize: Long,
+lastIndex: Option[Long],
Review comment: I missed checking this. I'll check, and fix it if it causes a problem, by adding a UT which loads a LevelDB from Spark 2.4.4 and reads more applications.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r330828002 ## File path: core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala ## @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.{File, FileOutputStream, IOException} +import java.net.URI +import java.nio.charset.StandardCharsets + +import scala.collection.mutable +import scala.io.Source + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper._ +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + + +abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkContext + with BeforeAndAfter { + + protected val fileSystem = Utils.getHadoopFileSystem("/", +SparkHadoopUtil.get.newConfiguration(new SparkConf())) + protected var testDir: File = _ + protected var testDirPath: Path = _ + + before { +testDir = Utils.createTempDir(namePrefix = s"event log") +testDir.deleteOnExit() +testDirPath = new Path(testDir.getAbsolutePath()) + } + + after { +Utils.deleteRecursively(testDir) + } + + test("create EventLogFileWriter with enable/disable rolling") { +def buildWriterAndVerify(conf: SparkConf, expectedClazz: Class[_]): Unit = { + val writer = EventLogFileWriter( +getUniqueApplicationId, None, testDirPath.toUri, conf, +SparkHadoopUtil.get.newConfiguration(conf)) + val writerClazz = writer.getClass + assert(expectedClazz === writerClazz, +s"default file writer should be $expectedClazz, but $writerClazz") Review comment: Agreed. Will remove hint message. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r330827896 ## File path: core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala ## @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File} +import java.net.URI +import java.nio.charset.StandardCharsets +import java.util.zip.{ZipInputStream, ZipOutputStream} + +import scala.collection.mutable + +import com.google.common.io.{ByteStreams, Files} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + + +abstract class EventLogFileReadersSuite extends SparkFunSuite with LocalSparkContext + with BeforeAndAfter with Logging { + + protected val fileSystem = Utils.getHadoopFileSystem("/", +SparkHadoopUtil.get.conf) + protected var testDir: File = _ + protected var testDirPath: Path = _ + + before { +testDir = Utils.createTempDir(namePrefix = s"event log") +testDirPath = new Path(testDir.getAbsolutePath()) + } + + after { +Utils.deleteRecursively(testDir) + } + + test("Retrieve EventLogFileReader correctly") { +def assertInstanceOfEventLogReader( +expectedClazz: Option[Class[_ <: EventLogFileReader]], +actual: Option[EventLogFileReader]): Unit = { + if (expectedClazz.isEmpty) { +assert(actual.isEmpty, s"Expected no EventLogFileReader instance but was " + + s"${actual.map(_.getClass).getOrElse("")}") + } else { +assert(actual.isDefined, s"Expected an EventLogFileReader instance but was empty") +assert(expectedClazz.get.isAssignableFrom(actual.get.getClass), + s"Expected ${expectedClazz.get} but was ${actual.get.getClass}") + } +} + +def testForPathWithoutSeq( +path: Path, +isFile: Boolean, +expectedClazz: Option[Class[_ <: EventLogFileReader]]): Unit = { + if (isFile) { 
+Utils.tryWithResource(fileSystem.create(path)) { is => + is.writeInt(10) +} + } else { +fileSystem.mkdirs(path) + } + + val reader = EventLogFileReader(fileSystem, path) + assertInstanceOfEventLogReader(expectedClazz, reader) + val reader2 = EventLogFileReader(fileSystem, +fileSystem.getFileStatus(path)) + assertInstanceOfEventLogReader(expectedClazz, reader) +} + +// path with no last index - single event log +val reader1 = EventLogFileReader(fileSystem, new Path(testDirPath, "aaa"), + None) + assertInstanceOfEventLogReader(Some(classOf[SingleFileEventLogFileReader]), Some(reader1)) + +// path with last index - rolling event log +val reader2 = EventLogFileReader(fileSystem, + new Path(testDirPath, "eventlog_v2_aaa"), Some(3)) + assertInstanceOfEventLogReader(Some(classOf[RollingEventLogFilesFileReader]), Some(reader2)) + +// path - file (both path and FileStatus) +val eventLogFile = new Path(testDirPath, "bbb") +testForPathWithoutSeq(eventLogFile, isFile = true, Some(classOf[SingleFileEventLogFileReader])) + +// path - file starting with "." +val invalidEventLogFile = new Path(testDirPath, ".bbb") +testForPathWithoutSeq(invalidEventLogFile, isFile = true, None) + +// path - directory with "eventlog_v2_" prefix +val eventLogDir = new Path(testDirPath, "eventlog_v2_ccc") +testForPathWithoutSeq(eventLogDir, isFile = false, + Some(classOf[RollingEventLogFilesFileReader])) + +// path - directory with no "eventlog_v2_" prefix +val invalidEventLogDir = new Path(testDirPath, "ccc") +testForPathWithoutSeq(invalidEventLogDir, isFile = false, None) + } + + val
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r330806046 ## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala ## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io._ +import java.net.URI +import java.nio.charset.StandardCharsets + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[history] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + // Only defined if the file system scheme is not local + protected var hadoopDataStream: Option[FSDataOutputStream] = None + protected var writer: Option[PrintWriter] = None + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path)(fnSetupWriter: OutputStream => PrintWriter): Unit = { +if (shouldOverwrite && fileSystem.delete(path, true)) { Review comment: That's a good point. 
Maybe we haven't defined the proper behavior for this case: would we want to fail the application? There may be a similar case when shouldOverwrite is true and fileSystem.delete() returns false. As far as I can see, fileSystem.delete() mostly throws IOException when it fails to delete, but the javadoc doesn't guarantee that a 'false' return value only means the file doesn't exist.
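To make the ambiguity around a 'false' return from fileSystem.delete() concrete, here is a minimal sketch of one possible policy: accept 'false' only when the path is really gone afterwards, and fail otherwise. This is not what the PR does. `MiniFs`, `InMemoryFs`, `DeleteOutcome` and `deleteForOverwrite` are hypothetical names introduced only for illustration, and `MiniFs` is a stand-in for the two Hadoop FileSystem calls under discussion, not the real Hadoop API.

```scala
import java.io.IOException
import scala.collection.mutable

// Hypothetical stand-in for the exists/delete calls discussed above;
// NOT the real org.apache.hadoop.fs.FileSystem API.
trait MiniFs {
  def exists(path: String): Boolean
  def delete(path: String, recursive: Boolean): Boolean
}

// Possible results of the pre-overwrite cleanup (names are assumptions).
sealed trait DeleteOutcome
case object Deleted extends DeleteOutcome
case object NothingToDelete extends DeleteOutcome

object OverwriteSketch {
  // Treat `false` from delete() as ambiguous: accept it only when the path
  // does not exist afterwards, otherwise fail instead of silently continuing.
  def deleteForOverwrite(fs: MiniFs, path: String): DeleteOutcome = {
    if (fs.delete(path, recursive = true)) {
      Deleted
    } else if (!fs.exists(path)) {
      NothingToDelete
    } else {
      throw new IOException(s"Failed to delete existing event log $path")
    }
  }
}

// In-memory file system used only to exercise the sketch. Paths listed in
// `undeletable` simulate a delete() that returns false for an existing file.
class InMemoryFs(initial: Set[String], undeletable: Set[String] = Set.empty)
    extends MiniFs {
  private val files = mutable.Set.empty[String] ++= initial

  override def exists(path: String): Boolean = files.contains(path)

  override def delete(path: String, recursive: Boolean): Boolean =
    if (files.contains(path) && !undeletable.contains(path)) files.remove(path)
    else false
}
```

Whether failing the application is the right choice here is exactly the open question; the sketch only shows that the "file did not exist" and "delete failed" cases can be told apart with one extra exists() check.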
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328814572 ## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala ## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io._ +import java.net.URI +import java.nio.charset.StandardCharsets + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[history] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + // Only defined if the file system scheme is not local + protected var hadoopDataStream: Option[FSDataOutputStream] = None + protected var writer: Option[PrintWriter] = None + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path)(fnSetupWriter: OutputStream => PrintWriter): Unit = { +if (shouldOverwrite && fileSystem.delete(path, true)) { + logWarning(s"Event log $path already exists. 
Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some( + SparkHadoopUtil.createFile(fileSystem, path, sparkConf.get(EVENT_LOG_ALLOW_EC))) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + writer = Some(fnSetupWriter(bstream)) +} catch { + case e: Exception => +
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328814131 ## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ## @@ -758,12 +752,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) invalidateUI(app.info.id, app.attempts.head.info.attemptId) addListing(app) listing.write(LogInfo(logPath.toString(), scanTime, LogType.EventLogs, Some(app.info.id), - app.attempts.head.info.attemptId, fileStatus.getLen())) + app.attempts.head.info.attemptId, reader.fileSizeForLastIndex, + reader.lastIndex, reader.completed)) // For a finished log, remove the corresponding "in progress" entry from the listing DB if // the file is really gone. -if (appCompleted) { - val inProgressLog = logPath.toString() + EventLoggingListener.IN_PROGRESS +// The logic is only valid for single event log, as root path doesn't change for +// rolled event logs. +if (appCompleted && reader.lastIndex.isDefined) { Review comment: Nice finding! Thanks!
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328813125 ## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala ## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.{BufferedInputStream, InputStream} +import java.util.concurrent.ConcurrentHashMap +import java.util.zip.{ZipEntry, ZipOutputStream} + +import com.google.common.io.ByteStreams +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.hdfs.DFSInputStream + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.history.EventLogFileWriter.codecName +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** The base class of reader which will read the information of event log file(s). 
*/ +abstract class EventLogFileReader( +protected val fileSystem: FileSystem, +val rootPath: Path) { + + protected def fileSizeForDFS(path: Path): Option[Long] = { +Utils.tryWithResource(fileSystem.open(path)) { in => + in.getWrappedStream match { +case dfsIn: DFSInputStream => Some(dfsIn.getFileLength) +case _ => None + } +} + } + + protected def addFileAsZipEntry( + zipStream: ZipOutputStream, + path: Path, + entryName: String): Unit = { +Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { inputStream => + zipStream.putNextEntry(new ZipEntry(entryName)) + ByteStreams.copy(inputStream, zipStream) + zipStream.closeEntry() +} + } + + /** Returns the last index of event log files. None for single event log file. */ + def lastIndex: Option[Long] + + /** + * Returns the size of file for the last index of event log files. Returns its size for + * single event log file. + */ + def fileSizeForLastIndex: Long + + /** Returns whether the application is completed. */ + def completed: Boolean + + /** + * Returns the size of file for the last index of event log files, only when + * underlying input stream is DFSInputStream. Otherwise returns None. + */ + def fileSizeForLastIndexForDFS: Option[Long] + + /** Returns the modification time for the last sequence number of event log files. */ + def modificationTime: Long + + /** + * This method compresses the files passed in, and writes the compressed data out into the + * ZipOutputStream passed in. Each file is written as a new ZipEntry with its name being + * the name of the file being compressed. + */ + def zipEventLogFiles(zipStream: ZipOutputStream): Unit + + /** Returns all available event log files. */ + def listEventLogFiles: Seq[FileStatus] + + /** Returns the short compression name if being used. None if it's uncompressed. */ + def compressionCodec: Option[String] + + /** Returns the size of all event log files. 
*/ + def totalSize: Long +} + +object EventLogFileReader { + // A cache for compression codecs to avoid creating the same codec many times + private val codecMap = new ConcurrentHashMap[String, CompressionCodec]() + + def apply( + fs: FileSystem, + path: Path, + lastIndex: Option[Long]): EventLogFileReader = { +lastIndex match { + case Some(_) => new RollingEventLogFilesFileReader(fs, path) + case None => new SingleFileEventLogFileReader(fs, path) +} + } + + def apply(fs: FileSystem, path: Path): Option[EventLogFileReader] = { +apply(fs, fs.getFileStatus(path)) + } + + def apply(fs: FileSystem, status: FileStatus): Option[EventLogFileReader] = { +if (isSingleEventLog(status)) { + Some(new SingleFileEventLogFileReader(fs, status.getPath)) +} else if (isRollingEventLogs(status)) { + Some(new RollingEventLogFilesFileReader(fs, status.getPath)) +} else { + None +} + } + + /** + * Opens an event log file and returns an input stream that contains the event data. + * + * @return input stream that holds one JSON record per line. + */ + def openEventLog(log: Path, fs: FileSystem): InputStream = { +val in = new
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328807870 ## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala ## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.{BufferedInputStream, InputStream} +import java.util.concurrent.ConcurrentHashMap +import java.util.zip.{ZipEntry, ZipOutputStream} + +import com.google.common.io.ByteStreams +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.hdfs.DFSInputStream + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.history.EventLogFileWriter.codecName +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** The base class of reader which will read the information of event log file(s). 
*/ +abstract class EventLogFileReader( +protected val fileSystem: FileSystem, +val rootPath: Path) { + + protected def fileSizeForDFS(path: Path): Option[Long] = { +Utils.tryWithResource(fileSystem.open(path)) { in => + in.getWrappedStream match { +case dfsIn: DFSInputStream => Some(dfsIn.getFileLength) +case _ => None + } +} + } + + protected def addFileAsZipEntry( + zipStream: ZipOutputStream, + path: Path, + entryName: String): Unit = { +Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { inputStream => + zipStream.putNextEntry(new ZipEntry(entryName)) + ByteStreams.copy(inputStream, zipStream) + zipStream.closeEntry() +} + } + + /** Returns the last index of event log files. None for single event log file. */ + def lastIndex: Option[Long] + + /** + * Returns the size of file for the last index of event log files. Returns its size for + * single event log file. + */ + def fileSizeForLastIndex: Long + + /** Returns whether the application is completed. */ + def completed: Boolean + + /** + * Returns the size of file for the last index of event log files, only when + * underlying input stream is DFSInputStream. Otherwise returns None. + */ + def fileSizeForLastIndexForDFS: Option[Long] + + /** Returns the modification time for the last sequence number of event log files. */ + def modificationTime: Long + + /** + * This method compresses the files passed in, and writes the compressed data out into the + * ZipOutputStream passed in. Each file is written as a new ZipEntry with its name being + * the name of the file being compressed. + */ + def zipEventLogFiles(zipStream: ZipOutputStream): Unit + + /** Returns all available event log files. */ + def listEventLogFiles: Seq[FileStatus] + + /** Returns the short compression name if being used. None if it's uncompressed. */ + def compressionCodec: Option[String] + + /** Returns the size of all event log files. 
*/ + def totalSize: Long +} + +object EventLogFileReader { + // A cache for compression codecs to avoid creating the same codec many times + private val codecMap = new ConcurrentHashMap[String, CompressionCodec]() + + def apply( + fs: FileSystem, + path: Path, + lastIndex: Option[Long]): EventLogFileReader = { +lastIndex match { + case Some(_) => new RollingEventLogFilesFileReader(fs, path) + case None => new SingleFileEventLogFileReader(fs, path) +} + } + + def apply(fs: FileSystem, path: Path): Option[EventLogFileReader] = { +apply(fs, fs.getFileStatus(path)) + } + + def apply(fs: FileSystem, status: FileStatus): Option[EventLogFileReader] = { +if (isSingleEventLog(status)) { + Some(new SingleFileEventLogFileReader(fs, status.getPath)) +} else if (isRollingEventLogs(status)) { + Some(new RollingEventLogFilesFileReader(fs, status.getPath)) +} else { + None +} + } + + /** + * Opens an event log file and returns an input stream that contains the event data. + * + * @return input stream that holds one JSON record per line. + */ + def openEventLog(log: Path, fs: FileSystem): InputStream = { +val in = new
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328807369 ## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala ## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.{BufferedInputStream, InputStream} +import java.util.concurrent.ConcurrentHashMap +import java.util.zip.{ZipEntry, ZipOutputStream} + +import com.google.common.io.ByteStreams +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.hdfs.DFSInputStream + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.history.EventLogFileWriter.codecName +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** The base class of reader which will read the information of event log file(s). 
*/ +abstract class EventLogFileReader( +protected val fileSystem: FileSystem, +val rootPath: Path) { + + protected def fileSizeForDFS(path: Path): Option[Long] = { +Utils.tryWithResource(fileSystem.open(path)) { in => + in.getWrappedStream match { +case dfsIn: DFSInputStream => Some(dfsIn.getFileLength) +case _ => None + } +} + } + + protected def addFileAsZipEntry( + zipStream: ZipOutputStream, + path: Path, + entryName: String): Unit = { +Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { inputStream => + zipStream.putNextEntry(new ZipEntry(entryName)) + ByteStreams.copy(inputStream, zipStream) + zipStream.closeEntry() +} + } + + /** Returns the last index of event log files. None for single event log file. */ + def lastIndex: Option[Long] + + /** + * Returns the size of file for the last index of event log files. Returns its size for + * single event log file. + */ + def fileSizeForLastIndex: Long + + /** Returns whether the application is completed. */ + def completed: Boolean + + /** + * Returns the size of file for the last index of event log files, only when + * underlying input stream is DFSInputStream. Otherwise returns None. + */ + def fileSizeForLastIndexForDFS: Option[Long] + + /** Returns the modification time for the last sequence number of event log files. */ Review comment: I thought I had searched for all of them, but apparently I missed some spots. Thanks for finding this!
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328806999 ## File path: core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala ## @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.{File, FileOutputStream, IOException} +import java.net.URI +import java.nio.charset.StandardCharsets + +import scala.collection.mutable +import scala.io.Source + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper._ +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + + +abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkContext + with BeforeAndAfter { + + protected val fileSystem = Utils.getHadoopFileSystem("/", +SparkHadoopUtil.get.newConfiguration(new SparkConf())) + protected var testDir: File = _ + protected var testDirPath: Path = _ + + before { +testDir = Utils.createTempDir(namePrefix = s"event log") +testDir.deleteOnExit() +testDirPath = new Path(testDir.getAbsolutePath()) + } + + after { +Utils.deleteRecursively(testDir) + } + + test("create EventLogFileWriter with enable/disable rolling") { +def buildWriterAndVerify(conf: SparkConf, expectedClazz: Class[_]): Unit = { + val writer = EventLogFileWriter( +getUniqueApplicationId, None, testDirPath.toUri, conf, +SparkHadoopUtil.get.newConfiguration(conf)) + val writerClazz = writer.getClass + assert(expectedClazz === writerClazz, +s"default file writer should be $expectedClazz, but $writerClazz") +} + +val conf = new SparkConf +conf.set(EVENT_LOG_ENABLED, true) +conf.set(EVENT_LOG_DIR, testDir.toString) + +// default config +buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter]) + +conf.set(EVENT_LOG_ENABLE_ROLLING, true) +buildWriterAndVerify(conf, classOf[RollingEventLogFilesWriter]) + +conf.set(EVENT_LOG_ENABLE_ROLLING, false) +buildWriterAndVerify(conf, 
classOf[SingleEventLogFileWriter]) + } + + val allCodecs = Seq(None) ++ +CompressionCodec.ALL_COMPRESSION_CODECS.map(c => Some(CompressionCodec.getShortName(c))) + + allCodecs.foreach { codecShortName => +test(s"initialize, write, stop - with codec $codecShortName") { + val appId = getUniqueApplicationId + val attemptId = None + + val conf = getLoggingConf(testDirPath, codecShortName) + val writer = createWriter(appId, attemptId, testDirPath.toUri, conf, +SparkHadoopUtil.get.newConfiguration(conf)) + + writer.start() + + // snappy stream throws exception on empty stream, so we should provide some data to test. + val dummyData = Seq("dummy1", "dummy2", "dummy3") + dummyData.foreach(writer.writeEvent(_, flushLogger = true)) + + verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, codecShortName, +isCompleted = false, dummyData) + + writer.stop() + + verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, codecShortName, +isCompleted = true, dummyData) +} + } + + test("spark.eventLog.compression.codec overrides spark.io.compression.codec") { +val conf = new SparkConf +conf.set(EVENT_LOG_COMPRESS, true) +val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + +val appId = "test" +val appAttemptId = None + +// The default value is `spark.io.compression.codec`. +val writer = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf) +assert(writer.compressionCodecName.contains("lz4")) + +// `spark.eventLog.compression.codec` overrides `spark.io.compression.codec`. +conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd") +val writer2 = createWriter(appId, appAttemptId,
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328403300 ## File path: core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala ## @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.{File, FileOutputStream, IOException} +import java.net.URI +import java.nio.charset.StandardCharsets + +import scala.collection.mutable +import scala.io.Source + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper._ +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + + +abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkContext + with BeforeAndAfter { + + protected val fileSystem = Utils.getHadoopFileSystem("/", +SparkHadoopUtil.get.newConfiguration(new SparkConf())) + protected var testDir: File = _ + protected var testDirPath: Path = _ + + before { +testDir = Utils.createTempDir(namePrefix = s"event log") +testDir.deleteOnExit() +testDirPath = new Path(testDir.getAbsolutePath()) + } + + after { +Utils.deleteRecursively(testDir) + } + + test("create EventLogFileWriter with enable/disable rolling") { +def buildWriterAndVerify(conf: SparkConf, expectedClazz: Class[_]): Unit = { + val writer = EventLogFileWriter( +getUniqueApplicationId, None, testDirPath.toUri, conf, +SparkHadoopUtil.get.newConfiguration(conf)) + val writerClazz = writer.getClass + assert(expectedClazz === writerClazz, +s"default file writer should be $expectedClazz, but $writerClazz") +} + +val conf = new SparkConf +conf.set(EVENT_LOG_ENABLED, true) +conf.set(EVENT_LOG_DIR, testDir.toString) + +// default config +buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter]) + +conf.set(EVENT_LOG_ENABLE_ROLLING, true) +buildWriterAndVerify(conf, classOf[RollingEventLogFilesWriter]) + +conf.set(EVENT_LOG_ENABLE_ROLLING, false) +buildWriterAndVerify(conf, 
classOf[SingleEventLogFileWriter]) + } + + val allCodecs = Seq(None) ++ +CompressionCodec.ALL_COMPRESSION_CODECS.map(c => Some(CompressionCodec.getShortName(c))) + + allCodecs.foreach { codecShortName => +test(s"initialize, write, stop - with codec $codecShortName") { + val appId = getUniqueApplicationId + val attemptId = None + + val conf = getLoggingConf(testDirPath, codecShortName) + val writer = createWriter(appId, attemptId, testDirPath.toUri, conf, +SparkHadoopUtil.get.newConfiguration(conf)) + + writer.start() + + // snappy stream throws exception on empty stream, so we should provide some data to test. + val dummyData = Seq("dummy1", "dummy2", "dummy3") + dummyData.foreach(writer.writeEvent(_, flushLogger = true)) + + verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, codecShortName, +isCompleted = false, dummyData) + + writer.stop() + + verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, codecShortName, +isCompleted = true, dummyData) +} + } + + test("spark.eventLog.compression.codec overrides spark.io.compression.codec") { +val conf = new SparkConf +conf.set(EVENT_LOG_COMPRESS, true) +val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + +val appId = "test" +val appAttemptId = None + +// The default value is `spark.io.compression.codec`. +val writer = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf) +assert(writer.compressionCodecName.contains("lz4")) + +// `spark.eventLog.compression.codec` overrides `spark.io.compression.codec`. +conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd") +val writer2 = createWriter(appId, appAttemptId,
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328401775 ## File path: core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala ## @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.zip.{ZipInputStream, ZipOutputStream}
+
+import scala.collection.mutable
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileReadersSuite extends SparkFunSuite with LocalSparkContext
+  with BeforeAndAfter with Logging {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+    SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+    testDir = Utils.createTempDir(namePrefix = s"event log")
+    testDir.deleteOnExit()

Review comment:
~~I'll change it to delete the directory from `after`.~~ The deletion was already there in `after`; sorry, my bad. I'll just remove this line.
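The point of the comment above is that explicit cleanup in `after` makes `deleteOnExit()` redundant. As a JDK-only sketch of why explicit recursive deletion is the more dependable of the two: `File.delete` (which `deleteOnExit` relies on) only removes a directory when it is empty, so a test directory that still holds log files needs a tree walk. The names `TempDirCleanupSketch`, `deleteRecursively`, and `withTempDir` are hypothetical stand-ins, not Spark's `Utils` helpers.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical illustration only; Spark's own helper is Utils.deleteRecursively.
object TempDirCleanupSketch {

  /** Deletes a file or a whole directory tree; returns true if nothing remains. */
  def deleteRecursively(f: File): Boolean = {
    // listFiles() returns null for regular files, hence the Option wrapper.
    val children = Option(f.listFiles()).getOrElse(Array.empty[File])
    children.foreach(deleteRecursively)
    f.delete()
    !f.exists()
  }

  /** Mimics a before/after pair: create a directory, run the body, always clean up. */
  def withTempDir[T](body: File => T): T = {
    val dir = Files.createTempDirectory("event log").toFile
    try body(dir) finally deleteRecursively(dir)
  }
}
```

With cleanup tied to the end of each test, as in `withTempDir`, nothing is left for a JVM-exit hook to remove.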
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328407564 ## File path: core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala ## @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.zip.{ZipInputStream, ZipOutputStream}
+
+import scala.collection.mutable
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileReadersSuite extends SparkFunSuite with LocalSparkContext
+  with BeforeAndAfter with Logging {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+    SparkHadoopUtil.get.newConfiguration(new SparkConf()))

Review comment:
That doesn't seem to be exactly the same: the SparkConf in `SparkHadoopUtil.get.conf` would be initialized with `new SparkConf(false).loadFromSystemProperties(true)`, whereas `new SparkConf()` is equivalent to `new SparkConf(true)`. ReplayListenerSuite also uses this line.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328405270 ## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala ## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io._ +import java.net.URI +import java.nio.charset.StandardCharsets + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[history] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + // Only defined if the file system scheme is not local + protected var hadoopDataStream: Option[FSDataOutputStream] = None + protected var writer: Option[PrintWriter] = None + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path, fnSetupWriter: OutputStream => PrintWriter): Unit = { +if (shouldOverwrite && fileSystem.delete(path, 
true)) { + logWarning(s"Event log $path already exists. Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some( + SparkHadoopUtil.createFile(fileSystem, path, sparkConf.get(EVENT_LOG_ALLOW_EC))) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + writer =
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328401775 ## File path: core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala ## @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.zip.{ZipInputStream, ZipOutputStream}
+
+import scala.collection.mutable
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileReadersSuite extends SparkFunSuite with LocalSparkContext
+  with BeforeAndAfter with Logging {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+    SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+    testDir = Utils.createTempDir(namePrefix = s"event log")
+    testDir.deleteOnExit()

Review comment:
I'll change it to delete the directory from `after`.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328401676 ## File path: core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala ## @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.zip.{ZipInputStream, ZipOutputStream}
+
+import scala.collection.mutable
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileReadersSuite extends SparkFunSuite with LocalSparkContext
+  with BeforeAndAfter with Logging {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+    SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+    testDir = Utils.createTempDir(namePrefix = s"event log")

Review comment:
This is intentional: it tests the behavior when the directory name contains a space or a special character. The FileSource/FileSink suites test this in a similar way.
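A space in the directory name matters because the path is later converted to a URI, where the space must be escaped as `%20` and decoded again on the way back. The following JDK-only sketch shows that round trip; `SpacePathSketch` and its methods are hypothetical names, and it assumes the platform's temp directory accepts spaces in file names.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical illustration only; not part of the test suites under review.
object SpacePathSketch {

  /** Creates a temp directory whose name contains a space, like the "event log" prefix. */
  def createDirWithSpace(): File = Files.createTempDirectory("event log").toFile

  /** The escaped form of the path, as it appears inside a URI. */
  def rawUriPath(dir: File): String = dir.toURI.getRawPath

  /** True when converting to a URI and back yields the original absolute path. */
  def uriRoundTrips(dir: File): Boolean =
    new File(dir.toURI).getAbsolutePath == dir.getAbsolutePath
}
```

Code that builds a path from the raw (escaped) form instead of the decoded one would look for a directory literally named `event%20log...`, which is the kind of bug such a test directory name is meant to surface.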
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328401390

## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ##
@@ -574,22 +576,27 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
-  private[history] def shouldReloadLog(info: LogInfo, entry: FileStatus): Boolean = {
-    var result = info.fileSize < entry.getLen
-    if (!result && info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
-      try {
-        result = Utils.tryWithResource(fs.open(entry.getPath)) { in =>
-          in.getWrappedStream match {
-            case dfsIn: DFSInputStream => info.fileSize < dfsIn.getFileLength
-            case _ => false
-          }
+  private[history] def shouldReloadLog(info: LogInfo, reader: EventLogFileReader): Boolean = {
+    if (info.isComplete != reader.completed) {
+      true
+    } else {
+      var result = if (info.lastSequenceNum.isDefined) {
+        require(reader.lastIndex.isDefined)
+        info.lastSequenceNum.get < reader.lastIndex.get ||

Review comment:
I missed this one; I'll rename `lastSequenceNum` to `lastIndex`.
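The reload check in the hunk above compares what the history server already knows about a log with what the reader currently reports. The quoted diff is cut off after `||`, so the sketch below is only one plausible reading: the second half of the index comparison and the single-file fallback are assumptions, and `KnownLog`, `CurrentLog`, and `shouldReload` are hypothetical stand-ins for `LogInfo`, `EventLogFileReader`, and `shouldReloadLog`.

```scala
// Hypothetical illustration only; simplified stand-ins for the types in the diff.
object ReloadSketch {

  /** What was recorded the last time the log was parsed. */
  final case class KnownLog(isComplete: Boolean, lastIndex: Option[Long], fileSize: Long)

  /** What the reader reports right now. */
  final case class CurrentLog(
      completed: Boolean,
      lastIndex: Option[Long],
      fileSizeForLastIndex: Long)

  def shouldReload(known: KnownLog, current: CurrentLog): Boolean = {
    if (known.isComplete != current.completed) {
      // The application finished (or the status flipped): always reload.
      true
    } else {
      (known.lastIndex, current.lastIndex) match {
        case (Some(k), Some(c)) =>
          // Assumption: reload on a newer rolled file, or growth of the same file.
          k < c || (k == c && known.fileSize < current.fileSizeForLastIndex)
        case _ =>
          // Assumption: a single (non-rolled) log reloads only when it has grown.
          known.fileSize < current.fileSizeForLastIndex
      }
    }
  }
}
```

Unlike the diff, which uses `require` to reject a rolled log whose reader has no index, this sketch quietly falls back to the size comparison in that case.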
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328401321 ## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala ## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io._ +import java.net.URI +import java.nio.charset.StandardCharsets + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[history] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + // Only defined if the file system scheme is not local + protected var hadoopDataStream: Option[FSDataOutputStream] = None + protected var writer: Option[PrintWriter] = None + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path, fnSetupWriter: OutputStream => PrintWriter): Unit = { +if (shouldOverwrite && fileSystem.delete(path, 
true)) { + logWarning(s"Event log $path already exists. Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some( + SparkHadoopUtil.createFile(fileSystem, path, sparkConf.get(EVENT_LOG_ALLOW_EC))) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + writer =
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328401018 ## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala ## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io._ +import java.net.URI +import java.nio.charset.StandardCharsets + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[history] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + // Only defined if the file system scheme is not local + protected var hadoopDataStream: Option[FSDataOutputStream] = None + protected var writer: Option[PrintWriter] = None + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path, fnSetupWriter: OutputStream => PrintWriter): Unit = { +if (shouldOverwrite && fileSystem.delete(path, 
true)) { + logWarning(s"Event log $path already exists. Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some( + SparkHadoopUtil.createFile(fileSystem, path, sparkConf.get(EVENT_LOG_ALLOW_EC))) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + writer =
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328400815 ## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala ## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io._ +import java.net.URI +import java.nio.charset.StandardCharsets + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[history] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + // Only defined if the file system scheme is not local + protected var hadoopDataStream: Option[FSDataOutputStream] = None + protected var writer: Option[PrintWriter] = None + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path, fnSetupWriter: OutputStream => PrintWriter): Unit = { +if (shouldOverwrite && fileSystem.delete(path, 
true)) { + logWarning(s"Event log $path already exists. Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some( + SparkHadoopUtil.createFile(fileSystem, path, sparkConf.get(EVENT_LOG_ALLOW_EC))) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + writer =
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328400238 ## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala ## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.{BufferedInputStream, InputStream} +import java.util.zip.{ZipEntry, ZipOutputStream} + +import scala.collection.mutable + +import com.google.common.io.ByteStreams +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.hdfs.DFSInputStream + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.history.EventLogFileWriter.codecName +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** The base class of reader which will read the information of event log file(s). 
*/ +abstract class EventLogFileReader( +protected val fileSystem: FileSystem, +val rootPath: Path) { + + protected def fileSizeForDFS(path: Path): Option[Long] = { +Utils.tryWithResource(fileSystem.open(path)) { in => + in.getWrappedStream match { +case dfsIn: DFSInputStream => Some(dfsIn.getFileLength) +case _ => None + } +} + } + + protected def addFileAsZipEntry( + zipStream: ZipOutputStream, + path: Path, + entryName: String): Unit = { +Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { inputStream => + zipStream.putNextEntry(new ZipEntry(entryName)) + ByteStreams.copy(inputStream, zipStream) + zipStream.closeEntry() +} + } + + /** Returns the last index of event log files. None for single event log file. */ + def lastIndex: Option[Long] + + /** + * Returns the size of file for the last index of event log files. Returns its size for + * single event log file. + */ + def fileSizeForLastIndex: Long + + /** Returns whether the application is completed. */ + def completed: Boolean + + /** + * Returns the size of file for the last index of event log files, only when + * underlying input stream is DFSInputStream. Otherwise returns None. + */ + def fileSizeForLastIndexForDFS: Option[Long] + + /** Returns the modification time for the last sequence number of event log files. */ + def modificationTime: Long + + /** + * This method compresses the files passed in, and writes the compressed data out into the + * ZipOutputStream passed in. Each file is written as a new ZipEntry with its name being + * the name of the file being compressed. + */ + def zipEventLogFiles(zipStream: ZipOutputStream): Unit + + /** Returns all available event log files. */ + def listEventLogFiles: Seq[FileStatus] + + /** Returns the short compression name if being used. None if it's uncompressed. */ + def compressionCodec: Option[String] + + /** Returns the size of all event log files. 
*/ + def totalSize: Long +} + +object EventLogFileReader { + // A cache for compression codecs to avoid creating the same codec many times + private val codecMap = mutable.HashMap.empty[String, CompressionCodec] + + def apply( + fs: FileSystem, + path: Path, + lastSequenceNumber: Option[Long]): EventLogFileReader = { +lastSequenceNumber match { + case Some(_) => new RollingEventLogFilesFileReader(fs, path) + case None => new SingleFileEventLogFileReader(fs, path) +} + } + + def apply(fs: FileSystem, path: Path): Option[EventLogFileReader] = { +apply(fs, fs.getFileStatus(path)) + } + + def apply(fs: FileSystem, status: FileStatus): Option[EventLogFileReader] = { +if (isSingleEventLog(status)) { + Some(new SingleFileEventLogFileReader(fs, status.getPath)) +} else if (isRollingEventLogs(status)) { + Some(new RollingEventLogFilesFileReader(fs, status.getPath)) +} else { + None +} + } + + /** + * Opens an event log file and returns an input stream that contains the event data. + * + * @return input stream that holds one JSON record per line. + */ + def openEventLog(log: Path, fs: FileSystem): InputStream = { +val in = new
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328101052 ## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io._ +import java.net.URI +import java.nio.charset.StandardCharsets + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[history] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path): (Option[FSDataOutputStream], OutputStream) = { +if (shouldOverwrite && fileSystem.delete(path, true)) { + logWarning(s"Event log $path already exists. 
Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +var hadoopDataStream: Option[FSDataOutputStream] = None +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some( + SparkHadoopUtil.createFile(fileSystem, path, sparkConf.get(EVENT_LOG_ALLOW_EC))) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + (hadoopDataStream, bstream) +} catch { + case e: Exception => +dstream.close() +throw e +} + } + + protected def
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328101176 ## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io._ +import java.net.URI +import java.nio.charset.StandardCharsets + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[history] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path): (Option[FSDataOutputStream], OutputStream) = { +if (shouldOverwrite && fileSystem.delete(path, true)) { + logWarning(s"Event log $path already exists. 
Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +var hadoopDataStream: Option[FSDataOutputStream] = None +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some( + SparkHadoopUtil.createFile(fileSystem, path, sparkConf.get(EVENT_LOG_ALLOW_EC))) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + (hadoopDataStream, bstream) +} catch { + case e: Exception => +dstream.close() +throw e +} + } + + protected def
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r328085700 ## File path: core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io._ +import java.net.URI +import java.nio.charset.StandardCharsets + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[history] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path): (Option[FSDataOutputStream], OutputStream) = { +if (shouldOverwrite && fileSystem.delete(path, true)) { + logWarning(s"Event log $path already exists. 
Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +var hadoopDataStream: Option[FSDataOutputStream] = None +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some( + SparkHadoopUtil.createFile(fileSystem, path, sparkConf.get(EVENT_LOG_ALLOW_EC))) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + (hadoopDataStream, bstream) +} catch { + case e: Exception => +dstream.close() +throw e +} + } + + protected def
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r325925937 ## File path: core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala ## @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io._ +import java.net.URI + +import scala.collection.mutable.Map + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[scheduler] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path): (Option[FSDataOutputStream], +Option[CountingOutputStream]) = { + +if (shouldOverwrite && fileSystem.delete(path, true)) { + logWarning(s"Event log $path already exists. 
Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +var hadoopDataStream: Option[FSDataOutputStream] = None +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some(if (shouldAllowECLogs) { + fileSystem.create(path) +} else { + SparkHadoopUtil.createNonECFile(fileSystem, path) +}) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + val ostream = new CountingOutputStream(bstream) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + + (hadoopDataStream, Some(ostream))
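The NOTE in the scaladoc above says the counting stream sees "non-compressed" bytes. A minimal sketch of why that follows from the wrapping order, assuming a hand-written `ByteCountingStream` as a stand-in for the commons-compress `CountingOutputStream` and gzip as a stand-in for Spark's codec (both are illustrative choices, not what the PR uses):

```scala
import java.io.{BufferedOutputStream, ByteArrayOutputStream, FilterOutputStream, OutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPOutputStream

// Hypothetical stand-in for a counting stream: tallies every byte handed to it
// before passing the byte on to the wrapped stream.
class ByteCountingStream(underlying: OutputStream) extends FilterOutputStream(underlying) {
  private var count = 0L
  def bytesWritten: Long = count

  override def write(b: Int): Unit = {
    underlying.write(b)
    count += 1
  }

  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    underlying.write(b, off, len)
    count += len
  }
}

object StreamChainSketch {
  /** Returns (bytes seen by the counter, bytes that actually reached the sink). */
  def writeEvents(events: Seq[String]): (Long, Int) = {
    val sink = new ByteArrayOutputStream()             // plays the role of dstream
    val cstream = new GZIPOutputStream(sink)           // compression sits next to the sink
    val bstream = new BufferedOutputStream(cstream, 1024)
    val ostream = new ByteCountingStream(bstream)      // counter is outermost
    events.foreach { e =>
      ostream.write((e + "\n").getBytes(StandardCharsets.UTF_8))
    }
    ostream.close()                                    // flushes and closes the whole chain
    (ostream.bytesWritten, sink.size())
  }

  def main(args: Array[String]): Unit = {
    val (counted, stored) = writeEvents(Seq.fill(100)("event"))
    println(s"counted=$counted stored=$stored")
  }
}
```

Because the counter is the outermost wrapper, it tallies bytes before they reach the compressor, so a size threshold compared against it would presumably be measured in uncompressed bytes rather than on-disk bytes.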
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r325450908 ## File path: core/src/test/scala/org/apache/spark/scheduler/EventLogFileWritersSuite.scala ## @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import java.io.{File, FileOutputStream, IOException} +import java.net.URI +import java.nio.charset.StandardCharsets + +import scala.collection.mutable +import scala.io.Source + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.scheduler.EventLogTestHelper._ +import org.apache.spark.util.Utils + + +abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkContext + with BeforeAndAfter with Logging { + + protected val fileSystem = Utils.getHadoopFileSystem("/", +SparkHadoopUtil.get.newConfiguration(new SparkConf())) + protected var testDir: File = _ + protected var testDirPath: Path = _ + + before { +testDir = Utils.createTempDir(namePrefix = s"event log") +testDir.deleteOnExit() +testDirPath = new Path(testDir.getAbsolutePath()) + } + + after { +Utils.deleteRecursively(testDir) + } + + test("create EventLogFileWriter with enable/disable rolling") { +def buildWriterAndVerify(conf: SparkConf, expectedClazz: Class[_]): Unit = { + val writer = EventLogFileWriter.createEventLogFileWriter( +getUniqueApplicationId, None, testDirPath.toUri, conf, +SparkHadoopUtil.get.newConfiguration(conf)) + val writerClazz = writer.getClass + assert(expectedClazz === writerClazz, +s"default file writer should be $expectedClazz, but $writerClazz") +} + +val conf = new SparkConf +conf.set(EVENT_LOG_ENABLED, true) +conf.set(EVENT_LOG_DIR, testDir.toString) + +// default config +buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter]) + +conf.set(EVENT_LOG_ENABLE_ROLLING, true) +buildWriterAndVerify(conf, classOf[RollingEventLogFilesWriter]) + 
+conf.set(EVENT_LOG_ENABLE_ROLLING, false) +buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter]) + } + + val allCodecs = Seq(None) ++ Review comment: Same answer.
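The test quoted above checks that the writer class follows the rolling flag, with the single-file writer as the default. A rough sketch of that selection logic, assuming a plain `Map[String, String]` in place of `SparkConf` and hypothetical `SingleFileWriter` / `RollingFilesWriter` types in place of the PR's classes (only the config key and its `false` default come from the diff):

```scala
object WriterSelectionSketch {
  // Hypothetical, simplified stand-ins; not the classes from the PR.
  sealed trait LogWriter
  final class SingleFileWriter extends LogWriter
  final class RollingFilesWriter extends LogWriter

  // Key name as it appears in the config diff; the default there is false.
  val RollingEnabledKey = "spark.eventLog.rolling.enabled"

  /** Picks the writer kind from the rolling flag, falling back to the single-file writer. */
  def createWriter(conf: Map[String, String]): LogWriter =
    if (conf.getOrElse(RollingEnabledKey, "false").toBoolean) new RollingFilesWriter
    else new SingleFileWriter
}
```

Keeping the decision in one factory method means callers never branch on the flag themselves, which is what lets the test assert on the returned class alone.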
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r325450885 ## File path: core/src/test/scala/org/apache/spark/scheduler/EventLogFileWritersSuite.scala ## @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io.{File, FileOutputStream, IOException} +import java.net.URI +import java.nio.charset.StandardCharsets + +import scala.collection.mutable +import scala.io.Source + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.scheduler.EventLogTestHelper._ +import org.apache.spark.util.Utils + + +abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkContext + with BeforeAndAfter with Logging { Review comment: Ah I added it for testing and didn't remove it. 
Will remove.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r325450559 ## File path: core/src/test/scala/org/apache/spark/scheduler/EventLogFileReadersSuite.scala ## @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File} +import java.net.URI +import java.nio.charset.StandardCharsets +import java.util.zip.{ZipInputStream, ZipOutputStream} + +import scala.collection.mutable + +import com.google.common.io.{ByteStreams, Files} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.scheduler.EventLogTestHelper._ +import org.apache.spark.util.Utils + +abstract class EventLogFileReadersSuite extends SparkFunSuite with LocalSparkContext + with BeforeAndAfter with Logging { + + protected val fileSystem = Utils.getHadoopFileSystem("/", +SparkHadoopUtil.get.newConfiguration(new SparkConf())) + protected var testDir: File = _ + protected var testDirPath: Path = _ + + before { +testDir = Utils.createTempDir(namePrefix = s"event log") +testDir.deleteOnExit() +testDirPath = new Path(testDir.getAbsolutePath()) + } + + after { +Utils.deleteRecursively(testDir) + } + + test("Retrieve EventLogFileReader correctly") { +def assertInstanceOfEventLogReader( +expectedClazz: Option[Class[_ <: EventLogFileReader]], +actual: Option[EventLogFileReader]): Unit = { + if (expectedClazz.isEmpty) { +assert(actual.isEmpty, s"Expected no EventLogFileReader instance but was " + + s"${actual.map(_.getClass).getOrElse("")}") + } else { +assert(actual.isDefined, s"Expected an EventLogFileReader instance but was empty") +assert(expectedClazz.get.isAssignableFrom(actual.get.getClass), + s"Expected ${expectedClazz.get} but was ${actual.get.getClass}") + } +} + +def testForPathWithoutSeq( +path: Path, +isFile: Boolean, +expectedClazz: Option[Class[_ <: EventLogFileReader]]): Unit = 
{ + if (isFile) { +Utils.tryWithResource(fileSystem.create(path)) { is => + is.writeInt(10) +} + } else { +fileSystem.mkdirs(path) + } + + val reader = EventLogFileReader.getEventLogReader(fileSystem, path) + assertInstanceOfEventLogReader(expectedClazz, reader) + val reader2 = EventLogFileReader.getEventLogReader(fileSystem, +fileSystem.getFileStatus(path)) + assertInstanceOfEventLogReader(expectedClazz, reader) +} + +// path with no last sequence - single event log +val reader1 = EventLogFileReader.getEventLogReader(fileSystem, new Path(testDirPath, "aaa"), + None) + assertInstanceOfEventLogReader(Some(classOf[SingleFileEventLogFileReader]), Some(reader1)) + +// path with last sequence - rolling event log +val reader2 = EventLogFileReader.getEventLogReader(fileSystem, + new Path(testDirPath, "eventlog_v2_aaa"), Some(3)) + assertInstanceOfEventLogReader(Some(classOf[RollingEventLogFilesFileReader]), Some(reader2)) + +// path - file (both path and FileStatus) +val eventLogFile = new Path(testDirPath, "bbb") +testForPathWithoutSeq(eventLogFile, isFile = true, Some(classOf[SingleFileEventLogFileReader])) + +// path - file starting with "." +val invalidEventLogFile = new Path(testDirPath, ".bbb") +testForPathWithoutSeq(invalidEventLogFile, isFile = true, None) + +// path - directory with "eventlog_v2_" prefix +val eventLogDir = new Path(testDirPath, "eventlog_v2_ccc") +testForPathWithoutSeq(eventLogDir, isFile = false, + Some(classOf[RollingEventLogFilesFileReader])) + +// path - directory with no "eventlog_v2_" prefix +val invalidEventLogDir
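The cases exercised by the quoted test can be summarized as a small dispatch on the path's name and kind. A sketch of that rule, assuming hypothetical `ReaderKind` values in place of the PR's reader classes. The outcome for a directory without the `eventlog_v2_` prefix is cut off in the quoted diff, so treating it as "no reader" is an assumption inferred from the variable name `invalidEventLogDir`:

```scala
object ReaderSelectionSketch {
  // Hypothetical stand-ins for the two reader classes named in the test.
  sealed trait ReaderKind
  case object SingleFile extends ReaderKind
  case object RollingFiles extends ReaderKind

  // Directory prefix used by the test for rolling event logs.
  val RollingDirPrefix = "eventlog_v2_"

  /** Maps a path's last component and kind to the reader expected by the test cases. */
  def readerFor(name: String, isDirectory: Boolean): Option[ReaderKind] =
    if (isDirectory) {
      // ASSUMPTION: a directory without the prefix yields no reader; the quoted
      // test is truncated before its expectation for this case.
      if (name.startsWith(RollingDirPrefix)) Some(RollingFiles) else None
    } else if (name.startsWith(".")) {
      None // files starting with "." are treated as invalid in the test
    } else {
      Some(SingleFile)
    }
}
```

Returning an `Option` mirrors the test helper, which accepts `None` as the expected result for paths that should not be read at all.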
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r325450098 ## File path: core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala ## @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io._ +import java.net.URI + +import scala.collection.mutable.Map + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[scheduler] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path): (Option[FSDataOutputStream], +Option[CountingOutputStream]) = { + +if (shouldOverwrite && fileSystem.delete(path, true)) { + logWarning(s"Event log $path already exists. 
Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +var hadoopDataStream: Option[FSDataOutputStream] = None +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some(if (shouldAllowECLogs) { + fileSystem.create(path) +} else { + SparkHadoopUtil.createNonECFile(fileSystem, path) +}) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + val ostream = new CountingOutputStream(bstream) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + + (hadoopDataStream, Some(ostream))
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r325449621 ## File path: core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala ## @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io._ +import java.net.URI + +import scala.collection.mutable.Map + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[scheduler] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path): (Option[FSDataOutputStream], +Option[CountingOutputStream]) = { + +if (shouldOverwrite && fileSystem.delete(path, true)) { + logWarning(s"Event log $path already exists. 
Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +var hadoopDataStream: Option[FSDataOutputStream] = None +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some(if (shouldAllowECLogs) { + fileSystem.create(path) +} else { + SparkHadoopUtil.createNonECFile(fileSystem, path) +}) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + val ostream = new CountingOutputStream(bstream) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + + (hadoopDataStream, Some(ostream))
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r325449099 ## File path: core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala ## @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io._ +import java.net.URI + +import scala.collection.mutable.Map + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[scheduler] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path): (Option[FSDataOutputStream], +Option[CountingOutputStream]) = { + +if (shouldOverwrite && fileSystem.delete(path, true)) { + logWarning(s"Event log $path already exists. 
Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +var hadoopDataStream: Option[FSDataOutputStream] = None +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some(if (shouldAllowECLogs) { + fileSystem.create(path) +} else { + SparkHadoopUtil.createNonECFile(fileSystem, path) +}) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + val ostream = new CountingOutputStream(bstream) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + + (hadoopDataStream, Some(ostream))
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r325448554 ## File path: core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala ## @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io._ +import java.net.URI + +import scala.collection.mutable.Map + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[scheduler] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path): (Option[FSDataOutputStream], +Option[CountingOutputStream]) = { + +if (shouldOverwrite && fileSystem.delete(path, true)) { + logWarning(s"Event log $path already exists. 
Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +var hadoopDataStream: Option[FSDataOutputStream] = None +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some(if (shouldAllowECLogs) { + fileSystem.create(path) +} else { + SparkHadoopUtil.createNonECFile(fileSystem, path) +}) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + val ostream = new CountingOutputStream(bstream) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + + (hadoopDataStream, Some(ostream))
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r325448204 ## File path: core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala ## @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io._ +import java.net.URI + +import scala.collection.mutable.Map + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[scheduler] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path): (Option[FSDataOutputStream], +Option[CountingOutputStream]) = { + +if (shouldOverwrite && fileSystem.delete(path, true)) { + logWarning(s"Event log $path already exists. 
Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +var hadoopDataStream: Option[FSDataOutputStream] = None +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some(if (shouldAllowECLogs) { + fileSystem.create(path) +} else { + SparkHadoopUtil.createNonECFile(fileSystem, path) +}) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + val ostream = new CountingOutputStream(bstream) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + + (hadoopDataStream, Some(ostream))
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r325447837 ## File path: core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala ## @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io._ +import java.net.URI + +import scala.collection.mutable.Map + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[scheduler] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path): (Option[FSDataOutputStream], +Option[CountingOutputStream]) = { + +if (shouldOverwrite && fileSystem.delete(path, true)) { + logWarning(s"Event log $path already exists. 
Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +var hadoopDataStream: Option[FSDataOutputStream] = None +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some(if (shouldAllowECLogs) { + fileSystem.create(path) +} else { + SparkHadoopUtil.createNonECFile(fileSystem, path) +}) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + val ostream = new CountingOutputStream(bstream) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + + (hadoopDataStream, Some(ostream))
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r325446292 ## File path: core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala ## @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io._ +import java.net.URI + +import scala.collection.mutable.Map + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. 
+ * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( +appId: String, +appAttemptId : Option[String], +logBaseDir: URI, +sparkConf: SparkConf, +hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = +if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) +} else { + None +} + + private[scheduler] val compressionCodecName = compressionCodec.map { c => +CompressionCodec.getShortName(c.getClass.getName) + } + + protected def requireLogBaseDirAsDirectory(): Unit = { +if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") +} + } + + protected def initLogFile(path: Path): (Option[FSDataOutputStream], +Option[CountingOutputStream]) = { + +if (shouldOverwrite && fileSystem.delete(path, true)) { + logWarning(s"Event log $path already exists. 
Overwriting...") +} + +val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme +val isDefaultLocal = defaultFs == null || defaultFs == "file" +val uri = path.toUri + +var hadoopDataStream: Option[FSDataOutputStream] = None +/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ +val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { +new FileOutputStream(uri.getPath) + } else { +hadoopDataStream = Some(if (shouldAllowECLogs) { + fileSystem.create(path) +} else { + SparkHadoopUtil.createNonECFile(fileSystem, path) +}) +hadoopDataStream.get + } + +try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + val ostream = new CountingOutputStream(bstream) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + + (hadoopDataStream, Some(ostream))
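The NOTE in the quoted class doc says the CountingOutputStream returned by "initLogFile" counts "non-compressed" bytes. That appears to follow from the wrapping order in the quoted code: the counter is the outermost stream, so it sees bytes before they reach the codec. Below is a minimal sketch of that layering, not the PR's code. It assumes GZIP in place of Spark's CompressionCodec and an in-memory sink in place of the file stream, and `ByteCountingStream` and `StreamLayeringSketch` are hypothetical names standing in for the commons-compress class.

```scala
import java.io.{BufferedOutputStream, ByteArrayOutputStream, FilterOutputStream, OutputStream}
import java.util.zip.GZIPOutputStream

// Hypothetical stand-in for commons-compress's CountingOutputStream,
// written out so the sketch has no third-party dependencies.
class ByteCountingStream(underlying: OutputStream) extends FilterOutputStream(underlying) {
  private var count = 0L

  override def write(b: Int): Unit = {
    underlying.write(b)
    count += 1
  }

  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    underlying.write(b, off, len)
    count += len
  }

  def bytesWritten: Long = count
}

object StreamLayeringSketch {
  /** Returns (bytes seen by the counter, bytes that reached the sink). */
  def writeThroughLayers(payload: Array[Byte]): (Long, Long) = {
    val sink = new ByteArrayOutputStream()           // plays the role of dstream
    val cstream = new GZIPOutputStream(sink)         // assumption: GZIP instead of Spark's codec
    val bstream = new BufferedOutputStream(cstream, 8 * 1024)
    val ostream = new ByteCountingStream(bstream)    // outermost, as in the quoted initLogFile
    ostream.write(payload)
    ostream.close() // flushes the buffer and finishes the gzip stream
    (ostream.bytesWritten, sink.size().toLong)
  }
}
```

With a highly compressible payload, the first value equals the payload length while the second is much smaller, so any size check driven by the counter would be comparing uncompressed bytes.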
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r325443715 ## File path: core/src/main/scala/org/apache/spark/scheduler/EventLogFileReaders.scala ## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io.{BufferedInputStream, InputStream} +import java.util.zip.{ZipEntry, ZipOutputStream} + +import scala.collection.mutable.Map + +import com.google.common.io.ByteStreams +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.hdfs.DFSInputStream + +import org.apache.spark.SparkConf +import org.apache.spark.io.CompressionCodec +import org.apache.spark.scheduler.EventLogFileWriter.codecName +import org.apache.spark.util.Utils + +/** The base class of reader which will read the information of event log file(s). 
*/ +abstract class EventLogFileReader( +protected val fileSystem: FileSystem, +val rootPath: Path) { + + protected def fileSizeForDFS(path: Path): Option[Long] = { +Utils.tryWithResource(fileSystem.open(path)) { in => + in.getWrappedStream match { +case dfsIn: DFSInputStream => Some(dfsIn.getFileLength) +case _ => None + } +} + } + + protected def addFileAsZipEntry( + zipStream: ZipOutputStream, + path: Path, + entryName: String): Unit = { +Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { inputStream => + zipStream.putNextEntry(new ZipEntry(entryName)) + ByteStreams.copy(inputStream, zipStream) + zipStream.closeEntry() +} + } + + // methods to be override + + /** Returns the last sequence of event log files. None for single event log file. */ + def lastSequence: Option[Long] + + /** + * Returns the size of file for the last sequence of event log files. Returns its size for + * single event log file. + */ + def fileSizeForLastSequence: Long + + /** Returns whether the application is completed. */ + def completed: Boolean + + /** + * Returns the size of file for the last sequence of event log files, only when + * underlying input stream is DFSInputStream. Otherwise returns None. + */ + def fileSizeForLastSequenceForDFS: Option[Long] + + /** Returns the modification time for the last sequence of event log files. */ + def modificationTime: Long + + /** + * This method compresses the files passed in, and writes the compressed data out into the + * ZipOutputStream passed in. Each file is written as a new ZipEntry with its name being + * the name of the file being compressed. + */ + def zipEventLogFiles(zipStream: ZipOutputStream): Unit + + /** Returns all available event log files. */ + def listEventLogFiles: Seq[FileStatus] + + /** Returns the short compression name if being used. None if it's uncompressed. */ + def compression: Option[String] + + /** Returns the size of all event log files. 
*/ + def allSize: Long +} + +object EventLogFileReader { + // A cache for compression codecs to avoid creating the same codec many times + private val codecMap = Map.empty[String, CompressionCodec] + + def getEventLogReader( + fs: FileSystem, + path: Path, + lastSequence: Option[Long]): EventLogFileReader = { +lastSequence match { + case Some(_) => new RollingEventLogFilesFileReader(fs, path) + case None => new SingleFileEventLogFileReader(fs, path) +} + } + + def getEventLogReader(fs: FileSystem, path: Path): Option[EventLogFileReader] = { +val status = fs.getFileStatus(path) +if (isSingleEventLog(status)) { + Some(new SingleFileEventLogFileReader(fs, path)) +} else if (isRollingEventLogs(status)) { + Some(new RollingEventLogFilesFileReader(fs, path)) +} else { + None +} + } + + def getEventLogReader(fs: FileSystem, status: FileStatus): Option[EventLogFileReader] = { +if (isSingleEventLog(status)) { + Some(new SingleFileEventLogFileReader(fs, status.getPath)) +} else if (isRollingEventLogs(status)) { + Some(new RollingEventLogFilesFileReader(fs, status.getPath))
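The quoted "addFileAsZipEntry" helper writes each event log file as its own ZipEntry. Here is a minimal, self-contained sketch of the same pattern using only java.util.zip. It assumes in-memory streams instead of a Hadoop FileSystem and a manual copy loop instead of Guava's ByteStreams.copy. `ZipEntrySketch`, `addStreamAsZipEntry`, `roundTrip`, and the entry names used with it are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}

object ZipEntrySketch {
  // Copies one input stream into the zip stream as a single named entry.
  def addStreamAsZipEntry(zip: ZipOutputStream, in: InputStream, entryName: String): Unit = {
    zip.putNextEntry(new ZipEntry(entryName))
    val buf = new Array[Byte](8192)
    var n = in.read(buf)
    while (n != -1) {
      zip.write(buf, 0, n)
      n = in.read(buf)
    }
    zip.closeEntry()
  }

  // Zips the given (name, content) pairs and returns the entry names read back in order.
  def roundTrip(files: Seq[(String, Array[Byte])]): Seq[String] = {
    val bytes = new ByteArrayOutputStream()
    val zip = new ZipOutputStream(bytes)
    try {
      files.foreach { case (name, data) =>
        addStreamAsZipEntry(zip, new ByteArrayInputStream(data), name)
      }
    } finally {
      zip.close()
    }

    val zin = new ZipInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try {
      Iterator.continually(zin.getNextEntry).takeWhile(_ != null).map(_.getName).toList
    } finally {
      zin.close()
    }
  }
}
```

A reader for rolled logs could call such a helper once per file, while a single-file reader would call it once; how the PR actually names the entries is not shown in the quoted diff.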
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r325443041 ## File path: core/src/main/scala/org/apache/spark/scheduler/EventLogFileReaders.scala ## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler Review comment: I'll keep the existing package for now, and move everything together if the new package looks better. Thanks!
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files URL: https://github.com/apache/spark/pull/25670#discussion_r325426190 ## File path: core/src/main/scala/org/apache/spark/scheduler/EventLogFileReaders.scala ## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io.{BufferedInputStream, InputStream} +import java.util.zip.{ZipEntry, ZipOutputStream} + +import scala.collection.mutable.Map + +import com.google.common.io.ByteStreams +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.hdfs.DFSInputStream + +import org.apache.spark.SparkConf +import org.apache.spark.io.CompressionCodec +import org.apache.spark.scheduler.EventLogFileWriter.codecName +import org.apache.spark.util.Utils + +/** The base class of reader which will read the information of event log file(s).
*/ +abstract class EventLogFileReader( +protected val fileSystem: FileSystem, +val rootPath: Path) { + + protected def fileSizeForDFS(path: Path): Option[Long] = { +Utils.tryWithResource(fileSystem.open(path)) { in => + in.getWrappedStream match { +case dfsIn: DFSInputStream => Some(dfsIn.getFileLength) +case _ => None + } +} + } + + protected def addFileAsZipEntry( + zipStream: ZipOutputStream, + path: Path, + entryName: String): Unit = { +Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { inputStream => + zipStream.putNextEntry(new ZipEntry(entryName)) + ByteStreams.copy(inputStream, zipStream) + zipStream.closeEntry() +} + } + + // methods to be override Review comment: Just removed.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325426067

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##

@@ -174,6 +174,17 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.rollLog")
+      .doc("Whether rolling over event log files is enabled.")
+      .booleanConf.createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE =
+    ConfigBuilder("spark.eventLog.rollLog.maxFileSize")
+      .doc("The max size of event log file to be rolled over, in KiB unless otherwise specified.")

Review comment:
OK got it. That makes sense if we always want users to provide the unit. Thanks for explaining!
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325425816

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##

@@ -174,6 +174,17 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.rollLog")
+      .doc("Whether rolling over event log files is enabled.")
+      .booleanConf.createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE =
+    ConfigBuilder("spark.eventLog.rollLog.maxFileSize")
+      .doc("The max size of event log file to be rolled over, in KiB unless otherwise specified.")
+      .bytesConf(ByteUnit.KiB)
+      .createWithDefaultString("10m")

Review comment:
I guess making the files smaller would help us clean up more carefully, as we are dealing with log files of several GBs (in some cases tens of GBs, though I haven't seen hundreds). But I also agree that a default matching the HDFS block size, like 128m, wouldn't be too large.
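To make the unit discussion concrete: with `.bytesConf(ByteUnit.KiB)`, a bare number is read as KiB, while a suffixed string such as the default `"10m"` carries its own unit. The following is a simplified standalone sketch of that idea, not Spark's actual parser; the object name, method name, and the reduced suffix set are assumptions made for illustration.

```scala
object SizeStringSketch {
  // Multipliers expressed in KiB. The suffix set is a simplified assumption.
  private val unitsInKiB: Map[String, Long] = Map(
    "k" -> 1L, "kb" -> 1L,
    "m" -> 1024L, "mb" -> 1024L,
    "g" -> 1024L * 1024L, "gb" -> 1024L * 1024L)

  private val SizePattern = """(\d+)\s*([a-z]*)""".r

  /** Returns the size in KiB; a bare number is taken to be KiB already. */
  def parseAsKiB(raw: String): Long = raw.trim.toLowerCase match {
    case SizePattern(num, "") => num.toLong
    case SizePattern(num, suffix) if unitsInKiB.contains(suffix) =>
      num.toLong * unitsInKiB(suffix)
    case other =>
      throw new NumberFormatException(s"Unrecognized size string: $other")
  }
}
```

Under this sketch, `parseAsKiB("10m")` gives 10240 and `parseAsKiB("512")` gives 512, which is why the doc string has to say what a unit-less value means.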
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325423670

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##

@@ -174,6 +174,17 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.rollLog")
+      .doc("Whether rolling over event log files is enabled.")
+      .booleanConf.createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE =
+    ConfigBuilder("spark.eventLog.rollLog.maxFileSize")
+      .doc("The max size of event log file to be rolled over, in KiB unless otherwise specified.")

Review comment:
Uh, I'm not sure this has been respected; please search for `"in [a-zA-Z]+ unless otherwise specified."` with regex. Most of the conf docs using bytesConf mention this. Would they need to be changed as well?
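The suggested search can be sketched as a small standalone check. The pattern is the one quoted in the comment (with the final dot escaped); the object name and the idea of running it over an in-memory list of doc strings are assumptions for illustration, since in practice one would grep the source tree.

```scala
object DocUnitSearchSketch {
  // Captures the unit named in phrases like "in KiB unless otherwise specified."
  private val UnitPhrase = """in ([a-zA-Z]+) unless otherwise specified\.""".r

  /** Returns the unit mentioned in each doc string that contains the phrase. */
  def unitsMentioned(docs: Seq[String]): Seq[String] =
    docs.flatMap(doc => UnitPhrase.findFirstMatchIn(doc).map(_.group(1)))
}
```

For example, passing the doc string from the diff above yields `KiB`, while a doc string without the phrase contributes nothing.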
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325422491

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##

@@ -174,6 +174,17 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.rollLog")

Review comment:
I can see pros and cons to combining the options: if we combine the two options into one, we can't provide a default max size when rolling is enabled, because we decided to turn this off by default (so the default value would have to be 0). I'll leave the options as they are; if you're OK with a default value of 0, I'll combine them.
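The trade-off can be sketched with two hypothetical config shapes (all names below are illustrative and not part of the PR). With separate options, turning the flag on picks up a sensible default size; with a single option whose default of 0 means "disabled", the user must always choose a size to enable rolling.

```scala
object RollingOptionSketch {
  // Design A: separate enable flag plus a max size with a real default (10 MiB, in KiB).
  final case class TwoOptions(enabled: Boolean = false, maxFileSizeKiB: Long = 10L * 1024L)

  // Design B: one option, where the default of 0 means rolling is disabled.
  final case class OneOption(maxFileSizeKiB: Long = 0L)

  /** Effective size limit under design A, or None when rolling is off. */
  def effectiveLimitA(conf: TwoOptions): Option[Long] =
    if (conf.enabled) Some(conf.maxFileSizeKiB) else None

  /** Effective size limit under design B, or None when rolling is off. */
  def effectiveLimitB(conf: OneOption): Option[Long] =
    if (conf.maxFileSizeKiB > 0) Some(conf.maxFileSizeKiB) else None
}
```

Here `effectiveLimitA(TwoOptions(enabled = true))` already has a usable limit, whereas design B offers no limit until the user supplies one.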
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325420920

## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ##

@@ -1161,12 +1156,15 @@ private[history] case class LogInfo(
     logType: LogType.Value,
     appId: Option[String],
     attemptId: Option[String],
-    fileSize: Long)
+    fileSize: Long,
+    lastSequence: Option[Long],

Review comment:
`lastSequenceNum` looks OK as the patch has been using the term `sequence`. Thanks!