tombentley commented on a change in pull request #11450: URL: https://github.com/apache/kafka/pull/11450#discussion_r758385491
##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java
##########
```diff
@@ -185,82 +181,64 @@ public void deleteTopicStatus() {
     public void putTopicState() {
         TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), time.milliseconds());
         String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
-        Capture<byte[]> valueCapture = newCapture();
-        Capture<Callback> callbackCapture = newCapture();
-        kafkaBasedLog.send(eq(key), capture(valueCapture), capture(callbackCapture));
-        expectLastCall()
-                .andAnswer(() -> {
-                    callbackCapture.getValue().onCompletion(null, null);
-                    return null;
-                });
-        replayAll();
+        ArgumentCaptor<byte[]> valueCaptor = ArgumentCaptor.forClass(byte[].class);
+        doAnswer(invocation -> {
+            ((Callback) invocation.getArgument(2)).onCompletion(null, null);
+            return null;
+        }).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), any(Callback.class));
         store.put(topicStatus);
         // check capture state
-        assertEquals(topicStatus, store.parseTopicStatus(valueCapture.getValue()));
+        assertEquals(topicStatus, store.parseTopicStatus(valueCaptor.getValue()));
         // state is not visible until read back from the log
         assertNull(store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
-        ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, valueCapture.getValue());
+        ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, valueCaptor.getValue());
         store.read(statusRecord);
         assertEquals(topicStatus, store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
-        assertEquals(new HashSet<>(Collections.singletonList(topicStatus)), new HashSet<>(store.getAllTopics(FOO_CONNECTOR)));
-
-        verifyAll();
+        assertEquals(Collections.singleton(topicStatus), new HashSet<>(store.getAllTopics(FOO_CONNECTOR)));
     }

     @Test
     public void putTopicStateRetriableFailure() {
         TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), time.milliseconds());
         String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
-        Capture<byte[]> valueCapture = newCapture();
-        Capture<Callback> callbackCapture = newCapture();
-        kafkaBasedLog.send(eq(key), capture(valueCapture), capture(callbackCapture));
-        expectLastCall()
-                .andAnswer(() -> {
-                    callbackCapture.getValue().onCompletion(null, new TimeoutException());
-                    return null;
-                })
-                .andAnswer(() -> {
-                    callbackCapture.getValue().onCompletion(null, null);
-                    return null;
-                });
-
-        replayAll();
+
+        ArgumentCaptor<byte[]> valueCaptor = ArgumentCaptor.forClass(byte[].class);
+        doAnswer(invocation -> {
+            ((Callback) invocation.getArgument(2)).onCompletion(null, new TimeoutException());
+            return null;
+        }).doAnswer(invocation -> {
+            ((Callback) invocation.getArgument(2)).onCompletion(null, null);
+            return null;
+        }).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), any(Callback.class));
+
         store.put(topicStatus);
         // check capture state
-        assertEquals(topicStatus, store.parseTopicStatus(valueCapture.getValue()));
+        assertEquals(topicStatus, store.parseTopicStatus(valueCaptor.getValue()));
         // state is not visible until read back from the log
         assertNull(store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
-
-        verifyAll();
```
Review comment:
I think the assertion on 219 would pass even if the 1st mocked interaction never happened. Do we need something to tighten up the expected behaviour? Maybe something like:
```java
verify(kafkaBasedLog, times(2)).send(any(), any(), any());
```
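For illustration only, a slightly fuller sketch of that idea, assuming the `kafkaBasedLog` mock, the `key` string, and the `valueCaptor` from the test body above; the specific matchers are just a suggestion, not the PR's actual change:
```java
// Assumed static imports: org.mockito.Mockito.verify, org.mockito.Mockito.times,
// org.mockito.ArgumentMatchers.eq, org.mockito.ArgumentMatchers.any,
// org.junit.Assert.assertEquals (all already used elsewhere in this test class).

// Fails the test unless the mocked send() was invoked exactly twice, i.e. unless
// the first stubbed answer (the TimeoutException) was actually exercised before
// the successful retry.
verify(kafkaBasedLog, times(2)).send(eq(key), any(byte[].class), any(Callback.class));

// Since the stubbing already captures the serialized value, the captor offers an
// equivalent check: two send attempts should leave two captured values behind.
assertEquals(2, valueCaptor.getAllValues().size());
```
Either check would pin down the retry behaviour that the final `store.getTopic(...)` assertion alone cannot distinguish.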
##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
##########
```diff
@@ -144,74 +126,60 @@ public void testFlushFailureReplacesOffsets() throws Exception {
         // such that a subsequent flush will write them.
         @SuppressWarnings("unchecked")
-        final Callback<Void> callback = PowerMock.createMock(Callback.class);
+        final Callback<Void> callback = mock(Callback.class);
         // First time the write fails
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, true, null);
-        // Second time it succeeds
-        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null);
-        // Third time it has no data to flush so we won't get past beginFlush()
-        PowerMock.replayAll();
+        // Third time it has no data to flush so we won't get past beginFlush()
```
Review comment:
Now that the `// Second time it succeeds` comment has been removed here, it's a little bit weird to have comments about the first and third times at this point, followed later by one about `// Second time it succeeds`.
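Purely as a sketch of the nit, not the test's actual layout (the flush and callback interactions are elided as placeholder comments), one arrangement that keeps each ordinal comment next to the flush attempt it describes:
```java
// First flush attempt: the write fails
expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, true, null);
// ... exercise the first (failing) flush here ...

// Second flush attempt: the write succeeds
expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null);
// ... exercise the second (successful) flush here ...

// Third flush attempt: no data left to flush, so we won't get past beginFlush()
```
That way the "first/second/third" comments read in order regardless of where the Mockito expectations end up in the method.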