Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9143#discussion_r44183084
  
    --- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
 ---
    @@ -190,283 +281,187 @@ class WriteAheadLogSuite extends SparkFunSuite with 
BeforeAndAfter {
         }
         reader.close()
       }
    +}
     
    -  test("FileBasedWriteAheadLog - write rotating logs") {
    -    // Write data with rotation using WriteAheadLog class
    -    val dataToWrite = generateRandomData()
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite)
    +abstract class CloseFileAfterWriteTests(allowBatching: Boolean, testTag: 
String)
    +  extends CommonWriteAheadLogTests(allowBatching, closeFileAfterWrite = 
true, testTag) {
     
    -    // Read data manually to verify the written data
    -    val logFiles = getLogFilesInDirectory(testDir)
    -    assert(logFiles.size > 1)
    -    val writtenData = logFiles.flatMap { file => readDataManually(file)}
    -    assert(writtenData === dataToWrite)
    -  }
    -
    -  test("FileBasedWriteAheadLog - close after write flag") {
    +  import WriteAheadLogSuite._
    +  test(testPrefix + "close after write flag") {
         // Write data with rotation using WriteAheadLog class
         val numFiles = 3
         val dataToWrite = Seq.tabulate(numFiles)(_.toString)
         // total advance time is less than 1000, therefore log shouldn't be 
rolled, but manually closed
         writeDataUsingWriteAheadLog(testDir, dataToWrite, closeLog = false, 
clockAdvanceTime = 100,
    -      closeFileAfterWrite = true)
    +      closeFileAfterWrite = true, allowBatching = allowBatching)
     
         // Read data manually to verify the written data
         val logFiles = getLogFilesInDirectory(testDir)
         assert(logFiles.size === numFiles)
    -    val writtenData = logFiles.flatMap { file => readDataManually(file)}
    +    val writtenData: Seq[String] = 
readAndDeserializeDataManually(logFiles, allowBatching)
         assert(writtenData === dataToWrite)
       }
    +}
     
    -  test("FileBasedWriteAheadLog - read rotating logs") {
    -    // Write data manually for testing reading through WriteAheadLog
    -    val writtenData = (1 to 10).map { i =>
    -      val data = generateRandomData()
    -      val file = testDir + s"/log-$i-$i"
    -      writeDataManually(data, file)
    -      data
    -    }.flatten
    -
    -    val logDirectoryPath = new Path(testDir)
    -    val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, 
hadoopConf)
    -    assert(fileSystem.exists(logDirectoryPath) === true)
    +class FileBasedWriteAheadLogWithFileCloseAfterWriteSuite
    +  extends CloseFileAfterWriteTests(allowBatching = false, 
"FileBasedWriteAheadLog")
     
    -    // Read data using manager and verify
    -    val readData = readDataUsingWriteAheadLog(testDir)
    -    assert(readData === writtenData)
    -  }
    +class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
    +    allowBatching = true,
    +    closeFileAfterWrite = false,
    +    "BatchedWriteAheadLog") with MockitoSugar with BeforeAndAfterEach with 
PrivateMethodTester {
     
    -  test("FileBasedWriteAheadLog - recover past logs when creating new 
manager") {
    -    // Write data with manager, recover with new manager and verify
    -    val dataToWrite = generateRandomData()
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite)
    -    val logFiles = getLogFilesInDirectory(testDir)
    -    assert(logFiles.size > 1)
    -    val readData = readDataUsingWriteAheadLog(testDir)
    -    assert(dataToWrite === readData)
    -  }
    +  import BatchedWriteAheadLog._
    +  import WriteAheadLogSuite._
     
    -  test("FileBasedWriteAheadLog - clean old logs") {
    -    logCleanUpTest(waitForCompletion = false)
    -  }
    +  private var fileBasedWAL: FileBasedWriteAheadLog = _
    +  private var walHandle: FileBasedWriteAheadLogSegment = _
    +  private var walBatchingThreadPool: ExecutionContextExecutorService = _
     
    -  test("FileBasedWriteAheadLog - clean old logs synchronously") {
    -    logCleanUpTest(waitForCompletion = true)
    +  override def beforeEach(): Unit = {
    +    fileBasedWAL = mock[FileBasedWriteAheadLog]
    +    walHandle = mock[FileBasedWriteAheadLogSegment]
    +    walBatchingThreadPool = ExecutionContext.fromExecutorService(
    +      ThreadUtils.newDaemonFixedThreadPool(8, "wal-test-thread-pool"))
       }
     
    -  private def logCleanUpTest(waitForCompletion: Boolean): Unit = {
    -    // Write data with manager, recover with new manager and verify
    -    val manualClock = new ManualClock
    -    val dataToWrite = generateRandomData()
    -    writeAheadLog = writeDataUsingWriteAheadLog(testDir, dataToWrite, 
manualClock, closeLog = false)
    -    val logFiles = getLogFilesInDirectory(testDir)
    -    assert(logFiles.size > 1)
    -
    -    writeAheadLog.clean(manualClock.getTimeMillis() / 2, waitForCompletion)
    -
    -    if (waitForCompletion) {
    -      assert(getLogFilesInDirectory(testDir).size < logFiles.size)
    -    } else {
    -      eventually(timeout(1 second), interval(10 milliseconds)) {
    -        assert(getLogFilesInDirectory(testDir).size < logFiles.size)
    -      }
    +  override def afterEach(): Unit = {
    +    if (walBatchingThreadPool != null) {
    +      walBatchingThreadPool.shutdownNow()
         }
       }
     
    -  test("FileBasedWriteAheadLog - handling file errors while reading 
rotating logs") {
    -    // Generate a set of log files
    -    val manualClock = new ManualClock
    -    val dataToWrite1 = generateRandomData()
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite1, manualClock)
    -    val logFiles1 = getLogFilesInDirectory(testDir)
    -    assert(logFiles1.size > 1)
    -
    -
    -    // Recover old files and generate a second set of log files
    -    val dataToWrite2 = generateRandomData()
    -    manualClock.advance(100000)
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite2, manualClock)
    -    val logFiles2 = getLogFilesInDirectory(testDir)
    -    assert(logFiles2.size > logFiles1.size)
    -
    -    // Read the files and verify that all the written data can be read
    -    val readData1 = readDataUsingWriteAheadLog(testDir)
    -    assert(readData1 === (dataToWrite1 ++ dataToWrite2))
    -
    -    // Corrupt the first set of files so that they are basically unreadable
    -    logFiles1.foreach { f =>
    -      val raf = new FileOutputStream(f, true).getChannel()
    -      raf.truncate(1)
    -      raf.close()
    -    }
    +  test("BatchedWriteAheadLog - serializing and deserializing batched 
records") {
    +    val events = Seq(
    +      BlockAdditionEvent(ReceivedBlockInfo(0, None, None, null)),
    +      BatchAllocationEvent(null, null),
    +      BatchCleanupEvent(Nil)
    +    )
     
    -    // Verify that the corrupted files do not prevent reading of the 
second set of data
    -    val readData = readDataUsingWriteAheadLog(testDir)
    -    assert(readData === dataToWrite2)
    -  }
    -
    -  test("FileBasedWriteAheadLog - do not create directories or files unless 
write") {
    -    val nonexistentTempPath = File.createTempFile("test", "")
    -    nonexistentTempPath.delete()
    -    assert(!nonexistentTempPath.exists())
    +    val buffers = events.map(e => 
Record(ByteBuffer.wrap(Utils.serialize(e)), 0L, null))
    +    val batched = BatchedWriteAheadLog.aggregate(buffers)
    +    val deaggregate = BatchedWriteAheadLog.deaggregate(batched).map(buffer 
=>
    +      Utils.deserialize[ReceivedBlockTrackerLogEvent](buffer.array()))
     
    -    val writtenSegment = writeDataManually(generateRandomData(), testFile)
    -    val wal = new FileBasedWriteAheadLog(new SparkConf(), 
tempDir.getAbsolutePath,
    -      new Configuration(), 1, 1, closeFileAfterWrite = false)
    -    assert(!nonexistentTempPath.exists(), "Directory created just by 
creating log object")
    -    wal.read(writtenSegment.head)
    -    assert(!nonexistentTempPath.exists(), "Directory created just by 
attempting to read segment")
    +    assert(deaggregate.toSeq === events)
       }
    -}
     
    -object WriteAheadLogSuite {
    +  test("BatchedWriteAheadLog - failures in wrappedLog get bubbled up") {
    +    when(fileBasedWAL.write(any[ByteBuffer], anyLong)).thenThrow(new 
RuntimeException("Hello!"))
    +    // the BatchedWriteAheadLog should bubble up any exceptions that may 
have happened during writes
    +    val wal = new BatchedWriteAheadLog(fileBasedWAL)
     
    -  class MockWriteAheadLog0() extends WriteAheadLog {
    -    override def write(record: ByteBuffer, time: Long): 
WriteAheadLogRecordHandle = { null }
    -    override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = { 
null }
    -    override def readAll(): util.Iterator[ByteBuffer] = { null }
    -    override def clean(threshTime: Long, waitForCompletion: Boolean): Unit 
= { }
    -    override def close(): Unit = { }
    +    intercept[RuntimeException] {
    +      val buffer = mock[ByteBuffer]
    +      wal.write(buffer, 2L)
    +    }
       }
     
    -  class MockWriteAheadLog1(val conf: SparkConf) extends 
MockWriteAheadLog0()
    -
    -  class MockWriteAheadLog2(val conf: SparkConf, x: Int) extends 
MockWriteAheadLog0()
    -
    -
    -  private val hadoopConf = new Configuration()
    -
    -  /** Write data to a file directly and return an array of the file 
segments written. */
    -  def writeDataManually(data: Seq[String], file: String): 
Seq[FileBasedWriteAheadLogSegment] = {
    -    val segments = new ArrayBuffer[FileBasedWriteAheadLogSegment]()
    -    val writer = HdfsUtils.getOutputStream(file, hadoopConf)
    -    data.foreach { item =>
    -      val offset = writer.getPos
    -      val bytes = Utils.serialize(item)
    -      writer.writeInt(bytes.size)
    -      writer.write(bytes)
    -      segments += FileBasedWriteAheadLogSegment(file, offset, bytes.size)
    -    }
    -    writer.close()
    -    segments
    +  // we make the write requests in separate threads so that we don't block 
the test thread
    +  private def eventFuture(
    +      wal: WriteAheadLog,
    +      event: String,
    +      time: Long,
    +      numSuccess: AtomicInteger = null,
    +      numFail: AtomicInteger = null): Unit = {
    +    val f = Future(wal.write(event, time))(walBatchingThreadPool)
    +    f.onComplete {
    +      case Success(v) =>
    +        assert(v === walHandle) // return our mock handle after the write
    +        if (numSuccess != null) numSuccess.incrementAndGet()
    +      case Failure(v) => if (numFail != null) numFail.incrementAndGet()
    +    }(walBatchingThreadPool)
       }
     
       /**
    -   * Write data to a file using the writer class and return an array of 
the file segments written.
    +   * In order to block the writes on the writer thread, we mock the write 
method, and block it
    +   * for some time with a promise.
        */
    -  def writeDataUsingWriter(
    -      filePath: String,
    -      data: Seq[String]
    -    ): Seq[FileBasedWriteAheadLogSegment] = {
    -    val writer = new FileBasedWriteAheadLogWriter(filePath, hadoopConf)
    -    val segments = data.map {
    -      item => writer.write(item)
    -    }
    -    writer.close()
    -    segments
    +  private def writeBlockingPromise(wal: WriteAheadLog): Promise[Any] = {
    +    // we would like to block the write so that we can queue requests
    +    val promise = Promise[Any]()
    +    when(wal.write(any[ByteBuffer], any[Long])).thenAnswer(
    +      new Answer[FileBasedWriteAheadLogSegment] {
    +        override def answer(invocation: InvocationOnMock): 
FileBasedWriteAheadLogSegment = {
    +          Await.ready(promise.future, 4.seconds)
    +          walHandle
    +        }
    +      }
    +    )
    +    promise
       }
     
    -  /** Write data to rotating files in log directory using the 
WriteAheadLog class. */
    -  def writeDataUsingWriteAheadLog(
    -      logDirectory: String,
    -      data: Seq[String],
    -      manualClock: ManualClock = new ManualClock,
    -      closeLog: Boolean = true,
    -      clockAdvanceTime: Int = 500,
    -      closeFileAfterWrite: Boolean = false): FileBasedWriteAheadLog = {
    -    if (manualClock.getTimeMillis() < 100000) manualClock.setTime(10000)
    -    val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, 
hadoopConf, 1, 1,
    -      closeFileAfterWrite)
    -
    -    // Ensure that 500 does not get sorted after 2000, so put a high base 
value.
    -    data.foreach { item =>
    -      manualClock.advance(clockAdvanceTime)
    -      wal.write(item, manualClock.getTimeMillis())
    +  test("BatchedWriteAheadLog - records get added to a queue") {
    +    val numSuccess = new AtomicInteger()
    +    val numFail = new AtomicInteger()
    +    val wal = new BatchedWriteAheadLog(fileBasedWAL)
    +
    +    val promise = writeBlockingPromise(fileBasedWAL)
    +
    +    // make sure queue is empty initially
    +    assert(wal.getQueueLength === 0)
    +    val event1 = "hello"
    +    val event2 = "world"
    +    val event3 = "this"
    +    val event4 = "is"
    +    val event5 = "doge"
    +
    +    eventFuture(wal, event1, 5L, numSuccess, numFail)
    +    eventFuture(wal, event2, 10L, numSuccess, numFail)
    +    eventFuture(wal, event3, 11L, numSuccess, numFail)
    +    eventFuture(wal, event4, 12L, numSuccess, numFail)
    +    eventFuture(wal, event5, 20L, numSuccess, numFail)
    +
    +    eventually(Eventually.timeout(2 seconds)) {
    +      // the first element will immediately be taken and the rest will get 
queued
    --- End diff --
    
    This is not guaranteed. `BatchedWriteAheadLog.flushRecords` simply drains all 
the records that are in the queue at that moment, so the queue length could be 
anything. You may never be able to observe a queue length of 4.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to