vanzin commented on a change in pull request #25670: [SPARK-28869][CORE] Roll 
over event log files
URL: https://github.com/apache/spark/pull/25670#discussion_r328745613
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
 ##########
 @@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{File, FileOutputStream, IOException}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+import scala.io.Source
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.EventLogTestHelper._
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
+
+
+abstract class EventLogFileWritersSuite extends SparkFunSuite with 
LocalSparkContext
+  with BeforeAndAfter {
+
+  protected val fileSystem = Utils.getHadoopFileSystem("/",
+    SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+  protected var testDir: File = _
+  protected var testDirPath: Path = _
+
+  before {
+    testDir = Utils.createTempDir(namePrefix = s"event log")
+    testDir.deleteOnExit()
+    testDirPath = new Path(testDir.getAbsolutePath())
+  }
+
+  after {
+    Utils.deleteRecursively(testDir)
+  }
+
+  test("create EventLogFileWriter with enable/disable rolling") {
+    def buildWriterAndVerify(conf: SparkConf, expectedClazz: Class[_]): Unit = 
{
+      val writer = EventLogFileWriter(
+        getUniqueApplicationId, None, testDirPath.toUri, conf,
+        SparkHadoopUtil.get.newConfiguration(conf))
+      val writerClazz = writer.getClass
+      assert(expectedClazz === writerClazz,
+        s"default file writer should be $expectedClazz, but $writerClazz")
+    }
+
+    val conf = new SparkConf
+    conf.set(EVENT_LOG_ENABLED, true)
+    conf.set(EVENT_LOG_DIR, testDir.toString)
+
+    // default config
+    buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter])
+
+    conf.set(EVENT_LOG_ENABLE_ROLLING, true)
+    buildWriterAndVerify(conf, classOf[RollingEventLogFilesWriter])
+
+    conf.set(EVENT_LOG_ENABLE_ROLLING, false)
+    buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter])
+  }
+
+  val allCodecs = Seq(None) ++
+    CompressionCodec.ALL_COMPRESSION_CODECS.map(c => 
Some(CompressionCodec.getShortName(c)))
+
+  allCodecs.foreach { codecShortName =>
+    test(s"initialize, write, stop - with codec $codecShortName") {
+      val appId = getUniqueApplicationId
+      val attemptId = None
+
+      val conf = getLoggingConf(testDirPath, codecShortName)
+      val writer = createWriter(appId, attemptId, testDirPath.toUri, conf,
+        SparkHadoopUtil.get.newConfiguration(conf))
+
+      writer.start()
+
+      // snappy stream throws exception on empty stream, so we should provide 
some data to test.
+      val dummyData = Seq("dummy1", "dummy2", "dummy3")
+      dummyData.foreach(writer.writeEvent(_, flushLogger = true))
+
+      verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, 
codecShortName,
+        isCompleted = false, dummyData)
+
+      writer.stop()
+
+      verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, 
codecShortName,
+        isCompleted = true, dummyData)
+    }
+  }
+
+  test("spark.eventLog.compression.codec overrides 
spark.io.compression.codec") {
+    val conf = new SparkConf
+    conf.set(EVENT_LOG_COMPRESS, true)
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+
+    val appId = "test"
+    val appAttemptId = None
+
+    // The default value is `spark.io.compression.codec`.
+    val writer = createWriter(appId, appAttemptId, testDirPath.toUri, conf, 
hadoopConf)
+    assert(writer.compressionCodecName.contains("lz4"))
+
+    // `spark.eventLog.compression.codec` overrides 
`spark.io.compression.codec`.
+    conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd")
+    val writer2 = createWriter(appId, appAttemptId, testDirPath.toUri, conf, 
hadoopConf)
+    assert(writer2.compressionCodecName.contains("zstd"))
+  }
+
+  protected def readLinesFromEventLogFile(log: Path, fs: FileSystem): 
List[String] = {
+    val logDataStream = EventLogFileReader.openEventLog(log, fs)
+    try {
+      Source.fromInputStream(logDataStream).getLines().toList
+    } finally {
+      logDataStream.close()
+    }
+  }
+
+  protected def createWriter(
+      appId: String,
+      appAttemptId : Option[String],
+      logBaseDir: URI,
+      sparkConf: SparkConf,
+      hadoopConf: Configuration): EventLogFileWriter
+
+  protected def verifyWriteEventLogFile(
+      appId: String,
+      appAttemptId : Option[String],
+      logBaseDir: URI,
+      compressionCodecShortName: Option[String],
+      isCompleted: Boolean,
+      expectedLines: Seq[String] = Seq.empty): Unit
+
+  protected def skipVerifyEventLogFile(
+      compressionCodecShortName: Option[String],
+      isCompleted: Boolean): Boolean = {
+    // Spark initializes LZ4BlockOutputStream with syncFlush=false, so we 
can't force
 
 Review comment:
   > Maybe we would be better to make problematic compressions as unsupported?
   
   That's not what I mean. The SHS does the best it can; what I'm saying is 
that these tests being added that try to read open files aren't really helpful 
since they depend on specific behavior of code that is outside of Spark's 
control - namely, compression codecs.
   
   So the test really should only be asserting that data is available in the 
file after the file is closed; before that, there is no guarantee that data has 
actually made it to the file.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to