Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/4578#discussion_r24628240
--- Diff:
extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
---
@@ -83,193 +84,174 @@ class KinesisReceiverSuite extends TestSuiteBase with
Matchers with BeforeAndAft
}
test("process records including store and checkpoint") {
- val expectedCheckpointIntervalMillis = 10
- expecting {
- receiverMock.isStopped().andReturn(false).once()
- receiverMock.store(record1.getData().array()).once()
- receiverMock.store(record2.getData().array()).once()
- checkpointStateMock.shouldCheckpoint().andReturn(true).once()
- checkpointerMock.checkpoint().once()
- checkpointStateMock.advanceCheckpoint().once()
- }
- whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
- val recordProcessor = new KinesisRecordProcessor(receiverMock,
workerId,
- checkpointStateMock)
- recordProcessor.processRecords(batch, checkpointerMock)
- }
+ when(receiverMock.isStopped()).thenReturn(false)
+ when(checkpointStateMock.shouldCheckpoint()).thenReturn(true)
+
+ val recordProcessor = new KinesisRecordProcessor(receiverMock,
workerId, checkpointStateMock)
+ recordProcessor.processRecords(batch, checkpointerMock)
+
+ verify(receiverMock, times(1)).isStopped()
+ verify(receiverMock, times(1)).store(record1.getData().array())
+ verify(receiverMock, times(1)).store(record2.getData().array())
+ verify(checkpointerMock, times(1)).checkpoint()
+ verify(checkpointStateMock, times(1)).advanceCheckpoint()
}
test("shouldn't store and checkpoint when receiver is stopped") {
- expecting {
- receiverMock.isStopped().andReturn(true).once()
- }
- whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
- val recordProcessor = new KinesisRecordProcessor(receiverMock,
workerId,
- checkpointStateMock)
- recordProcessor.processRecords(batch, checkpointerMock)
- }
+ when(receiverMock.isStopped()).thenReturn(true)
+
+ val recordProcessor = new KinesisRecordProcessor(receiverMock,
workerId, checkpointStateMock)
+ recordProcessor.processRecords(batch, checkpointerMock)
+
+ verify(receiverMock, times(1)).isStopped()
}
test("shouldn't checkpoint when exception occurs during store") {
- expecting {
- receiverMock.isStopped().andReturn(false).once()
- receiverMock.store(record1.getData().array()).andThrow(new
RuntimeException()).once()
- }
- whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
- intercept[RuntimeException] {
- val recordProcessor = new KinesisRecordProcessor(receiverMock,
workerId,
- checkpointStateMock)
- recordProcessor.processRecords(batch, checkpointerMock)
- }
+ when(receiverMock.isStopped()).thenReturn(false)
+ when(receiverMock.store(record1.getData().array())).thenThrow(new
RuntimeException())
+
+ intercept[RuntimeException] {
+ val recordProcessor = new KinesisRecordProcessor(receiverMock,
workerId, checkpointStateMock)
+ recordProcessor.processRecords(batch, checkpointerMock)
}
+
+ verify(receiverMock, times(1)).isStopped()
+ verify(receiverMock, times(1)).store(record1.getData().array())
}
test("should set checkpoint time to currentTime + checkpoint interval
upon instantiation") {
- expecting {
- currentClockMock.currentTime().andReturn(0).once()
- }
- whenExecuting(currentClockMock) {
+ when(currentClockMock.currentTime()).thenReturn(0)
+
val checkpointIntervalMillis = 10
- val checkpointState = new
KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
+ val checkpointState =
+ new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis),
currentClockMock)
assert(checkpointState.checkpointClock.currentTime() ==
checkpointIntervalMillis)
- }
+
+ verify(currentClockMock, times(1)).currentTime()
}
test("should checkpoint if we have exceeded the checkpoint interval") {
- expecting {
- currentClockMock.currentTime().andReturn(0).once()
- }
- whenExecuting(currentClockMock) {
- val checkpointState = new
KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
- assert(checkpointState.shouldCheckpoint())
- }
+ when(currentClockMock.currentTime()).thenReturn(0)
+
+ val checkpointState = new
KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
+ assert(checkpointState.shouldCheckpoint())
+
+ verify(currentClockMock, times(1)).currentTime()
}
test("shouldn't checkpoint if we have not exceeded the checkpoint
interval") {
- expecting {
- currentClockMock.currentTime().andReturn(0).once()
- }
- whenExecuting(currentClockMock) {
- val checkpointState = new
KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
- assert(!checkpointState.shouldCheckpoint())
- }
+ when(currentClockMock.currentTime()).thenReturn(0)
+
+ val checkpointState = new
KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
+ assert(!checkpointState.shouldCheckpoint())
+
+ verify(currentClockMock, times(1)).currentTime()
}
test("should add to time when advancing checkpoint") {
- expecting {
- currentClockMock.currentTime().andReturn(0).once()
- }
- whenExecuting(currentClockMock) {
- val checkpointIntervalMillis = 10
- val checkpointState = new
KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
- assert(checkpointState.checkpointClock.currentTime() ==
checkpointIntervalMillis)
- checkpointState.advanceCheckpoint()
- assert(checkpointState.checkpointClock.currentTime() == (2 *
checkpointIntervalMillis))
- }
+ when(currentClockMock.currentTime()).thenReturn(0)
+
+ val checkpointIntervalMillis = 10
+ val checkpointState =
+ new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis),
currentClockMock)
+ assert(checkpointState.checkpointClock.currentTime() ==
checkpointIntervalMillis)
+ checkpointState.advanceCheckpoint()
+ assert(checkpointState.checkpointClock.currentTime() == (2 *
checkpointIntervalMillis))
+
+ verify(currentClockMock, times(1)).currentTime()
}
test("shutdown should checkpoint if the reason is TERMINATE") {
- expecting {
- checkpointerMock.checkpoint().once()
- }
- whenExecuting(checkpointerMock, checkpointStateMock) {
- val recordProcessor = new KinesisRecordProcessor(receiverMock,
workerId,
- checkpointStateMock)
- val reason = ShutdownReason.TERMINATE
- recordProcessor.shutdown(checkpointerMock, reason)
- }
+ val recordProcessor = new KinesisRecordProcessor(receiverMock,
workerId, checkpointStateMock)
+ val reason = ShutdownReason.TERMINATE
+ recordProcessor.shutdown(checkpointerMock, reason)
+
+ verify(checkpointerMock, times(1)).checkpoint()
}
test("shutdown should not checkpoint if the reason is something other
than TERMINATE") {
- expecting {
- }
- whenExecuting(checkpointerMock, checkpointStateMock) {
- val recordProcessor = new KinesisRecordProcessor(receiverMock,
workerId,
- checkpointStateMock)
- recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
- recordProcessor.shutdown(checkpointerMock, null)
- }
+ val recordProcessor = new KinesisRecordProcessor(receiverMock,
workerId, checkpointStateMock)
+ recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
+ recordProcessor.shutdown(checkpointerMock, null)
+
+ verify(checkpointerMock, never()).checkpoint()
--- End diff --
This assertion might not actually be necessary: I guess that EasyMock
throws AssertionError if non-stubbed methods are called on a mock.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]