Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/9143#discussion_r44240066
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
@@ -190,283 +281,150 @@ class WriteAheadLogSuite extends SparkFunSuite with BeforeAndAfter {
}
reader.close()
}
+}
- test("FileBasedWriteAheadLog - write rotating logs") {
- // Write data with rotation using WriteAheadLog class
- val dataToWrite = generateRandomData()
- writeDataUsingWriteAheadLog(testDir, dataToWrite)
+abstract class CloseFileAfterWriteTests(allowBatching: Boolean, testTag: String)
+ extends CommonWriteAheadLogTests(allowBatching, closeFileAfterWrite = true, testTag) {
- // Read data manually to verify the written data
- val logFiles = getLogFilesInDirectory(testDir)
- assert(logFiles.size > 1)
- val writtenData = logFiles.flatMap { file => readDataManually(file)}
- assert(writtenData === dataToWrite)
- }
-
- test("FileBasedWriteAheadLog - close after write flag") {
+ import WriteAheadLogSuite._
+ test(testPrefix + "close after write flag") {
// Write data with rotation using WriteAheadLog class
val numFiles = 3
val dataToWrite = Seq.tabulate(numFiles)(_.toString)
// total advance time is less than 1000, therefore log shouldn't be rolled, but manually closed
writeDataUsingWriteAheadLog(testDir, dataToWrite, closeLog = false, clockAdvanceTime = 100,
- closeFileAfterWrite = true)
+ closeFileAfterWrite = true, allowBatching = allowBatching)
// Read data manually to verify the written data
val logFiles = getLogFilesInDirectory(testDir)
assert(logFiles.size === numFiles)
- val writtenData = logFiles.flatMap { file => readDataManually(file)}
+ val writtenData: Seq[String] = readAndDeserializeDataManually(logFiles, allowBatching)
assert(writtenData === dataToWrite)
}
+}
- test("FileBasedWriteAheadLog - read rotating logs") {
- // Write data manually for testing reading through WriteAheadLog
- val writtenData = (1 to 10).map { i =>
- val data = generateRandomData()
- val file = testDir + s"/log-$i-$i"
- writeDataManually(data, file)
- data
- }.flatten
-
- val logDirectoryPath = new Path(testDir)
- val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
- assert(fileSystem.exists(logDirectoryPath) === true)
+class FileBasedWriteAheadLogWithFileCloseAfterWriteSuite
+ extends CloseFileAfterWriteTests(allowBatching = false, "FileBasedWriteAheadLog")
- // Read data using manager and verify
- val readData = readDataUsingWriteAheadLog(testDir)
- assert(readData === writtenData)
- }
+class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
+ allowBatching = true,
+ closeFileAfterWrite = false,
+ "BatchedWriteAheadLog") with MockitoSugar with BeforeAndAfterEach with
PrivateMethodTester {
- test("FileBasedWriteAheadLog - recover past logs when creating new
manager") {
- // Write data with manager, recover with new manager and verify
- val dataToWrite = generateRandomData()
- writeDataUsingWriteAheadLog(testDir, dataToWrite)
- val logFiles = getLogFilesInDirectory(testDir)
- assert(logFiles.size > 1)
- val readData = readDataUsingWriteAheadLog(testDir)
- assert(dataToWrite === readData)
- }
+ import BatchedWriteAheadLog._
+ import WriteAheadLogSuite._
- test("FileBasedWriteAheadLog - clean old logs") {
- logCleanUpTest(waitForCompletion = false)
- }
+ private var fileBasedWAL: FileBasedWriteAheadLog = _
+ private var walHandle: FileBasedWriteAheadLogSegment = _
+ private var walBatchingThreadPool: ExecutionContextExecutorService = _
+ private val sparkConf = new SparkConf()
- test("FileBasedWriteAheadLog - clean old logs synchronously") {
- logCleanUpTest(waitForCompletion = true)
+ override def beforeEach(): Unit = {
+ fileBasedWAL = mock[FileBasedWriteAheadLog]
+ walHandle = mock[FileBasedWriteAheadLogSegment]
--- End diff ---
nit: This can simply be `mock[WriteAheadLog]` and `mock[FileBasedWriteAheadLogSegment]`, isn't it?
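
For illustration, a minimal sketch of what the simplified setup might look like (the `wal` field name is hypothetical; `mock` comes from MockitoSugar, which the suite already mixes in):

    // Sketch of the suggested simplification: type the field against the
    // general WriteAheadLog interface rather than the file-based class.
    // Assumption: the tests only exercise the WriteAheadLog API on this mock.
    private var wal: WriteAheadLog = _
    private var walHandle: FileBasedWriteAheadLogSegment = _

    override def beforeEach(): Unit = {
      wal = mock[WriteAheadLog]                       // interface-level mock
      walHandle = mock[FileBasedWriteAheadLogSegment] // unchanged
    }

The narrower mock would only matter if a test called FileBasedWriteAheadLog-specific members; otherwise mocking the interface keeps the tests decoupled from the implementation.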