Copilot commented on code in PR #21022:
URL: https://github.com/apache/kafka/pull/21022#discussion_r2576990126
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2548,26 +2311,18 @@ public Set<TopicPartition> partitions() {
"K2".getBytes(),
"V2".getBytes()));
- if (stateUpdaterEnabled) {
- TestUtils.waitForCondition(
- () -> mockRestoreConsumer.assignment().isEmpty(),
- "Never get the assignment");
- } else {
- TestUtils.waitForCondition(
- () -> {
- mockRestoreConsumer.assign(changelogPartitionSet);
- return mockRestoreConsumer.position(changelogPartition) ==
2L;
- },
- "Never finished restore");
- }
+ TestUtils.waitForCondition(
+ () -> mockRestoreConsumer.assignment().isEmpty(),
+ "Never get the assignment");
+
Review Comment:
Trailing whitespace detected. Please remove the trailing whitespace on this
line for consistency with the project's code style.
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1768,15 +1724,14 @@ public void
shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE
runOnce(processingThreadsEnabled);
// the third actually polls, processes the record, and throws the
corruption exception
- if (stateUpdaterEnabled) {
- TestUtils.waitForCondition(
- () -> thread.taskManager().checkStateUpdater(
- mockTime.milliseconds(),
- topicPartitions ->
mockConsumer.seekToBeginning(singleton(t1p1))
- ),
- 10 * 1000,
- "State updater never returned tasks.");
- }
+ TestUtils.waitForCondition(
+ () -> thread.taskManager().checkStateUpdater(
+ mockTime.milliseconds(),
+ topicPartitions ->
mockConsumer.seekToBeginning(singleton(t1p1))
+ ),
+ 10 * 1000,
+ "State updater never returned tasks.");
+
Review Comment:
Trailing whitespace detected. Please remove the trailing whitespace on this
line for consistency with the project's code style.
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1449,30 +1416,19 @@ public void shouldNotReturnDataAfterTaskMigrated(final
boolean stateUpdaterEnabl
final TaskMigratedException taskMigratedException = new
TaskMigratedException(
"Changelog restore found task migrated", new
RuntimeException("restore task migrated"));
- ChangelogReader changelogReader = this.changelogReader;
- if (stateUpdaterEnabled) {
- when(taskManager.checkStateUpdater(anyLong(),
any())).thenAnswer(answer -> {
- consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new
byte[0], new byte[0]));
- consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new
byte[1], new byte[0]));
+ final ChangelogReader changelogReader = this.changelogReader;
+ when(taskManager.checkStateUpdater(anyLong(),
any())).thenAnswer(answer -> {
+ consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new
byte[0], new byte[0]));
+ consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new
byte[1], new byte[0]));
- throw taskMigratedException;
- });
- } else {
- changelogReader = new MockChangelogReader() {
- @Override
- public long restore(final Map<TaskId, Task> tasks) {
- consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new
byte[0], new byte[0]));
- consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new
byte[1], new byte[0]));
-
- throw taskMigratedException;
- }
- };
- }
+ throw taskMigratedException;
+ });
Review Comment:
Trailing whitespace detected. Please remove the trailing whitespace on this
line for consistency with the project's code style.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1788,15 +1743,14 @@ public void
shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE
// Now, we can handle the corruption
thread.taskManager().handleCorruption(taskCorruptedException.corruptedTasks());
- if (stateUpdaterEnabled) {
- TestUtils.waitForCondition(
- () -> thread.taskManager().checkStateUpdater(
- mockTime.milliseconds(),
- topicPartitions ->
mockConsumer.seekToBeginning(singleton(t1p1))
- ),
- 10 * 1000,
- "State updater never returned tasks.");
- }
+ TestUtils.waitForCondition(
+ () -> thread.taskManager().checkStateUpdater(
+ mockTime.milliseconds(),
+ topicPartitions ->
mockConsumer.seekToBeginning(singleton(t1p1))
+ ),
+ 10 * 1000,
+ "State updater never returned tasks.");
+
Review Comment:
Trailing whitespace detected. Please remove the trailing whitespace on this
line for consistency with the project's code style.
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org