Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/9143#discussion_r44239889
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
@@ -190,283 +281,150 @@ class WriteAheadLogSuite extends SparkFunSuite with BeforeAndAfter {
}
reader.close()
}
+}
- test("FileBasedWriteAheadLog - write rotating logs") {
- // Write data with rotation using WriteAheadLog class
- val dataToWrite = generateRandomData()
- writeDataUsingWriteAheadLog(testDir, dataToWrite)
+abstract class CloseFileAfterWriteTests(allowBatching: Boolean, testTag: String)
+ extends CommonWriteAheadLogTests(allowBatching, closeFileAfterWrite = true, testTag) {
- // Read data manually to verify the written data
- val logFiles = getLogFilesInDirectory(testDir)
- assert(logFiles.size > 1)
- val writtenData = logFiles.flatMap { file => readDataManually(file)}
- assert(writtenData === dataToWrite)
- }
-
- test("FileBasedWriteAheadLog - close after write flag") {
+ import WriteAheadLogSuite._
+ test(testPrefix + "close after write flag") {
// Write data with rotation using WriteAheadLog class
val numFiles = 3
val dataToWrite = Seq.tabulate(numFiles)(_.toString)
// total advance time is less than 1000, therefore log shouldn't be rolled, but manually closed
writeDataUsingWriteAheadLog(testDir, dataToWrite, closeLog = false, clockAdvanceTime = 100,
- closeFileAfterWrite = true)
+ closeFileAfterWrite = true, allowBatching = allowBatching)
// Read data manually to verify the written data
val logFiles = getLogFilesInDirectory(testDir)
assert(logFiles.size === numFiles)
- val writtenData = logFiles.flatMap { file => readDataManually(file)}
+ val writtenData: Seq[String] = readAndDeserializeDataManually(logFiles, allowBatching)
assert(writtenData === dataToWrite)
}
+}
- test("FileBasedWriteAheadLog - read rotating logs") {
- // Write data manually for testing reading through WriteAheadLog
- val writtenData = (1 to 10).map { i =>
- val data = generateRandomData()
- val file = testDir + s"/log-$i-$i"
- writeDataManually(data, file)
- data
- }.flatten
-
- val logDirectoryPath = new Path(testDir)
- val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
- assert(fileSystem.exists(logDirectoryPath) === true)
+class FileBasedWriteAheadLogWithFileCloseAfterWriteSuite
+ extends CloseFileAfterWriteTests(allowBatching = false, "FileBasedWriteAheadLog")
- // Read data using manager and verify
- val readData = readDataUsingWriteAheadLog(testDir)
- assert(readData === writtenData)
- }
+class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
+ allowBatching = true,
+ closeFileAfterWrite = false,
+ "BatchedWriteAheadLog") with MockitoSugar with BeforeAndAfterEach with
PrivateMethodTester {
- test("FileBasedWriteAheadLog - recover past logs when creating new
manager") {
- // Write data with manager, recover with new manager and verify
- val dataToWrite = generateRandomData()
- writeDataUsingWriteAheadLog(testDir, dataToWrite)
- val logFiles = getLogFilesInDirectory(testDir)
- assert(logFiles.size > 1)
- val readData = readDataUsingWriteAheadLog(testDir)
- assert(dataToWrite === readData)
- }
+ import BatchedWriteAheadLog._
+ import WriteAheadLogSuite._
- test("FileBasedWriteAheadLog - clean old logs") {
- logCleanUpTest(waitForCompletion = false)
- }
+ private var fileBasedWAL: FileBasedWriteAheadLog = _
+ private var walHandle: FileBasedWriteAheadLogSegment = _
+ private var walBatchingThreadPool: ExecutionContextExecutorService = _
+ private val sparkConf = new SparkConf()
- test("FileBasedWriteAheadLog - clean old logs synchronously") {
- logCleanUpTest(waitForCompletion = true)
+ override def beforeEach(): Unit = {
+ fileBasedWAL = mock[FileBasedWriteAheadLog]
+ walHandle = mock[FileBasedWriteAheadLogSegment]
+ walBatchingThreadPool = ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonFixedThreadPool(8, "wal-test-thread-pool"))
}
- private def logCleanUpTest(waitForCompletion: Boolean): Unit = {
- // Write data with manager, recover with new manager and verify
- val manualClock = new ManualClock
- val dataToWrite = generateRandomData()
- writeAheadLog = writeDataUsingWriteAheadLog(testDir, dataToWrite, manualClock, closeLog = false)
- val logFiles = getLogFilesInDirectory(testDir)
- assert(logFiles.size > 1)
-
- writeAheadLog.clean(manualClock.getTimeMillis() / 2, waitForCompletion)
-
- if (waitForCompletion) {
- assert(getLogFilesInDirectory(testDir).size < logFiles.size)
- } else {
- eventually(timeout(1 second), interval(10 milliseconds)) {
- assert(getLogFilesInDirectory(testDir).size < logFiles.size)
- }
+ override def afterEach(): Unit = {
+ if (walBatchingThreadPool != null) {
+ walBatchingThreadPool.shutdownNow()
}
}
- test("FileBasedWriteAheadLog - handling file errors while reading
rotating logs") {
- // Generate a set of log files
- val manualClock = new ManualClock
- val dataToWrite1 = generateRandomData()
- writeDataUsingWriteAheadLog(testDir, dataToWrite1, manualClock)
- val logFiles1 = getLogFilesInDirectory(testDir)
- assert(logFiles1.size > 1)
-
+ test("BatchedWriteAheadLog - serializing and deserializing batched
records") {
+ val events = Seq(
+ BlockAdditionEvent(ReceivedBlockInfo(0, None, None, null)),
+ BatchAllocationEvent(null, null),
+ BatchCleanupEvent(Nil)
+ )
- // Recover old files and generate a second set of log files
- val dataToWrite2 = generateRandomData()
- manualClock.advance(100000)
- writeDataUsingWriteAheadLog(testDir, dataToWrite2, manualClock)
- val logFiles2 = getLogFilesInDirectory(testDir)
- assert(logFiles2.size > logFiles1.size)
+ val buffers = events.map(e => Record(ByteBuffer.wrap(Utils.serialize(e)), 0L, null))
+ val batched = BatchedWriteAheadLog.aggregate(buffers)
+ val deaggregate = BatchedWriteAheadLog.deaggregate(batched).map(buffer =>
+ Utils.deserialize[ReceivedBlockTrackerLogEvent](buffer.array()))
- // Read the files and verify that all the written data can be read
- val readData1 = readDataUsingWriteAheadLog(testDir)
- assert(readData1 === (dataToWrite1 ++ dataToWrite2))
-
- // Corrupt the first set of files so that they are basically unreadable
- logFiles1.foreach { f =>
- val raf = new FileOutputStream(f, true).getChannel()
- raf.truncate(1)
- raf.close()
- }
-
- // Verify that the corrupted files do not prevent reading of the second set of data
- val readData = readDataUsingWriteAheadLog(testDir)
- assert(readData === dataToWrite2)
- }
-
- test("FileBasedWriteAheadLog - do not create directories or files unless
write") {
- val nonexistentTempPath = File.createTempFile("test", "")
- nonexistentTempPath.delete()
- assert(!nonexistentTempPath.exists())
-
- val writtenSegment = writeDataManually(generateRandomData(), testFile)
- val wal = new FileBasedWriteAheadLog(new SparkConf(), tempDir.getAbsolutePath,
- new Configuration(), 1, 1, closeFileAfterWrite = false)
- assert(!nonexistentTempPath.exists(), "Directory created just by
creating log object")
- wal.read(writtenSegment.head)
- assert(!nonexistentTempPath.exists(), "Directory created just by
attempting to read segment")
+ assert(deaggregate.toSeq === events)
}
-}
-object WriteAheadLogSuite {
+ test("BatchedWriteAheadLog - failures in wrappedLog get bubbled up") {
+ when(fileBasedWAL.write(any[ByteBuffer], anyLong)).thenThrow(new RuntimeException("Hello!"))
+ // the BatchedWriteAheadLog should bubble up any exceptions that may have happened during writes
+ val wal = new BatchedWriteAheadLog(fileBasedWAL, sparkConf)
- class MockWriteAheadLog0() extends WriteAheadLog {
- override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = { null }
- override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = { null }
- override def readAll(): util.Iterator[ByteBuffer] = { null }
- override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = { }
- override def close(): Unit = { }
+ intercept[RuntimeException] {
+ val buffer = mock[ByteBuffer]
+ wal.write(buffer, 2L)
+ }
}
- class MockWriteAheadLog1(val conf: SparkConf) extends MockWriteAheadLog0()
-
- class MockWriteAheadLog2(val conf: SparkConf, x: Int) extends MockWriteAheadLog0()
-
-
- private val hadoopConf = new Configuration()
-
- /** Write data to a file directly and return an array of the file segments written. */
- def writeDataManually(data: Seq[String], file: String): Seq[FileBasedWriteAheadLogSegment] = {
- val segments = new ArrayBuffer[FileBasedWriteAheadLogSegment]()
- val writer = HdfsUtils.getOutputStream(file, hadoopConf)
- data.foreach { item =>
- val offset = writer.getPos
- val bytes = Utils.serialize(item)
- writer.writeInt(bytes.size)
- writer.write(bytes)
- segments += FileBasedWriteAheadLogSegment(file, offset, bytes.size)
- }
- writer.close()
- segments
+ // we make the write requests in separate threads so that we don't block the test thread
+ private def eventFuture(
+ wal: WriteAheadLog,
+ event: String,
+ time: Long,
+ numSuccess: AtomicInteger = null,
+ numFail: AtomicInteger = null): Unit = {
+ val f = Future(wal.write(event, time))(walBatchingThreadPool)
+ f.onComplete {
+ case Success(v) =>
+ assert(v === walHandle) // return our mock handle after the write
+ if (numSuccess != null) numSuccess.incrementAndGet()
+ case Failure(v) => if (numFail != null) numFail.incrementAndGet()
+ }(walBatchingThreadPool)
}
/**
- * Write data to a file using the writer class and return an array of the file segments written.
+ * In order to block the writes on the writer thread, we mock the write method, and block it
+ * for some time with a promise.
*/
- def writeDataUsingWriter(
- filePath: String,
- data: Seq[String]
- ): Seq[FileBasedWriteAheadLogSegment] = {
- val writer = new FileBasedWriteAheadLogWriter(filePath, hadoopConf)
- val segments = data.map {
- item => writer.write(item)
- }
- writer.close()
- segments
- }
-
- /** Write data to rotating files in log directory using the WriteAheadLog class. */
- def writeDataUsingWriteAheadLog(
- logDirectory: String,
- data: Seq[String],
- manualClock: ManualClock = new ManualClock,
- closeLog: Boolean = true,
- clockAdvanceTime: Int = 500,
- closeFileAfterWrite: Boolean = false): FileBasedWriteAheadLog = {
- if (manualClock.getTimeMillis() < 100000) manualClock.setTime(10000)
- val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1,
- closeFileAfterWrite)
-
- // Ensure that 500 does not get sorted after 2000, so put a high base value.
- data.foreach { item =>
- manualClock.advance(clockAdvanceTime)
- wal.write(item, manualClock.getTimeMillis())
- }
- if (closeLog) wal.close()
- wal
- }
-
- /** Read data from a segments of a log file directly and return the list of byte buffers. */
- def readDataManually(segments: Seq[FileBasedWriteAheadLogSegment]): Seq[String] = {
- segments.map { segment =>
- val reader = HdfsUtils.getInputStream(segment.path, hadoopConf)
- try {
- reader.seek(segment.offset)
- val bytes = new Array[Byte](segment.length)
- reader.readInt()
- reader.readFully(bytes)
- val data = Utils.deserialize[String](bytes)
- reader.close()
- data
- } finally {
- reader.close()
+ private def writeBlockingPromise(wal: WriteAheadLog): Promise[Any] = {
+ // we would like to block the write so that we can queue requests
+ val promise = Promise[Any]()
+ when(wal.write(any[ByteBuffer], any[Long])).thenAnswer(
+ new Answer[FileBasedWriteAheadLogSegment] {
+ override def answer(invocation: InvocationOnMock): FileBasedWriteAheadLogSegment = {
+ Await.ready(promise.future, 4.seconds)
+ walHandle
+ }
}
- }
+ )
+ promise
}
- /** Read all the data from a log file directly and return the list of byte buffers. */
- def readDataManually(file: String): Seq[String] = {
- val reader = HdfsUtils.getInputStream(file, hadoopConf)
- val buffer = new ArrayBuffer[String]
- try {
- while (true) {
- // Read till EOF is thrown
- val length = reader.readInt()
- val bytes = new Array[Byte](length)
- reader.read(bytes)
- buffer += Utils.deserialize[String](bytes)
- }
- } catch {
- case ex: EOFException =>
- } finally {
- reader.close()
+ test("BatchedWriteAheadLog - name log with aggregated entries with the
timestamp of last entry") {
+ val wal = new BatchedWriteAheadLog(fileBasedWAL, sparkConf)
+ // block the write so that we can batch some records
+ val promise = writeBlockingPromise(fileBasedWAL)
+
+ val event1 = "hello"
+ val event2 = "world"
+ val event3 = "this"
+ val event4 = "is"
+ val event5 = "doge"
+
+ eventFuture(wal, event1, 3L) // 3 will automatically be flushed for the first write
+ // rest of the records will be batched while it takes 3 to get written
+ eventFuture(wal, event2, 5L)
+ eventFuture(wal, event3, 8L)
+ eventFuture(wal, event4, 12L)
+ eventFuture(wal, event5, 10L)
+ promise.success(true)
--- End diff ---
Are you sure that all the futures are already blocked before you complete the promise? I think you need another check to make sure that all the threads are actually blocked. You could check the number of active threads in the `walBatchingThreadPool` inside an `eventually` block for that, and later check again to make sure that all the threads have been released.
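Roughly something like the sketch below (untested, just to illustrate the idea). It assumes you keep a direct reference to the `ThreadPoolExecutor` returned by `ThreadUtils.newDaemonFixedThreadPool` instead of only the wrapped execution context, so that `getActiveCount` is reachable; the names `threadPool` and `executionContext` here are illustrative, and it relies on the Eventually / SpanSugar imports the suite already uses:

```scala
// Sketch only: keep the executor itself so its active-thread count can be checked;
// the wrapped ExecutionContext is what the eventFuture calls actually run on.
val threadPool = ThreadUtils.newDaemonFixedThreadPool(8, "wal-test-thread-pool")
val executionContext = ExecutionContext.fromExecutorService(threadPool)

// ... issue the five eventFuture(...) writes on executionContext as in the test above ...

// Only release the blocking promise once all five writer threads are actually
// parked inside the mocked write(), so the queued records really get batched together.
eventually(timeout(1 second), interval(10 milliseconds)) {
  assert(threadPool.getActiveCount === 5)
}
promise.success(true)

// Afterwards, verify that all the threads have been released again.
eventually(timeout(1 second), interval(10 milliseconds)) {
  assert(threadPool.getActiveCount === 0)
}
```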