[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2020-02-11 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r378049590
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -179,6 +179,21 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.rolling.enabled")
+      .doc("Whether rolling over event log files is enabled.  If set to true, it cuts down " +
+        "each event log file to the configured size.")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLING_MAX_FILE_SIZE =
+    ConfigBuilder("spark.eventLog.rolling.maxFileSize")
+      .doc("The max size of event log file to be rolled over.")
 
 Review comment:
   @HyukjinKwon
   Thanks for initiating the thread on the dev mailing list. I'm following the thread and will come back once we reach some sort of consensus.





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2020-02-11 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r377862998
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -179,6 +179,21 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.rolling.enabled")
+      .doc("Whether rolling over event log files is enabled.  If set to true, it cuts down " +
+        "each event log file to the configured size.")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLING_MAX_FILE_SIZE =
+    ConfigBuilder("spark.eventLog.rolling.maxFileSize")
+      .doc("The max size of event log file to be rolled over.")
 
 Review comment:
   Sorry, it looks like we'll have to agree to disagree then. No one has the privilege to make someone do work under their own authorship that they disagree with - it would end up putting the wrong authorship on the commit.





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2020-01-30 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r373350420
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -179,6 +179,21 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.rolling.enabled")
+      .doc("Whether rolling over event log files is enabled.  If set to true, it cuts down " +
+        "each event log file to the configured size.")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLING_MAX_FILE_SIZE =
+    ConfigBuilder("spark.eventLog.rolling.maxFileSize")
+      .doc("The max size of event log file to be rolled over.")
 
 Review comment:
   I think there are counterexamples among Spark configurations which rely on the convention that once there is an `.enabled` configuration, the related configurations take effect only when it is enabled.
   
   Even if we only look at the SHS configurations, `spark.history.fs.cleaner.*`, `spark.history.kerberos.*`, and `spark.history.ui.acls.*` fall into this case.
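   As a minimal sketch of that gating convention (not code from this PR; the helper and its use are purely illustrative, reusing the two config entries quoted above):
   
   ```
   import org.apache.spark.SparkConf
   import org.apache.spark.internal.config._
   
   // Illustrative only: the companion setting is consulted only when the
   // corresponding .enabled flag is on, mirroring spark.history.fs.cleaner.* etc.
   def describeEventLogWriter(conf: SparkConf): String = {
     if (conf.get(EVENT_LOG_ENABLE_ROLLING)) {
       val maxBytes = conf.get(EVENT_LOG_ROLLING_MAX_FILE_SIZE)
       s"rolling writer, cutting each file at $maxBytes bytes"
     } else {
       "single-file writer (rolling-related settings are ignored)"
     }
   }
   ```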





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-15 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r335249806
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io._
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[history] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  // Only defined if the file system scheme is not local
+  protected var hadoopDataStream: Option[FSDataOutputStream] = None
+  protected var writer: Option[PrintWriter] = None
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path)(fnSetupWriter: OutputStream => 
PrintWriter): Unit = {
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+// The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+// Therefore, for local files, use FileOutputStream instead.
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(
+  SparkHadoopUtil.createFile(fileSystem, path, 
sparkConf.get(EVENT_LOG_ALLOW_EC)))
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedContinuousOutputStream(dstream))
+.getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+  writer = Some(fnSetupWriter(bstream))
+} catch {
+  case e: Exception =>
+ 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-14 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r334662511
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -179,6 +179,19 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.logRolling.enabled")
+      .doc("Whether rolling over event log files is enabled.  If set to true, it cuts down " +
+        "each event log file to the configured size.")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE =
+    ConfigBuilder("spark.eventLog.logRolling.maxFileSize")
+      .doc("The max size of event log file to be rolled over.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("128m")
 
 Review comment:
   Thanks for sharing the info!
   
   I'll comment on a possible case for wanting a smaller max file size: assuming we go with the plan, we would want to set a maximum number of event log files to retain, to address the case where the storage quota is limited. (I believe I saw this requirement - a limited storage quota - somewhere earlier, but I can't remember where.) In that case, "max file size" and "max number of event log files" work together to forecast the size of the event log directory for the app (snapshot files should be considered as well, so in reality it will be bigger than that), and to control that value finely, we may want a smaller max file size.
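   As a back-of-the-envelope illustration of how the two settings would bound the per-app directory size (the retention limit did not exist at this point, so its name and value here are hypothetical):
   
   ```
   // Rough upper bound of the per-app event log directory; illustrative numbers only.
   val maxFileSizeBytes = 128L * 1024 * 1024    // spark.eventLog.logRolling.maxFileSize = 128m
   val maxFilesToRetain = 10L                   // hypothetical retention limit
   val forecastBytes = maxFileSizeBytes * maxFilesToRetain  // ~1.25 GiB, before snapshot files
   ```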





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-14 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r334658321
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -179,6 +179,19 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.logRolling.enabled")
+      .doc("Whether rolling over event log files is enabled.  If set to true, it cuts down " +
+        "each event log file to the configured size.")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE =
 
 Review comment:
   I'll apply the former first, as we may need to hear another voice on changing the config name.





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-14 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r334657834
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -179,6 +179,19 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.logRolling.enabled")
 
 Review comment:
   That's a good suggestion, as using "log" twice does sound redundant. Since @vanzin left a review comment on this, it would be nice to hear his thoughts as well before making the change.





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-08 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r332764132
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -699,12 +689,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       ((!appCompleted && fastInProgressParsing) || reparseChunkSize > 0)
 
     val bus = new ReplayListenerBus()
-    val listener = new AppListingListener(fileStatus, clock, shouldHalt)
+    val listener = new AppListingListener(reader, clock, shouldHalt)
     bus.addListener(listener)
 
     logInfo(s"Parsing $logPath for listing data...")
-    Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
-      bus.replay(in, logPath.toString, !appCompleted, eventsFilter)
+    val logFiles = reader.listEventLogFiles
+    logFiles.foreach { file =>
+      Utils.tryWithResource(EventLogFileReader.openEventLog(file.getPath, fs)) { in =>
+        bus.replay(in, file.getPath.toString, !appCompleted, eventsFilter)
 
 Review comment:
   Nice catch! Will address.





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-08 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r332746837
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
 ##
 @@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{BufferedInputStream, InputStream}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.hdfs.DFSInputStream
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.EventLogFileWriter.codecName
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/** The base class of reader which will read the information of event log 
file(s). */
+abstract class EventLogFileReader(
+protected val fileSystem: FileSystem,
+val rootPath: Path) {
+
+  protected def fileSizeForDFS(path: Path): Option[Long] = {
+Utils.tryWithResource(fileSystem.open(path)) { in =>
+  in.getWrappedStream match {
+case dfsIn: DFSInputStream => Some(dfsIn.getFileLength)
+case _ => None
+  }
+}
+  }
+
+  protected def addFileAsZipEntry(
+  zipStream: ZipOutputStream,
+  path: Path,
+  entryName: String): Unit = {
+Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { 
inputStream =>
+  zipStream.putNextEntry(new ZipEntry(entryName))
+  ByteStreams.copy(inputStream, zipStream)
+  zipStream.closeEntry()
+}
+  }
+
+  /** Returns the last index of event log files. None for single event log 
file. */
+  def lastIndex: Option[Long]
+
+  /**
+   * Returns the size of file for the last index of event log files. Returns 
its size for
+   * single event log file.
+   */
+  def fileSizeForLastIndex: Long
+
+  /** Returns whether the application is completed. */
+  def completed: Boolean
+
+  /**
+   * Returns the size of file for the last index of event log files, only when
+   * underlying input stream is DFSInputStream. Otherwise returns None.
+   */
+  def fileSizeForLastIndexForDFS: Option[Long]
+
+  /** Returns the modification time for the last index of event log files. */
 
 Review comment:
   Same here.





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-08 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r332746767
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
 ##
 @@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{BufferedInputStream, InputStream}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.hdfs.DFSInputStream
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.EventLogFileWriter.codecName
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/** The base class of reader which will read the information of event log 
file(s). */
+abstract class EventLogFileReader(
+protected val fileSystem: FileSystem,
+val rootPath: Path) {
+
+  protected def fileSizeForDFS(path: Path): Option[Long] = {
+Utils.tryWithResource(fileSystem.open(path)) { in =>
+  in.getWrappedStream match {
+case dfsIn: DFSInputStream => Some(dfsIn.getFileLength)
+case _ => None
+  }
+}
+  }
+
+  protected def addFileAsZipEntry(
+  zipStream: ZipOutputStream,
+  path: Path,
+  entryName: String): Unit = {
+Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { 
inputStream =>
+  zipStream.putNextEntry(new ZipEntry(entryName))
+  ByteStreams.copy(inputStream, zipStream)
+  zipStream.closeEntry()
+}
+  }
+
+  /** Returns the last index of event log files. None for single event log 
file. */
+  def lastIndex: Option[Long]
+
+  /**
+   * Returns the size of file for the last index of event log files. Returns 
its size for
+   * single event log file.
+   */
+  def fileSizeForLastIndex: Long
+
+  /** Returns whether the application is completed. */
+  def completed: Boolean
+
+  /**
+   * Returns the size of file for the last index of event log files, only when
+   * underlying input stream is DFSInputStream. Otherwise returns None.
+   */
+  def fileSizeForLastIndexForDFS: Option[Long]
+
+  /** Returns the modification time for the last index of event log files. */
 
 Review comment:
   It seems better to document how "the last index" applies to a single event log file, as it's not easy to apply this suggestion to `fileSizeForLastIndexForDFS`.
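   For example, a possible wording for the reader API quoted above (my sketch, not text from the PR):
   
   ```
   /**
    * Returns the size of the file for the last index of the event log files.
    * For a single event log file there is no index, so this returns the size of
    * that file itself; fileSizeForLastIndexForDFS follows the same rule but only
    * when the underlying stream is a DFSInputStream, returning None otherwise.
    */
   def fileSizeForLastIndex: Long
   ```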





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-08 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r332741144
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
 ##
 @@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.zip.{ZipInputStream, ZipOutputStream}
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.deploy.history.RollingEventLogFilesWriter._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileReadersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter with Logging {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/", 
SparkHadoopUtil.get.conf)
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
+testDirPath = new Path(testDir.getAbsolutePath())
+  }
+
+  after {
+Utils.deleteRecursively(testDir)
+  }
+
+  test("Retrieve EventLogFileReader correctly") {
+def assertInstanceOfEventLogReader(
+expectedClazz: Option[Class[_ <: EventLogFileReader]],
+actual: Option[EventLogFileReader]): Unit = {
+  if (expectedClazz.isEmpty) {
+assert(actual.isEmpty, s"Expected no EventLogFileReader instance but 
was " +
+  s"${actual.map(_.getClass).getOrElse("")}")
+  } else {
+assert(actual.isDefined, s"Expected an EventLogFileReader instance but 
was empty")
+assert(expectedClazz.get.isAssignableFrom(actual.get.getClass),
+  s"Expected ${expectedClazz.get} but was ${actual.get.getClass}")
+  }
+}
+
+def testCreateEventLogReaderWithPath(
+path: Path,
+isFile: Boolean,
+expectedClazz: Option[Class[_ <: EventLogFileReader]]): Unit = {
+  if (isFile) {
+Utils.tryWithResource(fileSystem.create(path)) { is =>
+  is.writeInt(10)
+}
+  } else {
+fileSystem.mkdirs(path)
+  }
+
+  val reader = EventLogFileReader(fileSystem, path)
+  assertInstanceOfEventLogReader(expectedClazz, reader)
+  val reader2 = EventLogFileReader(fileSystem,
+fileSystem.getFileStatus(path))
+  assertInstanceOfEventLogReader(expectedClazz, reader2)
+}
+
+// path with no last index - single event log
+val reader1 = EventLogFileReader(fileSystem, new Path(testDirPath, "aaa"),
+  None)
+
assertInstanceOfEventLogReader(Some(classOf[SingleFileEventLogFileReader]), 
Some(reader1))
+
+// path with last index - rolling event log
+val reader2 = EventLogFileReader(fileSystem,
+  new Path(testDirPath, s"${EVENT_LOG_DIR_NAME_PREFIX}aaa"), Some(3))
+
assertInstanceOfEventLogReader(Some(classOf[RollingEventLogFilesFileReader]), 
Some(reader2))
+
+// path - file (both path and FileStatus)
+val eventLogFile = new Path(testDirPath, "bbb")
+testCreateEventLogReaderWithPath(eventLogFile, isFile = true,
+  Some(classOf[SingleFileEventLogFileReader]))
+
+// path - file starting with "."
+val invalidEventLogFile = new Path(testDirPath, ".bbb")
+testCreateEventLogReaderWithPath(invalidEventLogFile, isFile = true, None)
+
+// path - directory with "eventlog_v2_" prefix
+val eventLogDir = new Path(testDirPath, s"${EVENT_LOG_DIR_NAME_PREFIX}ccc")
+testCreateEventLogReaderWithPath(eventLogDir, isFile = false,
+  Some(classOf[RollingEventLogFilesFileReader]))
+
+// path - directory with no "eventlog_v2_" prefix
+val invalidEventLogDir = 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-04 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331724862
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -1165,12 +1161,15 @@ private[history] case class LogInfo(
 logType: LogType.Value,
 appId: Option[String],
 attemptId: Option[String],
-fileSize: Long)
+fileSize: Long,
+lastIndex: Option[Long],
 
 Review comment:
   Good point. In other tests where we modified something and worried about compatibility, we stored the old data and loaded it from there, so I was thinking about the same approach. Your suggestion would be much simpler. Thanks!





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-03 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331326938
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -1165,12 +1161,15 @@ private[history] case class LogInfo(
 logType: LogType.Value,
 appId: Option[String],
 attemptId: Option[String],
-fileSize: Long)
+fileSize: Long,
+lastIndex: Option[Long],
 
 Review comment:
   Just realized that FsHistoryProvider does take some care about `logType` being `null`. I wouldn't expect a LogType.Value to be null and would feel safer correcting the value, but given that FsHistoryProvider now accesses logType correctly, that's OK. I'd prefer to add a UT that checks the values of the LogInfo fields when reading a listing written by Spark 2.4.4, to make sure we're not missing anything, but that's just me.





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-03 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331314726
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -1165,12 +1161,15 @@ private[history] case class LogInfo(
 logType: LogType.Value,
 appId: Option[String],
 attemptId: Option[String],
-fileSize: Long)
+fileSize: Long,
+lastIndex: Option[Long],
 
 Review comment:
   I've crafted some test code to experiment with this:
   
   ```
   // Point the provider at event logs and a listing local store written by Spark 2.4.4.
   val newLogDir = new File("/spark-dist/releases/spark-2.4.4-bin-hadoop2.7/eventlogs")
   val localStoreDir = new File("/spark-dist/releases/spark-2.4.4-bin-hadoop2.7/history-dir")
   val newLocalStoreDir = new File(testDir, "localstore")
   newLocalStoreDir.mkdirs()
   FileUtils.copyDirectory(localStoreDir, newLocalStoreDir)
   
   val conf = createTestConf()
   conf.set(HISTORY_LOG_DIR, newLogDir.getAbsolutePath)
   conf.set(LOCAL_STORE_DIR, newLocalStoreDir.getAbsolutePath)
   
   // Trigger a listing scan against the old data.
   val provider = new FsHistoryProvider(conf)
   provider.checkForLogs()
   ```
   
   Here the directories point directly at the event log / history local store directories from Spark 2.4.4, since the listing metadata contains the path and Spark ignores the content of the local store if the metadata doesn't match. If we want to include this in a new UT, some modification might be needed. We tend to add a UT when we want to ensure a new change is compatible with an old version, so it seems worth adding.
   
   I've set a breakpoint in `FsHistoryProvider.checkForLogs` and observed that lastIndex is set to `None` (so it seems to be safe), but `logType` is set to `null`, which we don't expect.
   
   Would it be better to increase `CURRENT_LISTING_VERSION` to deal with this, or to manually set it to `EventLogs` when it's null? Maybe better to deal with that in a new PR, though.
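   If we went with the second option, a minimal sketch of the fallback (illustrative only; `info` stands for a LogInfo read from an old 2.4.x listing, and where exactly this would live is an open question):
   
   ```
   // Illustrative only: default a null logType from an old listing to EventLogs
   // instead of propagating null.
   val safeLogType: LogType.Value = Option(info.logType).getOrElse(LogType.EventLogs)
   ```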





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-03 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331248214
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
 ##
 @@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{File, FileOutputStream, IOException}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+import scala.io.Source
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileWritersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
+testDir.deleteOnExit()
+testDirPath = new Path(testDir.getAbsolutePath())
+  }
+
+  after {
+Utils.deleteRecursively(testDir)
+  }
+
+  test("create EventLogFileWriter with enable/disable rolling") {
+def buildWriterAndVerify(conf: SparkConf, expectedClazz: Class[_]): Unit = 
{
+  val writer = EventLogFileWriter(
+getUniqueApplicationId, None, testDirPath.toUri, conf,
+SparkHadoopUtil.get.newConfiguration(conf))
+  val writerClazz = writer.getClass
+  assert(expectedClazz === writerClazz,
+s"default file writer should be $expectedClazz, but $writerClazz")
+}
+
+val conf = new SparkConf
+conf.set(EVENT_LOG_ENABLED, true)
+conf.set(EVENT_LOG_DIR, testDir.toString)
+
+// default config
+buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter])
+
+conf.set(EVENT_LOG_ENABLE_ROLLING, true)
+buildWriterAndVerify(conf, classOf[RollingEventLogFilesWriter])
+
+conf.set(EVENT_LOG_ENABLE_ROLLING, false)
+buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter])
+  }
+
+  val allCodecs = Seq(None) ++
+CompressionCodec.ALL_COMPRESSION_CODECS.map(c => 
Some(CompressionCodec.getShortName(c)))
+
+  allCodecs.foreach { codecShortName =>
+test(s"initialize, write, stop - with codec $codecShortName") {
+  val appId = getUniqueApplicationId
+  val attemptId = None
+
+  val conf = getLoggingConf(testDirPath, codecShortName)
+  val writer = createWriter(appId, attemptId, testDirPath.toUri, conf,
+SparkHadoopUtil.get.newConfiguration(conf))
+
+  writer.start()
+
+  // snappy stream throws exception on empty stream, so we should provide 
some data to test.
+  val dummyData = Seq("dummy1", "dummy2", "dummy3")
+  dummyData.foreach(writer.writeEvent(_, flushLogger = true))
+
+  writer.stop()
+
+  verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, 
codecShortName, dummyData)
+}
+  }
+
+  test("spark.eventLog.compression.codec overrides 
spark.io.compression.codec") {
+val conf = new SparkConf
+conf.set(EVENT_LOG_COMPRESS, true)
+val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+
+val appId = "test"
+val appAttemptId = None
+
+// The default value is `spark.io.compression.codec`.
+val writer = createWriter(appId, appAttemptId, testDirPath.toUri, conf, 
hadoopConf)
+assert(writer.compressionCodecName.contains("lz4"))
+
+// `spark.eventLog.compression.codec` overrides 
`spark.io.compression.codec`.
+conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd")
+val writer2 = createWriter(appId, appAttemptId, testDirPath.toUri, conf, 
hadoopConf)
+assert(writer2.compressionCodecName.contains("zstd"))
+  }
+
+  protected def readLinesFromEventLogFile(log: Path, fs: 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-03 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331247520
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
 ##
 @@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{File, FileOutputStream, IOException}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+import scala.io.Source
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileWritersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
+testDir.deleteOnExit()
+testDirPath = new Path(testDir.getAbsolutePath())
+  }
+
+  after {
+Utils.deleteRecursively(testDir)
+  }
+
+  test("create EventLogFileWriter with enable/disable rolling") {
+def buildWriterAndVerify(conf: SparkConf, expectedClazz: Class[_]): Unit = 
{
+  val writer = EventLogFileWriter(
+getUniqueApplicationId, None, testDirPath.toUri, conf,
+SparkHadoopUtil.get.newConfiguration(conf))
+  val writerClazz = writer.getClass
+  assert(expectedClazz === writerClazz,
+s"default file writer should be $expectedClazz, but $writerClazz")
+}
+
+val conf = new SparkConf
+conf.set(EVENT_LOG_ENABLED, true)
+conf.set(EVENT_LOG_DIR, testDir.toString)
+
+// default config
+buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter])
+
+conf.set(EVENT_LOG_ENABLE_ROLLING, true)
+buildWriterAndVerify(conf, classOf[RollingEventLogFilesWriter])
+
+conf.set(EVENT_LOG_ENABLE_ROLLING, false)
+buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter])
+  }
+
+  val allCodecs = Seq(None) ++
+CompressionCodec.ALL_COMPRESSION_CODECS.map(c => 
Some(CompressionCodec.getShortName(c)))
+
+  allCodecs.foreach { codecShortName =>
+test(s"initialize, write, stop - with codec $codecShortName") {
+  val appId = getUniqueApplicationId
+  val attemptId = None
+
+  val conf = getLoggingConf(testDirPath, codecShortName)
+  val writer = createWriter(appId, attemptId, testDirPath.toUri, conf,
+SparkHadoopUtil.get.newConfiguration(conf))
+
+  writer.start()
+
+  // snappy stream throws exception on empty stream, so we should provide 
some data to test.
+  val dummyData = Seq("dummy1", "dummy2", "dummy3")
+  dummyData.foreach(writer.writeEvent(_, flushLogger = true))
+
+  writer.stop()
+
+  verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, 
codecShortName, dummyData)
+}
+  }
+
+  test("spark.eventLog.compression.codec overrides 
spark.io.compression.codec") {
+val conf = new SparkConf
+conf.set(EVENT_LOG_COMPRESS, true)
+val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+
+val appId = "test"
+val appAttemptId = None
+
+// The default value is `spark.io.compression.codec`.
+val writer = createWriter(appId, appAttemptId, testDirPath.toUri, conf, 
hadoopConf)
+assert(writer.compressionCodecName.contains("lz4"))
+
+// `spark.eventLog.compression.codec` overrides 
`spark.io.compression.codec`.
+conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd")
+val writer2 = createWriter(appId, appAttemptId, testDirPath.toUri, conf, 
hadoopConf)
+assert(writer2.compressionCodecName.contains("zstd"))
+  }
+
+  protected def readLinesFromEventLogFile(log: Path, fs: 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-03 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331245739
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
 ##
 @@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.zip.{ZipInputStream, ZipOutputStream}
+
+import scala.collection.mutable
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileReadersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter with Logging {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.conf)
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
+testDirPath = new Path(testDir.getAbsolutePath())
+  }
+
+  after {
+Utils.deleteRecursively(testDir)
+  }
+
+  test("Retrieve EventLogFileReader correctly") {
+def assertInstanceOfEventLogReader(
+expectedClazz: Option[Class[_ <: EventLogFileReader]],
+actual: Option[EventLogFileReader]): Unit = {
+  if (expectedClazz.isEmpty) {
+assert(actual.isEmpty, s"Expected no EventLogFileReader instance but 
was " +
+  s"${actual.map(_.getClass).getOrElse("")}")
+  } else {
+assert(actual.isDefined, s"Expected an EventLogFileReader instance but 
was empty")
+assert(expectedClazz.get.isAssignableFrom(actual.get.getClass),
+  s"Expected ${expectedClazz.get} but was ${actual.get.getClass}")
+  }
+}
+
+def testForPathWithoutSeq(
+path: Path,
+isFile: Boolean,
+expectedClazz: Option[Class[_ <: EventLogFileReader]]): Unit = {
+  if (isFile) {
+Utils.tryWithResource(fileSystem.create(path)) { is =>
+  is.writeInt(10)
+}
+  } else {
+fileSystem.mkdirs(path)
+  }
+
+  val reader = EventLogFileReader(fileSystem, path)
+  assertInstanceOfEventLogReader(expectedClazz, reader)
+  val reader2 = EventLogFileReader(fileSystem,
+fileSystem.getFileStatus(path))
+  assertInstanceOfEventLogReader(expectedClazz, reader)
 
 Review comment:
   Nice catch! Will fix.





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-03 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331245566
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
 ##
 @@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.zip.{ZipInputStream, ZipOutputStream}
+
+import scala.collection.mutable
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileReadersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter with Logging {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.conf)
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
+testDirPath = new Path(testDir.getAbsolutePath())
+  }
+
+  after {
+Utils.deleteRecursively(testDir)
+  }
+
+  test("Retrieve EventLogFileReader correctly") {
+def assertInstanceOfEventLogReader(
+expectedClazz: Option[Class[_ <: EventLogFileReader]],
+actual: Option[EventLogFileReader]): Unit = {
+  if (expectedClazz.isEmpty) {
+assert(actual.isEmpty, s"Expected no EventLogFileReader instance but 
was " +
+  s"${actual.map(_.getClass).getOrElse("")}")
+  } else {
+assert(actual.isDefined, s"Expected an EventLogFileReader instance but 
was empty")
+assert(expectedClazz.get.isAssignableFrom(actual.get.getClass),
+  s"Expected ${expectedClazz.get} but was ${actual.get.getClass}")
+  }
+}
+
+def testForPathWithoutSeq(
+path: Path,
+isFile: Boolean,
+expectedClazz: Option[Class[_ <: EventLogFileReader]]): Unit = {
+  if (isFile) {
+Utils.tryWithResource(fileSystem.create(path)) { is =>
+  is.writeInt(10)
+}
+  } else {
+fileSystem.mkdirs(path)
+  }
+
+  val reader = EventLogFileReader(fileSystem, path)
+  assertInstanceOfEventLogReader(expectedClazz, reader)
+  val reader2 = EventLogFileReader(fileSystem,
+fileSystem.getFileStatus(path))
+  assertInstanceOfEventLogReader(expectedClazz, reader)
+}
+
+// path with no last index - single event log
+val reader1 = EventLogFileReader(fileSystem, new Path(testDirPath, "aaa"),
+  None)
+
assertInstanceOfEventLogReader(Some(classOf[SingleFileEventLogFileReader]), 
Some(reader1))
+
+// path with last index - rolling event log
+val reader2 = EventLogFileReader(fileSystem,
+  new Path(testDirPath, "eventlog_v2_aaa"), Some(3))
+
assertInstanceOfEventLogReader(Some(classOf[RollingEventLogFilesFileReader]), 
Some(reader2))
 
 Review comment:
   It is also testing the functionality when the last index is given as a parameter, so the comment is correct.
   
   > Also would be good if we can reuse EVENT_LOG_DIR_NAME_PREFIX instead of eventlog_v2_ here.
   
   Will change.
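   (For illustration, the changed line would presumably read roughly as below, matching the later revision of this suite quoted above in this archive.)
   
   ```
   val reader2 = EventLogFileReader(fileSystem,
     new Path(testDirPath, s"${EVENT_LOG_DIR_NAME_PREFIX}aaa"), Some(3))
   ```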





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-03 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331244512
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -179,6 +179,18 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+    ConfigBuilder("spark.eventLog.logRolling.enabled")
+      .doc("Whether rolling over event log files is enabled.")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE =
+    ConfigBuilder("spark.eventLog.logRolling.maxFileSize")
+      .doc("The max size of event log file to be rolled over.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("128m")
 
 Review comment:
   I'm checking against configuration.md and it's only missing one sentence (`If set to true, it cuts down each event log file to be configured size.`) for the conf above. Are you referring to another one? Otherwise I'll just add this sentence to the doc.





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-03 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331243103
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
 ##
 @@ -17,38 +17,33 @@
 
 package org.apache.spark.scheduler
 
-import java.io._
 import java.net.URI
-import java.nio.charset.StandardCharsets
 
-import scala.collection.mutable.{ArrayBuffer, Map}
+import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
-import org.apache.hadoop.fs.permission.FsPermission
 import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{SPARK_VERSION, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogFileWriter
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{JsonProtocol, Utils}
 
 /**
  * A SparkListener that logs events to persistent storage.
  *
  * Event logging is specified by the following configurable parameters:
  *   spark.eventLog.enabled - Whether event logging is enabled.
- *   spark.eventLog.logBlockUpdates.enabled - Whether to log block updates
- *   spark.eventLog.compress - Whether to compress logged events
- *   spark.eventLog.compression.codec - The codec to compress logged events
- *   spark.eventLog.overwrite - Whether to overwrite any existing files.
  *   spark.eventLog.dir - Path to the directory in which events are logged.
- *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *   spark.eventLog.logBlockUpdates.enabled - Whether to log block updates
  *   spark.eventLog.logStageExecutorMetrics.enabled - Whether to log stage 
executor metrics
+ *
+ * Event log file writer maintains its own parameters: refer the javadoc of 
[[EventLogFileWriter]]
 
 Review comment:
   I'll just change it to `doc`. Shouldn't be a big deal.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-03 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331242313
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io._
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[history] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  // Only defined if the file system scheme is not local
+  protected var hadoopDataStream: Option[FSDataOutputStream] = None
+  protected var writer: Option[PrintWriter] = None
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path)(fnSetupWriter: OutputStream => 
PrintWriter): Unit = {
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(
+  SparkHadoopUtil.createFile(fileSystem, path, 
sparkConf.get(EVENT_LOG_ALLOW_EC)))
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+  writer = Some(fnSetupWriter(bstream))
+} catch {
+  case e: Exception =>
+

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-03 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r331241393
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
 ##
 @@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.zip.{ZipInputStream, ZipOutputStream}
+
+import scala.collection.mutable
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileReadersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter with Logging {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.conf)
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
+testDirPath = new Path(testDir.getAbsolutePath())
+  }
+
+  after {
+Utils.deleteRecursively(testDir)
+  }
+
+  test("Retrieve EventLogFileReader correctly") {
+def assertInstanceOfEventLogReader(
+expectedClazz: Option[Class[_ <: EventLogFileReader]],
+actual: Option[EventLogFileReader]): Unit = {
+  if (expectedClazz.isEmpty) {
+assert(actual.isEmpty, s"Expected no EventLogFileReader instance but 
was " +
+  s"${actual.map(_.getClass).getOrElse("")}")
+  } else {
+assert(actual.isDefined, s"Expected an EventLogFileReader instance but 
was empty")
+assert(expectedClazz.get.isAssignableFrom(actual.get.getClass),
+  s"Expected ${expectedClazz.get} but was ${actual.get.getClass}")
+  }
+}
+
+def testForPathWithoutSeq(
 
 Review comment:
   I agree its name is a bit weird. I'll try to rename it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-02 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r330847692
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -1165,12 +1161,15 @@ private[history] case class LogInfo(
 logType: LogType.Value,
 appId: Option[String],
 attemptId: Option[String],
-fileSize: Long)
+fileSize: Long,
+lastIndex: Option[Long],
 
 Review comment:
   I missed checking this. I'll verify it and, if it causes a problem, fix it by 
adding a UT which loads a LevelDB written by Spark 2.4.4 and reads more applications.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-02 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r330828002
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
 ##
 @@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{File, FileOutputStream, IOException}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+import scala.io.Source
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileWritersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
+testDir.deleteOnExit()
+testDirPath = new Path(testDir.getAbsolutePath())
+  }
+
+  after {
+Utils.deleteRecursively(testDir)
+  }
+
+  test("create EventLogFileWriter with enable/disable rolling") {
+def buildWriterAndVerify(conf: SparkConf, expectedClazz: Class[_]): Unit = 
{
+  val writer = EventLogFileWriter(
+getUniqueApplicationId, None, testDirPath.toUri, conf,
+SparkHadoopUtil.get.newConfiguration(conf))
+  val writerClazz = writer.getClass
+  assert(expectedClazz === writerClazz,
+s"default file writer should be $expectedClazz, but $writerClazz")
 
 Review comment:
   Agreed. Will remove the hint message.
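   
   In other words, just (sketch):
   
   ```scala
   // ScalaTest's === already reports both values when the assertion fails,
   // so the custom hint message can simply be dropped.
   assert(expectedClazz === writerClazz)
   ```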


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-02 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r330827896
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
 ##
 @@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.zip.{ZipInputStream, ZipOutputStream}
+
+import scala.collection.mutable
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileReadersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter with Logging {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.conf)
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
+testDirPath = new Path(testDir.getAbsolutePath())
+  }
+
+  after {
+Utils.deleteRecursively(testDir)
+  }
+
+  test("Retrieve EventLogFileReader correctly") {
+def assertInstanceOfEventLogReader(
+expectedClazz: Option[Class[_ <: EventLogFileReader]],
+actual: Option[EventLogFileReader]): Unit = {
+  if (expectedClazz.isEmpty) {
+assert(actual.isEmpty, s"Expected no EventLogFileReader instance but 
was " +
+  s"${actual.map(_.getClass).getOrElse("")}")
+  } else {
+assert(actual.isDefined, s"Expected an EventLogFileReader instance but 
was empty")
+assert(expectedClazz.get.isAssignableFrom(actual.get.getClass),
+  s"Expected ${expectedClazz.get} but was ${actual.get.getClass}")
+  }
+}
+
+def testForPathWithoutSeq(
+path: Path,
+isFile: Boolean,
+expectedClazz: Option[Class[_ <: EventLogFileReader]]): Unit = {
+  if (isFile) {
+Utils.tryWithResource(fileSystem.create(path)) { is =>
+  is.writeInt(10)
+}
+  } else {
+fileSystem.mkdirs(path)
+  }
+
+  val reader = EventLogFileReader(fileSystem, path)
+  assertInstanceOfEventLogReader(expectedClazz, reader)
+  val reader2 = EventLogFileReader(fileSystem,
+fileSystem.getFileStatus(path))
+  assertInstanceOfEventLogReader(expectedClazz, reader)
+}
+
+// path with no last index - single event log
+val reader1 = EventLogFileReader(fileSystem, new Path(testDirPath, "aaa"),
+  None)
+
assertInstanceOfEventLogReader(Some(classOf[SingleFileEventLogFileReader]), 
Some(reader1))
+
+// path with last index - rolling event log
+val reader2 = EventLogFileReader(fileSystem,
+  new Path(testDirPath, "eventlog_v2_aaa"), Some(3))
+
assertInstanceOfEventLogReader(Some(classOf[RollingEventLogFilesFileReader]), 
Some(reader2))
+
+// path - file (both path and FileStatus)
+val eventLogFile = new Path(testDirPath, "bbb")
+testForPathWithoutSeq(eventLogFile, isFile = true, 
Some(classOf[SingleFileEventLogFileReader]))
+
+// path - file starting with "."
+val invalidEventLogFile = new Path(testDirPath, ".bbb")
+testForPathWithoutSeq(invalidEventLogFile, isFile = true, None)
+
+// path - directory with "eventlog_v2_" prefix
+val eventLogDir = new Path(testDirPath, "eventlog_v2_ccc")
+testForPathWithoutSeq(eventLogDir, isFile = false,
+  Some(classOf[RollingEventLogFilesFileReader]))
+
+// path - directory with no "eventlog_v2_" prefix
+val invalidEventLogDir = new Path(testDirPath, "ccc")
+testForPathWithoutSeq(invalidEventLogDir, isFile = false, None)
+  }
+
+  val 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-10-02 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r330806046
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io._
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[history] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  // Only defined if the file system scheme is not local
+  protected var hadoopDataStream: Option[FSDataOutputStream] = None
+  protected var writer: Option[PrintWriter] = None
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path)(fnSetupWriter: OutputStream => 
PrintWriter): Unit = {
+if (shouldOverwrite && fileSystem.delete(path, true)) {
 
 Review comment:
   That's a good point. Maybe we haven't defined the proper behavior for this case: 
would we want to fail the application?
   
   There might be a similar case when shouldOverwrite is true and 
fileSystem.delete() returns false. I see fileSystem.delete() will mostly throw an 
IOException when it fails to delete, but at least per the javadoc, a 'false' return 
value from fileSystem.delete() may mean more than just that the file didn't exist. 
The javadoc doesn't guarantee that.
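   
   Just to make the idea concrete, a rough sketch (not part of this PR) of failing 
fast instead of silently continuing when the delete doesn't take effect:
   
   ```scala
   // Sketch only: assumes the same shouldOverwrite / fileSystem / path values as
   // in the method above; IOException is java.io.IOException.
   if (shouldOverwrite) {
     if (fileSystem.delete(path, true)) {
       logWarning(s"Event log $path already exists. Overwriting...")
     } else if (fileSystem.exists(path)) {
       // delete() returned false but the file still exists: fail instead of
       // writing into or clobbering an existing event log.
       throw new IOException(s"Failed to delete existing event log at $path")
     }
   }
   ```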


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-26 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328814572
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io._
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[history] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  // Only defined if the file system scheme is not local
+  protected var hadoopDataStream: Option[FSDataOutputStream] = None
+  protected var writer: Option[PrintWriter] = None
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path)(fnSetupWriter: OutputStream => 
PrintWriter): Unit = {
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(
+  SparkHadoopUtil.createFile(fileSystem, path, 
sparkConf.get(EVENT_LOG_ALLOW_EC)))
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+  writer = Some(fnSetupWriter(bstream))
+} catch {
+  case e: Exception =>
+

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-26 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328814131
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -758,12 +752,15 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 invalidateUI(app.info.id, app.attempts.head.info.attemptId)
 addListing(app)
 listing.write(LogInfo(logPath.toString(), scanTime, LogType.EventLogs, 
Some(app.info.id),
-  app.attempts.head.info.attemptId, fileStatus.getLen()))
+  app.attempts.head.info.attemptId, reader.fileSizeForLastIndex,
+  reader.lastIndex, reader.completed))
 
 // For a finished log, remove the corresponding "in progress" entry 
from the listing DB if
 // the file is really gone.
-if (appCompleted) {
-  val inProgressLog = logPath.toString() + 
EventLoggingListener.IN_PROGRESS
+// The logic is only valid for single event log, as root path doesn't 
change for
+// rolled event logs.
+if (appCompleted && reader.lastIndex.isDefined) {
 
 Review comment:
   Nice find! Thanks!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-26 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328813125
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{BufferedInputStream, InputStream}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.hdfs.DFSInputStream
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.EventLogFileWriter.codecName
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/** The base class of reader which will read the information of event log 
file(s). */
+abstract class EventLogFileReader(
+protected val fileSystem: FileSystem,
+val rootPath: Path) {
+
+  protected def fileSizeForDFS(path: Path): Option[Long] = {
+Utils.tryWithResource(fileSystem.open(path)) { in =>
+  in.getWrappedStream match {
+case dfsIn: DFSInputStream => Some(dfsIn.getFileLength)
+case _ => None
+  }
+}
+  }
+
+  protected def addFileAsZipEntry(
+  zipStream: ZipOutputStream,
+  path: Path,
+  entryName: String): Unit = {
+Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { 
inputStream =>
+  zipStream.putNextEntry(new ZipEntry(entryName))
+  ByteStreams.copy(inputStream, zipStream)
+  zipStream.closeEntry()
+}
+  }
+
+  /** Returns the last index of event log files. None for single event log 
file. */
+  def lastIndex: Option[Long]
+
+  /**
+   * Returns the size of file for the last index of event log files. Returns 
its size for
+   * single event log file.
+   */
+  def fileSizeForLastIndex: Long
+
+  /** Returns whether the application is completed. */
+  def completed: Boolean
+
+  /**
+   * Returns the size of file for the last index of event log files, only when
+   * underlying input stream is DFSInputStream. Otherwise returns None.
+   */
+  def fileSizeForLastIndexForDFS: Option[Long]
+
+  /** Returns the modification time for the last sequence number of event log 
files. */
+  def modificationTime: Long
+
+  /**
+   * This method compresses the files passed in, and writes the compressed 
data out into the
+   * ZipOutputStream passed in. Each file is written as a new ZipEntry with 
its name being
+   * the name of the file being compressed.
+   */
+  def zipEventLogFiles(zipStream: ZipOutputStream): Unit
+
+  /** Returns all available event log files. */
+  def listEventLogFiles: Seq[FileStatus]
+
+  /** Returns the short compression name if being used. None if it's 
uncompressed. */
+  def compressionCodec: Option[String]
+
+  /** Returns the size of all event log files. */
+  def totalSize: Long
+}
+
+object EventLogFileReader {
+  // A cache for compression codecs to avoid creating the same codec many times
+  private val codecMap = new ConcurrentHashMap[String, CompressionCodec]()
+
+  def apply(
+  fs: FileSystem,
+  path: Path,
+  lastIndex: Option[Long]): EventLogFileReader = {
+lastIndex match {
+  case Some(_) => new RollingEventLogFilesFileReader(fs, path)
+  case None => new SingleFileEventLogFileReader(fs, path)
+}
+  }
+
+  def apply(fs: FileSystem, path: Path): Option[EventLogFileReader] = {
+apply(fs, fs.getFileStatus(path))
+  }
+
+  def apply(fs: FileSystem, status: FileStatus): Option[EventLogFileReader] = {
+if (isSingleEventLog(status)) {
+  Some(new SingleFileEventLogFileReader(fs, status.getPath))
+} else if (isRollingEventLogs(status)) {
+  Some(new RollingEventLogFilesFileReader(fs, status.getPath))
+} else {
+  None
+}
+  }
+
+  /**
+   * Opens an event log file and returns an input stream that contains the 
event data.
+   *
+   * @return input stream that holds one JSON record per line.
+   */
+  def openEventLog(log: Path, fs: FileSystem): InputStream = {
+val in = new 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-26 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328807870
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{BufferedInputStream, InputStream}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.hdfs.DFSInputStream
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.EventLogFileWriter.codecName
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/** The base class of reader which will read the information of event log 
file(s). */
+abstract class EventLogFileReader(
+protected val fileSystem: FileSystem,
+val rootPath: Path) {
+
+  protected def fileSizeForDFS(path: Path): Option[Long] = {
+Utils.tryWithResource(fileSystem.open(path)) { in =>
+  in.getWrappedStream match {
+case dfsIn: DFSInputStream => Some(dfsIn.getFileLength)
+case _ => None
+  }
+}
+  }
+
+  protected def addFileAsZipEntry(
+  zipStream: ZipOutputStream,
+  path: Path,
+  entryName: String): Unit = {
+Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { 
inputStream =>
+  zipStream.putNextEntry(new ZipEntry(entryName))
+  ByteStreams.copy(inputStream, zipStream)
+  zipStream.closeEntry()
+}
+  }
+
+  /** Returns the last index of event log files. None for single event log 
file. */
+  def lastIndex: Option[Long]
+
+  /**
+   * Returns the size of file for the last index of event log files. Returns 
its size for
+   * single event log file.
+   */
+  def fileSizeForLastIndex: Long
+
+  /** Returns whether the application is completed. */
+  def completed: Boolean
+
+  /**
+   * Returns the size of file for the last index of event log files, only when
+   * underlying input stream is DFSInputStream. Otherwise returns None.
+   */
+  def fileSizeForLastIndexForDFS: Option[Long]
+
+  /** Returns the modification time for the last sequence number of event log 
files. */
+  def modificationTime: Long
+
+  /**
+   * This method compresses the files passed in, and writes the compressed 
data out into the
+   * ZipOutputStream passed in. Each file is written as a new ZipEntry with 
its name being
+   * the name of the file being compressed.
+   */
+  def zipEventLogFiles(zipStream: ZipOutputStream): Unit
+
+  /** Returns all available event log files. */
+  def listEventLogFiles: Seq[FileStatus]
+
+  /** Returns the short compression name if being used. None if it's 
uncompressed. */
+  def compressionCodec: Option[String]
+
+  /** Returns the size of all event log files. */
+  def totalSize: Long
+}
+
+object EventLogFileReader {
+  // A cache for compression codecs to avoid creating the same codec many times
+  private val codecMap = new ConcurrentHashMap[String, CompressionCodec]()
+
+  def apply(
+  fs: FileSystem,
+  path: Path,
+  lastIndex: Option[Long]): EventLogFileReader = {
+lastIndex match {
+  case Some(_) => new RollingEventLogFilesFileReader(fs, path)
+  case None => new SingleFileEventLogFileReader(fs, path)
+}
+  }
+
+  def apply(fs: FileSystem, path: Path): Option[EventLogFileReader] = {
+apply(fs, fs.getFileStatus(path))
+  }
+
+  def apply(fs: FileSystem, status: FileStatus): Option[EventLogFileReader] = {
+if (isSingleEventLog(status)) {
+  Some(new SingleFileEventLogFileReader(fs, status.getPath))
+} else if (isRollingEventLogs(status)) {
+  Some(new RollingEventLogFilesFileReader(fs, status.getPath))
+} else {
+  None
+}
+  }
+
+  /**
+   * Opens an event log file and returns an input stream that contains the 
event data.
+   *
+   * @return input stream that holds one JSON record per line.
+   */
+  def openEventLog(log: Path, fs: FileSystem): InputStream = {
+val in = new 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-26 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328807369
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{BufferedInputStream, InputStream}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.hdfs.DFSInputStream
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.EventLogFileWriter.codecName
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/** The base class of reader which will read the information of event log 
file(s). */
+abstract class EventLogFileReader(
+protected val fileSystem: FileSystem,
+val rootPath: Path) {
+
+  protected def fileSizeForDFS(path: Path): Option[Long] = {
+Utils.tryWithResource(fileSystem.open(path)) { in =>
+  in.getWrappedStream match {
+case dfsIn: DFSInputStream => Some(dfsIn.getFileLength)
+case _ => None
+  }
+}
+  }
+
+  protected def addFileAsZipEntry(
+  zipStream: ZipOutputStream,
+  path: Path,
+  entryName: String): Unit = {
+Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { 
inputStream =>
+  zipStream.putNextEntry(new ZipEntry(entryName))
+  ByteStreams.copy(inputStream, zipStream)
+  zipStream.closeEntry()
+}
+  }
+
+  /** Returns the last index of event log files. None for single event log 
file. */
+  def lastIndex: Option[Long]
+
+  /**
+   * Returns the size of file for the last index of event log files. Returns 
its size for
+   * single event log file.
+   */
+  def fileSizeForLastIndex: Long
+
+  /** Returns whether the application is completed. */
+  def completed: Boolean
+
+  /**
+   * Returns the size of file for the last index of event log files, only when
+   * underlying input stream is DFSInputStream. Otherwise returns None.
+   */
+  def fileSizeForLastIndexForDFS: Option[Long]
+
+  /** Returns the modification time for the last sequence number of event log 
files. */
 
 Review comment:
   I guess I searched for these, but there are still missing spots. Thanks for finding them!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-26 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328806999
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
 ##
 @@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{File, FileOutputStream, IOException}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+import scala.io.Source
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileWritersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
+testDir.deleteOnExit()
+testDirPath = new Path(testDir.getAbsolutePath())
+  }
+
+  after {
+Utils.deleteRecursively(testDir)
+  }
+
+  test("create EventLogFileWriter with enable/disable rolling") {
+def buildWriterAndVerify(conf: SparkConf, expectedClazz: Class[_]): Unit = 
{
+  val writer = EventLogFileWriter(
+getUniqueApplicationId, None, testDirPath.toUri, conf,
+SparkHadoopUtil.get.newConfiguration(conf))
+  val writerClazz = writer.getClass
+  assert(expectedClazz === writerClazz,
+s"default file writer should be $expectedClazz, but $writerClazz")
+}
+
+val conf = new SparkConf
+conf.set(EVENT_LOG_ENABLED, true)
+conf.set(EVENT_LOG_DIR, testDir.toString)
+
+// default config
+buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter])
+
+conf.set(EVENT_LOG_ENABLE_ROLLING, true)
+buildWriterAndVerify(conf, classOf[RollingEventLogFilesWriter])
+
+conf.set(EVENT_LOG_ENABLE_ROLLING, false)
+buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter])
+  }
+
+  val allCodecs = Seq(None) ++
+CompressionCodec.ALL_COMPRESSION_CODECS.map(c => 
Some(CompressionCodec.getShortName(c)))
+
+  allCodecs.foreach { codecShortName =>
+test(s"initialize, write, stop - with codec $codecShortName") {
+  val appId = getUniqueApplicationId
+  val attemptId = None
+
+  val conf = getLoggingConf(testDirPath, codecShortName)
+  val writer = createWriter(appId, attemptId, testDirPath.toUri, conf,
+SparkHadoopUtil.get.newConfiguration(conf))
+
+  writer.start()
+
+  // snappy stream throws exception on empty stream, so we should provide 
some data to test.
+  val dummyData = Seq("dummy1", "dummy2", "dummy3")
+  dummyData.foreach(writer.writeEvent(_, flushLogger = true))
+
+  verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, 
codecShortName,
+isCompleted = false, dummyData)
+
+  writer.stop()
+
+  verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, 
codecShortName,
+isCompleted = true, dummyData)
+}
+  }
+
+  test("spark.eventLog.compression.codec overrides 
spark.io.compression.codec") {
+val conf = new SparkConf
+conf.set(EVENT_LOG_COMPRESS, true)
+val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+
+val appId = "test"
+val appAttemptId = None
+
+// The default value is `spark.io.compression.codec`.
+val writer = createWriter(appId, appAttemptId, testDirPath.toUri, conf, 
hadoopConf)
+assert(writer.compressionCodecName.contains("lz4"))
+
+// `spark.eventLog.compression.codec` overrides 
`spark.io.compression.codec`.
+conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd")
+val writer2 = createWriter(appId, appAttemptId, 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-26 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328403300
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
 ##
 @@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{File, FileOutputStream, IOException}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+import scala.io.Source
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileWritersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
+testDir.deleteOnExit()
+testDirPath = new Path(testDir.getAbsolutePath())
+  }
+
+  after {
+Utils.deleteRecursively(testDir)
+  }
+
+  test("create EventLogFileWriter with enable/disable rolling") {
+def buildWriterAndVerify(conf: SparkConf, expectedClazz: Class[_]): Unit = 
{
+  val writer = EventLogFileWriter(
+getUniqueApplicationId, None, testDirPath.toUri, conf,
+SparkHadoopUtil.get.newConfiguration(conf))
+  val writerClazz = writer.getClass
+  assert(expectedClazz === writerClazz,
+s"default file writer should be $expectedClazz, but $writerClazz")
+}
+
+val conf = new SparkConf
+conf.set(EVENT_LOG_ENABLED, true)
+conf.set(EVENT_LOG_DIR, testDir.toString)
+
+// default config
+buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter])
+
+conf.set(EVENT_LOG_ENABLE_ROLLING, true)
+buildWriterAndVerify(conf, classOf[RollingEventLogFilesWriter])
+
+conf.set(EVENT_LOG_ENABLE_ROLLING, false)
+buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter])
+  }
+
+  val allCodecs = Seq(None) ++
+CompressionCodec.ALL_COMPRESSION_CODECS.map(c => 
Some(CompressionCodec.getShortName(c)))
+
+  allCodecs.foreach { codecShortName =>
+test(s"initialize, write, stop - with codec $codecShortName") {
+  val appId = getUniqueApplicationId
+  val attemptId = None
+
+  val conf = getLoggingConf(testDirPath, codecShortName)
+  val writer = createWriter(appId, attemptId, testDirPath.toUri, conf,
+SparkHadoopUtil.get.newConfiguration(conf))
+
+  writer.start()
+
+  // snappy stream throws exception on empty stream, so we should provide 
some data to test.
+  val dummyData = Seq("dummy1", "dummy2", "dummy3")
+  dummyData.foreach(writer.writeEvent(_, flushLogger = true))
+
+  verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, 
codecShortName,
+isCompleted = false, dummyData)
+
+  writer.stop()
+
+  verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, 
codecShortName,
+isCompleted = true, dummyData)
+}
+  }
+
+  test("spark.eventLog.compression.codec overrides 
spark.io.compression.codec") {
+val conf = new SparkConf
+conf.set(EVENT_LOG_COMPRESS, true)
+val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+
+val appId = "test"
+val appAttemptId = None
+
+// The default value is `spark.io.compression.codec`.
+val writer = createWriter(appId, appAttemptId, testDirPath.toUri, conf, 
hadoopConf)
+assert(writer.compressionCodecName.contains("lz4"))
+
+// `spark.eventLog.compression.codec` overrides 
`spark.io.compression.codec`.
+conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd")
+val writer2 = createWriter(appId, appAttemptId, 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-25 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328401775
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
 ##
 @@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.zip.{ZipInputStream, ZipOutputStream}
+
+import scala.collection.mutable
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileReadersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter with Logging {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
+testDir.deleteOnExit()
 
 Review comment:
   ~~I'll change it to delete the directory from `after`.~~ It was already there; 
sorry, my bad. I'll just remove the line.
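   
   i.e. the `before` block would simply become (the quoted code above minus the 
`deleteOnExit()` line):
   
   ```scala
   before {
     testDir = Utils.createTempDir(namePrefix = s"event log")
     testDirPath = new Path(testDir.getAbsolutePath())
   }
   ```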


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-25 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328407564
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
 ##
 @@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.zip.{ZipInputStream, ZipOutputStream}
+
+import scala.collection.mutable
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileReadersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter with Logging {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.newConfiguration(new SparkConf()))
 
 Review comment:
   That doesn't seem to be exactly the same: the SparkConf behind 
`SparkHadoopUtil.get.conf` would be initialized with `new 
SparkConf(false).loadFromSystemProperties(true)`, whereas `new SparkConf()` is 
equivalent to `new SparkConf(true)`. ReplayListenerSuite also uses this line.
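   To make the difference concrete, a rough sketch (not part of the PR) contrasting the 
two construction paths; `loadFromSystemProperties` is `private[spark]`, so this only 
compiles under the `org.apache.spark` package (e.g. in core's test sources):

```scala
package org.apache.spark

// Hypothetical illustration only. `new SparkConf()` is equivalent to `new SparkConf(true)`:
// it loads spark.* system properties eagerly in its constructor (non-silently). Per the
// comment above, the conf behind SparkHadoopUtil.get.conf is instead built from an empty
// conf that loads system properties afterwards, silently.
object SparkConfInitSketch {
  def main(args: Array[String]): Unit = {
    sys.props("spark.eventLog.enabled") = "true"

    val eagerConf  = new SparkConf()                                     // new SparkConf(true)
    val silentConf = new SparkConf(false).loadFromSystemProperties(true)

    // Both see the property here; the difference is how and when it was loaded.
    println(eagerConf.getBoolean("spark.eventLog.enabled", defaultValue = false))
    println(silentConf.getBoolean("spark.eventLog.enabled", defaultValue = false))
  }
}
```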





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-25 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328405270
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io._
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[history] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  // Only defined if the file system scheme is not local
+  protected var hadoopDataStream: Option[FSDataOutputStream] = None
+  protected var writer: Option[PrintWriter] = None
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path, fnSetupWriter: OutputStream => 
PrintWriter): Unit = {
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(
+  SparkHadoopUtil.createFile(fileSystem, path, 
sparkConf.get(EVENT_LOG_ALLOW_EC)))
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+  writer = 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-25 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328403300
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
 ##
 @@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{File, FileOutputStream, IOException}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+import scala.io.Source
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileWritersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
+testDir.deleteOnExit()
+testDirPath = new Path(testDir.getAbsolutePath())
+  }
+
+  after {
+Utils.deleteRecursively(testDir)
+  }
+
+  test("create EventLogFileWriter with enable/disable rolling") {
+def buildWriterAndVerify(conf: SparkConf, expectedClazz: Class[_]): Unit = 
{
+  val writer = EventLogFileWriter(
+getUniqueApplicationId, None, testDirPath.toUri, conf,
+SparkHadoopUtil.get.newConfiguration(conf))
+  val writerClazz = writer.getClass
+  assert(expectedClazz === writerClazz,
+s"default file writer should be $expectedClazz, but $writerClazz")
+}
+
+val conf = new SparkConf
+conf.set(EVENT_LOG_ENABLED, true)
+conf.set(EVENT_LOG_DIR, testDir.toString)
+
+// default config
+buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter])
+
+conf.set(EVENT_LOG_ENABLE_ROLLING, true)
+buildWriterAndVerify(conf, classOf[RollingEventLogFilesWriter])
+
+conf.set(EVENT_LOG_ENABLE_ROLLING, false)
+buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter])
+  }
+
+  val allCodecs = Seq(None) ++
+CompressionCodec.ALL_COMPRESSION_CODECS.map(c => 
Some(CompressionCodec.getShortName(c)))
+
+  allCodecs.foreach { codecShortName =>
+test(s"initialize, write, stop - with codec $codecShortName") {
+  val appId = getUniqueApplicationId
+  val attemptId = None
+
+  val conf = getLoggingConf(testDirPath, codecShortName)
+  val writer = createWriter(appId, attemptId, testDirPath.toUri, conf,
+SparkHadoopUtil.get.newConfiguration(conf))
+
+  writer.start()
+
+  // snappy stream throws exception on empty stream, so we should provide 
some data to test.
+  val dummyData = Seq("dummy1", "dummy2", "dummy3")
+  dummyData.foreach(writer.writeEvent(_, flushLogger = true))
+
+  verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, 
codecShortName,
+isCompleted = false, dummyData)
+
+  writer.stop()
+
+  verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, 
codecShortName,
+isCompleted = true, dummyData)
+}
+  }
+
+  test("spark.eventLog.compression.codec overrides 
spark.io.compression.codec") {
+val conf = new SparkConf
+conf.set(EVENT_LOG_COMPRESS, true)
+val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+
+val appId = "test"
+val appAttemptId = None
+
+// The default value is `spark.io.compression.codec`.
+val writer = createWriter(appId, appAttemptId, testDirPath.toUri, conf, 
hadoopConf)
+assert(writer.compressionCodecName.contains("lz4"))
+
+// `spark.eventLog.compression.codec` overrides 
`spark.io.compression.codec`.
+conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd")
+val writer2 = createWriter(appId, appAttemptId, 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-25 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328401775
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
 ##
 @@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.zip.{ZipInputStream, ZipOutputStream}
+
+import scala.collection.mutable
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileReadersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter with Logging {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
+testDir.deleteOnExit()
 
 Review comment:
   I'll change it to delete the directory in the `after` block.





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-25 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328401676
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
 ##
 @@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.zip.{ZipInputStream, ZipOutputStream}
+
+import scala.collection.mutable
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileReadersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter with Logging {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
 
 Review comment:
   This is intentional, to test the behavior when the directory name contains a space 
or special character. The FileSource/FileSink suites test this similarly.
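   For illustration, a hypothetical snippet (not from the PR) showing the kind of path 
the prefix produces; `Utils` is `private[spark]`, so it assumes the snippet sits under 
the `org.apache.spark` package:

```scala
package org.apache.spark

import org.apache.hadoop.fs.Path

import org.apache.spark.util.Utils

// Hypothetical illustration: the prefix keeps the space so the created directory name
// contains one, exercising the File -> Path -> URI handling in the readers/writers.
object SpecialCharDirSketch {
  def main(args: Array[String]): Unit = {
    val dir = Utils.createTempDir(namePrefix = "event log")   // e.g. /tmp/event log-<uuid>
    val path = new Path(dir.getAbsolutePath)
    println(path.toUri)                                       // space appears percent-encoded
    Utils.deleteRecursively(dir)
  }
}
```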





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-25 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328401390
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -574,22 +576,27 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
   }
 
-  private[history] def shouldReloadLog(info: LogInfo, entry: FileStatus): 
Boolean = {
-var result = info.fileSize < entry.getLen
-if (!result && info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
-  try {
-result = Utils.tryWithResource(fs.open(entry.getPath)) { in =>
-  in.getWrappedStream match {
-case dfsIn: DFSInputStream => info.fileSize < dfsIn.getFileLength
-case _ => false
-  }
+  private[history] def shouldReloadLog(info: LogInfo, reader: 
EventLogFileReader): Boolean = {
+if (info.isComplete != reader.completed) {
+  true
+} else {
+  var result = if (info.lastSequenceNum.isDefined) {
+require(reader.lastIndex.isDefined)
+info.lastSequenceNum.get < reader.lastIndex.get ||
 
 Review comment:
   Missed one. I'll rename it to `lastIndex`.





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-25 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328401321
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io._
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[history] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  // Only defined if the file system scheme is not local
+  protected var hadoopDataStream: Option[FSDataOutputStream] = None
+  protected var writer: Option[PrintWriter] = None
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path, fnSetupWriter: OutputStream => 
PrintWriter): Unit = {
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(
+  SparkHadoopUtil.createFile(fileSystem, path, 
sparkConf.get(EVENT_LOG_ALLOW_EC)))
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+  writer = 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-25 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328401018
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io._
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[history] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  // Only defined if the file system scheme is not local
+  protected var hadoopDataStream: Option[FSDataOutputStream] = None
+  protected var writer: Option[PrintWriter] = None
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path, fnSetupWriter: OutputStream => 
PrintWriter): Unit = {
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(
+  SparkHadoopUtil.createFile(fileSystem, path, 
sparkConf.get(EVENT_LOG_ALLOW_EC)))
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+  writer = 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-25 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328400815
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io._
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[history] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  // Only defined if the file system scheme is not local
+  protected var hadoopDataStream: Option[FSDataOutputStream] = None
+  protected var writer: Option[PrintWriter] = None
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path, fnSetupWriter: OutputStream => 
PrintWriter): Unit = {
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(
+  SparkHadoopUtil.createFile(fileSystem, path, 
sparkConf.get(EVENT_LOG_ALLOW_EC)))
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+  writer = 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-25 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328400238
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{BufferedInputStream, InputStream}
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.hdfs.DFSInputStream
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.EventLogFileWriter.codecName
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/** The base class of reader which will read the information of event log 
file(s). */
+abstract class EventLogFileReader(
+protected val fileSystem: FileSystem,
+val rootPath: Path) {
+
+  protected def fileSizeForDFS(path: Path): Option[Long] = {
+Utils.tryWithResource(fileSystem.open(path)) { in =>
+  in.getWrappedStream match {
+case dfsIn: DFSInputStream => Some(dfsIn.getFileLength)
+case _ => None
+  }
+}
+  }
+
+  protected def addFileAsZipEntry(
+  zipStream: ZipOutputStream,
+  path: Path,
+  entryName: String): Unit = {
+Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { 
inputStream =>
+  zipStream.putNextEntry(new ZipEntry(entryName))
+  ByteStreams.copy(inputStream, zipStream)
+  zipStream.closeEntry()
+}
+  }
+
+  /** Returns the last index of event log files. None for single event log 
file. */
+  def lastIndex: Option[Long]
+
+  /**
+   * Returns the size of file for the last index of event log files. Returns 
its size for
+   * single event log file.
+   */
+  def fileSizeForLastIndex: Long
+
+  /** Returns whether the application is completed. */
+  def completed: Boolean
+
+  /**
+   * Returns the size of file for the last index of event log files, only when
+   * underlying input stream is DFSInputStream. Otherwise returns None.
+   */
+  def fileSizeForLastIndexForDFS: Option[Long]
+
+  /** Returns the modification time for the last sequence number of event log 
files. */
+  def modificationTime: Long
+
+  /**
+   * This method compresses the files passed in, and writes the compressed 
data out into the
+   * ZipOutputStream passed in. Each file is written as a new ZipEntry with 
its name being
+   * the name of the file being compressed.
+   */
+  def zipEventLogFiles(zipStream: ZipOutputStream): Unit
+
+  /** Returns all available event log files. */
+  def listEventLogFiles: Seq[FileStatus]
+
+  /** Returns the short compression name if being used. None if it's 
uncompressed. */
+  def compressionCodec: Option[String]
+
+  /** Returns the size of all event log files. */
+  def totalSize: Long
+}
+
+object EventLogFileReader {
+  // A cache for compression codecs to avoid creating the same codec many times
+  private val codecMap = mutable.HashMap.empty[String, CompressionCodec]
+
+  def apply(
+  fs: FileSystem,
+  path: Path,
+  lastSequenceNumber: Option[Long]): EventLogFileReader = {
+lastSequenceNumber match {
+  case Some(_) => new RollingEventLogFilesFileReader(fs, path)
+  case None => new SingleFileEventLogFileReader(fs, path)
+}
+  }
+
+  def apply(fs: FileSystem, path: Path): Option[EventLogFileReader] = {
+apply(fs, fs.getFileStatus(path))
+  }
+
+  def apply(fs: FileSystem, status: FileStatus): Option[EventLogFileReader] = {
+if (isSingleEventLog(status)) {
+  Some(new SingleFileEventLogFileReader(fs, status.getPath))
+} else if (isRollingEventLogs(status)) {
+  Some(new RollingEventLogFilesFileReader(fs, status.getPath))
+} else {
+  None
+}
+  }
+
+  /**
+   * Opens an event log file and returns an input stream that contains the 
event data.
+   *
+   * @return input stream that holds one JSON record per line.
+   */
+  def openEventLog(log: Path, fs: FileSystem): InputStream = {
+val in = new 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-25 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328101052
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io._
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[history] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path): (Option[FSDataOutputStream], 
OutputStream) = {
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+var hadoopDataStream: Option[FSDataOutputStream] = None
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(
+  SparkHadoopUtil.createFile(fileSystem, path, 
sparkConf.get(EVENT_LOG_ALLOW_EC)))
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+  (hadoopDataStream, bstream)
+} catch {
+  case e: Exception =>
+dstream.close()
+throw e
+}
+  }
+
+  protected def 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-25 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328101176
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io._
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[history] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path): (Option[FSDataOutputStream], 
OutputStream) = {
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+var hadoopDataStream: Option[FSDataOutputStream] = None
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(
+  SparkHadoopUtil.createFile(fileSystem, path, 
sparkConf.get(EVENT_LOG_ALLOW_EC)))
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+  (hadoopDataStream, bstream)
+} catch {
+  case e: Exception =>
+dstream.close()
+throw e
+}
+  }
+
+  protected def 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-25 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328085700
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io._
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[history] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path): (Option[FSDataOutputStream], 
OutputStream) = {
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+var hadoopDataStream: Option[FSDataOutputStream] = None
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(
+  SparkHadoopUtil.createFile(fileSystem, path, 
sparkConf.get(EVENT_LOG_ALLOW_EC)))
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+  (hadoopDataStream, bstream)
+} catch {
+  case e: Exception =>
+dstream.close()
+throw e
+}
+  }
+
+  protected def 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-18 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325925937
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io._
+import java.net.URI
+
+import scala.collection.mutable.Map
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[scheduler] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path): (Option[FSDataOutputStream],
+Option[CountingOutputStream]) = {
+
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+var hadoopDataStream: Option[FSDataOutputStream] = None
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(if (shouldAllowECLogs) {
+  fileSystem.create(path)
+} else {
+  SparkHadoopUtil.createNonECFile(fileSystem, path)
+})
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  val ostream = new CountingOutputStream(bstream)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+
+  (hadoopDataStream, Some(ostream))
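
A minimal sketch, not part of the pull request, of the output stream layering that the quoted initLogFile builds: raw stream, optional compression, buffering, and an outermost byte counter. Only the java.io and commons-compress classes are taken from the diff above; the helper name and defaults are illustrative, and the local-file path mirrors the HADOOP-7844 workaround in the quoted code.

import java.io.{BufferedOutputStream, FileOutputStream, OutputStream}

import org.apache.commons.compress.utils.CountingOutputStream

// Illustrative helper only; names and defaults are assumptions, not PR code.
def layeredEventLogStream(
    localPath: String,
    compress: OutputStream => OutputStream = identity,  // e.g. a codec's compressedOutputStream
    bufferSize: Int = 100 * 1024): CountingOutputStream = {
  // Local files bypass FSDataOutputStream because of HADOOP-7844 (see the quoted comment).
  val raw = new FileOutputStream(localPath)
  val compressed = compress(raw)                         // optional compression codec
  val buffered = new BufferedOutputStream(compressed, bufferSize)
  // The counter wraps the buffered stream, so it counts non-compressed bytes,
  // matching the NOTE in the quoted javadoc.
  new CountingOutputStream(buffered)
}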

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325450908
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/scheduler/EventLogFileWritersSuite.scala
 ##
 @@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{File, FileOutputStream, IOException}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+import scala.io.Source
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.scheduler.EventLogTestHelper._
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileWritersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter with Logging {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
+testDir.deleteOnExit()
+testDirPath = new Path(testDir.getAbsolutePath())
+  }
+
+  after {
+Utils.deleteRecursively(testDir)
+  }
+
+  test("create EventLogFileWriter with enable/disable rolling") {
+def buildWriterAndVerify(conf: SparkConf, expectedClazz: Class[_]): Unit = 
{
+  val writer = EventLogFileWriter.createEventLogFileWriter(
+getUniqueApplicationId, None, testDirPath.toUri, conf,
+SparkHadoopUtil.get.newConfiguration(conf))
+  val writerClazz = writer.getClass
+  assert(expectedClazz === writerClazz,
+s"default file writer should be $expectedClazz, but $writerClazz")
+}
+
+val conf = new SparkConf
+conf.set(EVENT_LOG_ENABLED, true)
+conf.set(EVENT_LOG_DIR, testDir.toString)
+
+// default config
+buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter])
+
+conf.set(EVENT_LOG_ENABLE_ROLLING, true)
+buildWriterAndVerify(conf, classOf[RollingEventLogFilesWriter])
+
+conf.set(EVENT_LOG_ENABLE_ROLLING, false)
+buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter])
+  }
+
+  val allCodecs = Seq(None) ++
 
 Review comment:
   Same answer.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
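
A minimal configuration sketch for readers following the quoted test: how an application might opt in to rolling event logs. It is not part of the pull request; the directory is a placeholder, and the string keys correspond to the EVENT_LOG_ENABLED, EVENT_LOG_DIR and EVENT_LOG_ENABLE_ROLLING entries used in the test above.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")             // EVENT_LOG_ENABLED
  .set("spark.eventLog.dir", "/tmp/spark-events")    // EVENT_LOG_DIR (placeholder path)
  .set("spark.eventLog.rolling.enabled", "true")     // EVENT_LOG_ENABLE_ROLLING
// With rolling left at its default (false), EventLogFileWriter.createEventLogFileWriter
// returns a SingleEventLogFileWriter; with it enabled, a RollingEventLogFilesWriter,
// as the quoted test asserts.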



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325450885
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/scheduler/EventLogFileWritersSuite.scala
 ##
 @@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{File, FileOutputStream, IOException}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+import scala.io.Source
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.scheduler.EventLogTestHelper._
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileWritersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter with Logging {
 
 Review comment:
   Ah I added it for testing and didn't remove it. Will remove.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325450559
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/scheduler/EventLogFileReadersSuite.scala
 ##
 @@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.zip.{ZipInputStream, ZipOutputStream}
+
+import scala.collection.mutable
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.scheduler.EventLogTestHelper._
+import org.apache.spark.util.Utils
+
+abstract class EventLogFileReadersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter with Logging {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+testDir = Utils.createTempDir(namePrefix = s"event log")
+testDir.deleteOnExit()
+testDirPath = new Path(testDir.getAbsolutePath())
+  }
+
+  after {
+Utils.deleteRecursively(testDir)
+  }
+
+  test("Retrieve EventLogFileReader correctly") {
+def assertInstanceOfEventLogReader(
+expectedClazz: Option[Class[_ <: EventLogFileReader]],
+actual: Option[EventLogFileReader]): Unit = {
+  if (expectedClazz.isEmpty) {
+assert(actual.isEmpty, s"Expected no EventLogFileReader instance but 
was " +
+  s"${actual.map(_.getClass).getOrElse("")}")
+  } else {
+assert(actual.isDefined, s"Expected an EventLogFileReader instance but 
was empty")
+assert(expectedClazz.get.isAssignableFrom(actual.get.getClass),
+  s"Expected ${expectedClazz.get} but was ${actual.get.getClass}")
+  }
+}
+
+def testForPathWithoutSeq(
+path: Path,
+isFile: Boolean,
+expectedClazz: Option[Class[_ <: EventLogFileReader]]): Unit = {
+  if (isFile) {
+Utils.tryWithResource(fileSystem.create(path)) { is =>
+  is.writeInt(10)
+}
+  } else {
+fileSystem.mkdirs(path)
+  }
+
+  val reader = EventLogFileReader.getEventLogReader(fileSystem, path)
+  assertInstanceOfEventLogReader(expectedClazz, reader)
+  val reader2 = EventLogFileReader.getEventLogReader(fileSystem,
+fileSystem.getFileStatus(path))
+  assertInstanceOfEventLogReader(expectedClazz, reader)
+}
+
+// path with no last sequence - single event log
+val reader1 = EventLogFileReader.getEventLogReader(fileSystem, new 
Path(testDirPath, "aaa"),
+  None)
+
assertInstanceOfEventLogReader(Some(classOf[SingleFileEventLogFileReader]), 
Some(reader1))
+
+// path with last sequence - rolling event log
+val reader2 = EventLogFileReader.getEventLogReader(fileSystem,
+  new Path(testDirPath, "eventlog_v2_aaa"), Some(3))
+
assertInstanceOfEventLogReader(Some(classOf[RollingEventLogFilesFileReader]), 
Some(reader2))
+
+// path - file (both path and FileStatus)
+val eventLogFile = new Path(testDirPath, "bbb")
+testForPathWithoutSeq(eventLogFile, isFile = true, 
Some(classOf[SingleFileEventLogFileReader]))
+
+// path - file starting with "."
+val invalidEventLogFile = new Path(testDirPath, ".bbb")
+testForPathWithoutSeq(invalidEventLogFile, isFile = true, None)
+
+// path - directory with "eventlog_v2_" prefix
+val eventLogDir = new Path(testDirPath, "eventlog_v2_ccc")
+testForPathWithoutSeq(eventLogDir, isFile = false,
+  Some(classOf[RollingEventLogFilesFileReader]))
+
+// path - directory with no "eventlog_v2_" prefix
+val invalidEventLogDir 
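
The path conventions this test exercises can be restated as a small helper for readers; this is illustrative only, not code from the pull request. The "eventlog_v2_" prefix and the rule that names starting with "." are ignored come from the quoted assertions, and the reader class names in the comments are the ones asserted above.

import org.apache.hadoop.fs.FileStatus

// Illustrative restatement of the conventions exercised by the quoted test.
def expectedReaderKind(status: FileStatus): Option[String] = {
  val name = status.getPath.getName
  if (name.startsWith(".")) None                              // hidden entries are not event logs
  else if (status.isFile) Some("single")                      // SingleFileEventLogFileReader
  else if (name.startsWith("eventlog_v2_")) Some("rolling")   // RollingEventLogFilesFileReader
  else None                                                   // other directories are ignored
}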

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325450098
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io._
+import java.net.URI
+
+import scala.collection.mutable.Map
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[scheduler] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path): (Option[FSDataOutputStream],
+Option[CountingOutputStream]) = {
+
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+var hadoopDataStream: Option[FSDataOutputStream] = None
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(if (shouldAllowECLogs) {
+  fileSystem.create(path)
+} else {
+  SparkHadoopUtil.createNonECFile(fileSystem, path)
+})
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  val ostream = new CountingOutputStream(bstream)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+
+  (hadoopDataStream, Some(ostream))

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325449621
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io._
+import java.net.URI
+
+import scala.collection.mutable.Map
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[scheduler] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path): (Option[FSDataOutputStream],
+Option[CountingOutputStream]) = {
+
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+var hadoopDataStream: Option[FSDataOutputStream] = None
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(if (shouldAllowECLogs) {
+  fileSystem.create(path)
+} else {
+  SparkHadoopUtil.createNonECFile(fileSystem, path)
+})
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  val ostream = new CountingOutputStream(bstream)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+
+  (hadoopDataStream, Some(ostream))

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325449099
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io._
+import java.net.URI
+
+import scala.collection.mutable.Map
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[scheduler] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path): (Option[FSDataOutputStream],
+Option[CountingOutputStream]) = {
+
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+var hadoopDataStream: Option[FSDataOutputStream] = None
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(if (shouldAllowECLogs) {
+  fileSystem.create(path)
+} else {
+  SparkHadoopUtil.createNonECFile(fileSystem, path)
+})
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  val ostream = new CountingOutputStream(bstream)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+
+  (hadoopDataStream, Some(ostream))

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325448554
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io._
+import java.net.URI
+
+import scala.collection.mutable.Map
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[scheduler] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path): (Option[FSDataOutputStream],
+Option[CountingOutputStream]) = {
+
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+var hadoopDataStream: Option[FSDataOutputStream] = None
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(if (shouldAllowECLogs) {
+  fileSystem.create(path)
+} else {
+  SparkHadoopUtil.createNonECFile(fileSystem, path)
+})
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  val ostream = new CountingOutputStream(bstream)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+
+  (hadoopDataStream, Some(ostream))

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325448204
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io._
+import java.net.URI
+
+import scala.collection.mutable.Map
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[scheduler] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path): (Option[FSDataOutputStream],
+Option[CountingOutputStream]) = {
+
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+var hadoopDataStream: Option[FSDataOutputStream] = None
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(if (shouldAllowECLogs) {
+  fileSystem.create(path)
+} else {
+  SparkHadoopUtil.createNonECFile(fileSystem, path)
+})
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  val ostream = new CountingOutputStream(bstream)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+
+  (hadoopDataStream, Some(ostream))

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325447837
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io._
+import java.net.URI
+
+import scala.collection.mutable.Map
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[scheduler] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path): (Option[FSDataOutputStream],
+Option[CountingOutputStream]) = {
+
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+var hadoopDataStream: Option[FSDataOutputStream] = None
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(if (shouldAllowECLogs) {
+  fileSystem.create(path)
+} else {
+  SparkHadoopUtil.createNonECFile(fileSystem, path)
+})
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  val ostream = new CountingOutputStream(bstream)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+
+  (hadoopDataStream, Some(ostream))

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325446292
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/scheduler/EventLogFileWriters.scala
 ##
 @@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io._
+import java.net.URI
+
+import scala.collection.mutable.Map
+
+import org.apache.commons.compress.utils.CountingOutputStream
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+/**
+ * The base class of writer which will write event logs into file.
+ *
+ * The following configurable parameters are available to tune the behavior of 
writing:
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
+ *
+ * Note that descendant classes can maintain its own parameters: refer the 
javadoc of each class
+ * for more details.
+ *
+ * NOTE: CountingOutputStream being returned by "initLogFile" counts 
"non-compressed" bytes.
+ */
+abstract class EventLogFileWriter(
+appId: String,
+appAttemptId : Option[String],
+logBaseDir: URI,
+sparkConf: SparkConf,
+hadoopConf: Configuration) extends Logging {
+
+  protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  protected val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
+  protected val outputBufferSize = 
sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
+  protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  protected val compressionCodec =
+if (shouldCompress) {
+  Some(CompressionCodec.createCodec(sparkConf, 
sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
+} else {
+  None
+}
+
+  private[scheduler] val compressionCodecName = compressionCodec.map { c =>
+CompressionCodec.getShortName(c.getClass.getName)
+  }
+
+  protected def requireLogBaseDirAsDirectory(): Unit = {
+if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
+  throw new IllegalArgumentException(s"Log directory $logBaseDir is not a 
directory.")
+}
+  }
+
+  protected def initLogFile(path: Path): (Option[FSDataOutputStream],
+Option[CountingOutputStream]) = {
+
+if (shouldOverwrite && fileSystem.delete(path, true)) {
+  logWarning(s"Event log $path already exists. Overwriting...")
+}
+
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+val uri = path.toUri
+
+var hadoopDataStream: Option[FSDataOutputStream] = None
+/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing 
(HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */
+val dstream =
+  if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == 
"file") {
+new FileOutputStream(uri.getPath)
+  } else {
+hadoopDataStream = Some(if (shouldAllowECLogs) {
+  fileSystem.create(path)
+} else {
+  SparkHadoopUtil.createNonECFile(fileSystem, path)
+})
+hadoopDataStream.get
+  }
+
+try {
+  val cstream = 
compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+  val bstream = new BufferedOutputStream(cstream, outputBufferSize)
+  val ostream = new CountingOutputStream(bstream)
+  fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
+  logInfo(s"Logging events to $path")
+
+  (hadoopDataStream, Some(ostream))

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325443715
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/scheduler/EventLogFileReaders.scala
 ##
 @@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{BufferedInputStream, InputStream}
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import scala.collection.mutable.Map
+
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.hdfs.DFSInputStream
+
+import org.apache.spark.SparkConf
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.scheduler.EventLogFileWriter.codecName
+import org.apache.spark.util.Utils
+
+/** The base class of reader which will read the information of event log file(s). */
+abstract class EventLogFileReader(
+    protected val fileSystem: FileSystem,
+    val rootPath: Path) {
+
+  protected def fileSizeForDFS(path: Path): Option[Long] = {
+    Utils.tryWithResource(fileSystem.open(path)) { in =>
+      in.getWrappedStream match {
+        case dfsIn: DFSInputStream => Some(dfsIn.getFileLength)
+        case _ => None
+      }
+    }
+  }
+
+  protected def addFileAsZipEntry(
+      zipStream: ZipOutputStream,
+      path: Path,
+      entryName: String): Unit = {
+    Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { inputStream =>
+      zipStream.putNextEntry(new ZipEntry(entryName))
+      ByteStreams.copy(inputStream, zipStream)
+      zipStream.closeEntry()
+    }
+  }
+
+  //  methods to be override 
+
+  /** Returns the last sequence of event log files. None for single event log file. */
+  def lastSequence: Option[Long]
+
+  /**
+   * Returns the size of file for the last sequence of event log files. Returns its size for
+   * single event log file.
+   */
+  def fileSizeForLastSequence: Long
+
+  /** Returns whether the application is completed. */
+  def completed: Boolean
+
+  /**
+   * Returns the size of file for the last sequence of event log files, only when
+   * underlying input stream is DFSInputStream. Otherwise returns None.
+   */
+  def fileSizeForLastSequenceForDFS: Option[Long]
+
+  /** Returns the modification time for the last sequence of event log files. */
+  def modificationTime: Long
+
+  /**
+   * This method compresses the files passed in, and writes the compressed data out into the
+   * ZipOutputStream passed in. Each file is written as a new ZipEntry with its name being
+   * the name of the file being compressed.
+   */
+  def zipEventLogFiles(zipStream: ZipOutputStream): Unit
+
+  /** Returns all available event log files. */
+  def listEventLogFiles: Seq[FileStatus]
+
+  /** Returns the short compression name if being used. None if it's uncompressed. */
+  def compression: Option[String]
+
+  /** Returns the size of all event log files. */
+  def allSize: Long
+}
+
+object EventLogFileReader {
+  // A cache for compression codecs to avoid creating the same codec many times
+  private val codecMap = Map.empty[String, CompressionCodec]
+
+  def getEventLogReader(
+      fs: FileSystem,
+      path: Path,
+      lastSequence: Option[Long]): EventLogFileReader = {
+    lastSequence match {
+      case Some(_) => new RollingEventLogFilesFileReader(fs, path)
+      case None => new SingleFileEventLogFileReader(fs, path)
+    }
+  }
+
+  def getEventLogReader(fs: FileSystem, path: Path): Option[EventLogFileReader] = {
+    val status = fs.getFileStatus(path)
+    if (isSingleEventLog(status)) {
+      Some(new SingleFileEventLogFileReader(fs, path))
+    } else if (isRollingEventLogs(status)) {
+      Some(new RollingEventLogFilesFileReader(fs, path))
+    } else {
+      None
+    }
+  }
+
+  def getEventLogReader(fs: FileSystem, status: FileStatus): Option[EventLogFileReader] = {
+    if (isSingleEventLog(status)) {
+      Some(new SingleFileEventLogFileReader(fs, status.getPath))
+    } else if (isRollingEventLogs(status)) {
+      Some(new RollingEventLogFilesFileReader(fs, status.getPath))

[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325443041
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/scheduler/EventLogFileReaders.scala
 ##
 @@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
 
 Review comment:
   I'll pick the existing package for now, and change them all at once later if the new package looks better. Thanks!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325426190
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/scheduler/EventLogFileReaders.scala
 ##
 @@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{BufferedInputStream, InputStream}
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import scala.collection.mutable.Map
+
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.hdfs.DFSInputStream
+
+import org.apache.spark.SparkConf
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.scheduler.EventLogFileWriter.codecName
+import org.apache.spark.util.Utils
+
+/** The base class of reader which will read the information of event log file(s). */
+abstract class EventLogFileReader(
+    protected val fileSystem: FileSystem,
+    val rootPath: Path) {
+
+  protected def fileSizeForDFS(path: Path): Option[Long] = {
+    Utils.tryWithResource(fileSystem.open(path)) { in =>
+      in.getWrappedStream match {
+        case dfsIn: DFSInputStream => Some(dfsIn.getFileLength)
+        case _ => None
+      }
+    }
+  }
+
+  protected def addFileAsZipEntry(
+      zipStream: ZipOutputStream,
+      path: Path,
+      entryName: String): Unit = {
+    Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { inputStream =>
+      zipStream.putNextEntry(new ZipEntry(entryName))
+      ByteStreams.copy(inputStream, zipStream)
+      zipStream.closeEntry()
+    }
+  }
+
+  //  methods to be override 
 
 Review comment:
   Just removed.
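
   As a side note, here is a rough usage sketch of the reader API quoted above - not part of the patch, just how I'd expect a caller such as the history server to drive it. `logDirPath` and `hadoopConf` are placeholders, not names from the patch:

```scala
import java.io.FileOutputStream
import java.util.zip.ZipOutputStream

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Placeholder inputs for the sketch.
val hadoopConf = new Configuration()
val logDirPath = new Path("hdfs:///spark-events/app-20190917-0001")
val fs = logDirPath.getFileSystem(hadoopConf)

// getEventLogReader returns None when the path is neither a single event log
// nor a rolled event log directory.
EventLogFileReader.getEventLogReader(fs, logDirPath).foreach { reader =>
  val zipStream = new ZipOutputStream(new FileOutputStream("app-20190917-0001.zip"))
  try {
    // Each event log file is written out as its own ZipEntry, whether the
    // application produced a single file or a sequence of rolled files.
    reader.zipEventLogFiles(zipStream)
  } finally {
    zipStream.close()
  }
  println(s"completed=${reader.completed}, total size=${reader.allSize} bytes")
}
```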


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325426067
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -174,6 +174,17 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
 
ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+ConfigBuilder("spark.eventLog.rollLog")
+  .doc("Whether rolling over event log files is enabled.")
+  .booleanConf.createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE =
+ConfigBuilder("spark.eventLog.rollLog.maxFileSize")
+  .doc("The max size of event log file to be rolled over, in KiB unless 
otherwise specified.")
 
 Review comment:
   OK, got it. That makes sense if we always want users to provide the unit. Thanks for explaining!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325426067
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -174,6 +174,17 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
 
ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+ConfigBuilder("spark.eventLog.rollLog")
+  .doc("Whether rolling over event log files is enabled.")
+  .booleanConf.createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE =
+ConfigBuilder("spark.eventLog.rollLog.maxFileSize")
+  .doc("The max size of event log file to be rolled over, in KiB unless 
otherwise specified.")
 
 Review comment:
   OK, got it. That makes sense if we always want users to provide the unit. Thanks for explaining!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325425816
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -174,6 +174,17 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
 
ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+ConfigBuilder("spark.eventLog.rollLog")
+  .doc("Whether rolling over event log files is enabled.")
+  .booleanConf.createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE =
+ConfigBuilder("spark.eventLog.rollLog.maxFileSize")
+  .doc("The max size of event log file to be rolled over, in KiB unless 
otherwise specified.")
+  .bytesConf(ByteUnit.KiB)
+  .createWithDefaultString("10m")
 
 Review comment:
   I guess keeping files smaller would help us clean them up more carefully - we are dealing with event log files of several GBs (in some cases tens of GBs, though I haven't seen hundreds). But I also agree that a default like the HDFS block size, 128m, wouldn't be too large.
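
   For reference, the kind of setup I have in mind looks roughly like this - a sketch using the option names as of this revision of the patch (they may still be renamed), with a placeholder log directory:

```scala
import org.apache.spark.SparkConf

// Enable rolling and cap each event log file at roughly one HDFS block.
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-events")   // placeholder path
  .set("spark.eventLog.rollLog", "true")               // option names as of this revision
  .set("spark.eventLog.rollLog.maxFileSize", "128m")
```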


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325423670
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -174,6 +174,17 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
 
ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+ConfigBuilder("spark.eventLog.rollLog")
+  .doc("Whether rolling over event log files is enabled.")
+  .booleanConf.createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE =
+ConfigBuilder("spark.eventLog.rollLog.maxFileSize")
+  .doc("The max size of event log file to be rolled over, in KiB unless 
otherwise specified.")
 
 Review comment:
   Uh, I'm not sure this convention has been respected - please search for `"in [a-zA-Z]+ unless otherwise specified."` as a regex. Most of the conf docs using bytesConf mention this. Would they need to be changed as well?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325423670
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -174,6 +174,17 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
 
ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+ConfigBuilder("spark.eventLog.rollLog")
+  .doc("Whether rolling over event log files is enabled.")
+  .booleanConf.createWithDefault(false)
+
+  private[spark] val EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE =
+ConfigBuilder("spark.eventLog.rollLog.maxFileSize")
+  .doc("The max size of event log file to be rolled over, in KiB unless 
otherwise specified.")
 
 Review comment:
   Uh, I'm not sure this convention has been respected - please search for `"in [a-zA-Z]+ unless otherwise specified."` as a regex. Most of the conf docs using bytesConf mention this. Would they need to be changed as well?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325422491
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -174,6 +174,17 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
 
ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+ConfigBuilder("spark.eventLog.rollLog")
 
 Review comment:
   I can see pros and cons to combining the options: if we combine the two into one, we couldn't provide a default max size when the feature is enabled, because we decided to turn it off by default (so the default value would have to be 0). I'll leave the options as they are for now; if you're OK with the default value being 0, let me know and I'll combine them.
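
   To spell out the combined-option alternative, it would look roughly like this - a sketch only, not what the patch currently does, and `sparkConf` is just a placeholder for wherever the conf is read:

```scala
// Single-option alternative: 0 (the default) means rolling is disabled,
// while any positive size both enables rolling and sets the threshold.
val maxFileSizeKiB = sparkConf.get(EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE)
val rollingEnabled = maxFileSizeKiB > 0
```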


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325422491
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -174,6 +174,17 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
 
ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ENABLE_ROLLING =
+ConfigBuilder("spark.eventLog.rollLog")
 
 Review comment:
   I can see pros and cons to combining the options: if we combine the two into one, we couldn't provide a default max size when the feature is enabled, because we decided to turn it off by default (so the default value would have to be 0). I'll leave the options as they are for now, and if you're OK with the default value being 0, I'll combine them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] Roll over event log files

2019-09-17 Thread GitBox
HeartSaVioR commented on a change in pull request #25670: [SPARK-28869][CORE] 
Roll over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r325420920
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -1161,12 +1156,15 @@ private[history] case class LogInfo(
 logType: LogType.Value,
 appId: Option[String],
 attemptId: Option[String],
-fileSize: Long)
+fileSize: Long,
+lastSequence: Option[Long],
 
 Review comment:
   `lastSequenceNum` looks OK, since the patch has been using the term `sequence`. Thanks!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org