Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/9143#discussion_r43972975
--- Diff:
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
---
@@ -102,6 +100,180 @@ class WriteAheadLogSuite extends SparkFunSuite with
BeforeAndAfter {
}
}
+ test("WriteAheadLogUtils - wrap WriteAheadLog in BatchedWriteAheadLog
when batching is enabled") {
+ def getBatchedSparkConf: SparkConf =
+ new
SparkConf().set("spark.streaming.driver.writeAheadLog.allowBatching", "true")
+
+ val emptyConf = getBatchedSparkConf
+ assertDriverLogClass[FileBasedWriteAheadLog](emptyConf, isBatched =
true)
+ assertReceiverLogClass[FileBasedWriteAheadLog](emptyConf)
+
+ // Verify setting driver WAL class
+ val conf1 =
getBatchedSparkConf.set("spark.streaming.driver.writeAheadLog.class",
+ classOf[MockWriteAheadLog0].getName())
+ assertDriverLogClass[MockWriteAheadLog0](conf1, isBatched = true)
+ assertReceiverLogClass[FileBasedWriteAheadLog](conf1)
+
+ // Verify receivers are not wrapped
+ val receiverWALConf =
getBatchedSparkConf.set("spark.streaming.receiver.writeAheadLog.class",
+ classOf[MockWriteAheadLog0].getName())
+ assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf,
isBatched = true)
+ assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf)
+ }
+}
+
+/** Common tests for WriteAheadLogs that we would like to test with
different configurations. */
+abstract class CommonWriteAheadLogTests(
+ allowBatching: Boolean,
+ closeFileAfterWrite: Boolean,
+ testTag: String = "")
+ extends SparkFunSuite with BeforeAndAfter {
+
+ import WriteAheadLogSuite._
+
+ val hadoopConf = new Configuration()
+ var tempDir: File = null
+ var testDir: String = null
+ var testFile: String = null
+ var writeAheadLog: WriteAheadLog = null
+ protected def testPrefix = if (testTag != "") testTag + " - " else
testTag
+
+ before {
+ tempDir = Utils.createTempDir()
+ testDir = tempDir.toString
+ testFile = new File(tempDir, "testFile").toString
+ if (writeAheadLog != null) {
+ writeAheadLog.close()
+ writeAheadLog = null
+ }
+ }
+
+ after {
+ Utils.deleteRecursively(tempDir)
+ }
+
+ test(testPrefix + "read rotating logs") {
--- End diff --
As discussed offline: call this `readAll`, since not all subclasses are
necessarily *rotating* logs.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]