gharris1727 commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1638488900
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -136,46 +128,46 @@ public void testUserTopicIdPartitionEquals() {
}
@Test
- public void testAddAssignmentsForPartitions() throws InterruptedException {
+ public void testAddAssignmentsForPartitions() {
final List<TopicIdPartition> idPartitions = getIdPartitions("sample",
3);
final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
.map(idp ->
toRemoteLogPartition(partitioner.metadataPartition(idp)))
.collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) ->
b));
consumer.updateEndOffsets(endOffsets);
consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
- thread.start();
+ consumerTask.ingestRecords();
for (final TopicIdPartition idPartition : idPartitions) {
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " +
idPartition + " to be assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
assertTrue(handler.isPartitionLoaded.get(idPartition));
}
}
@Test
- public void testRemoveAssignmentsForPartitions() throws
InterruptedException {
+ public void testRemoveAssignmentsForPartitions() {
final List<TopicIdPartition> allPartitions = getIdPartitions("sample",
3);
final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
.map(idp ->
toRemoteLogPartition(partitioner.metadataPartition(idp)))
.collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) ->
b));
consumer.updateEndOffsets(endOffsets);
consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
- thread.start();
+ consumerTask.ingestRecords();
final TopicIdPartition tpId = allPartitions.get(0);
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + "
to be assigned");
addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
- TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
- "Couldn't read record");
+ consumerTask.ingestRecords();
+ assertTrue(() ->
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
Review Comment:
Hey, this wasn't addressed. Can you remove the `() ->` where it isn't
necessary?
##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -115,40 +118,54 @@ public ConsumerTask(RemotePartitionMetadataEventHandler
remotePartitionMetadataE
this.pollTimeoutMs = pollTimeoutMs;
this.offsetFetchRetryIntervalMs = offsetFetchRetryIntervalMs;
this.time = Objects.requireNonNull(time);
+ this.isInternalConsumerClosed = new AtomicBoolean(false);
this.uninitializedAt = time.milliseconds();
}
@Override
public void run() {
log.info("Starting consumer task thread.");
while (!isClosed) {
- try {
- if (hasAssignmentChanged) {
- maybeWaitForPartitionAssignments();
- }
+ ingestRecords();
+ }
+ closeConsumer();
+ log.info("Exited from consumer task thread");
+ }
- log.trace("Polling consumer to receive remote log metadata
topic records");
- final ConsumerRecords<byte[], byte[]> consumerRecords =
consumer.poll(Duration.ofMillis(pollTimeoutMs));
- for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
- processConsumerRecord(record);
- }
- maybeMarkUserPartitionsAsReady();
- } catch (final WakeupException ex) {
- // ignore logging the error
- isClosed = true;
- } catch (final RetriableException ex) {
- log.warn("Retriable error occurred while processing the
records. Retrying...", ex);
- } catch (final Exception ex) {
- isClosed = true;
- log.error("Error occurred while processing the records", ex);
+ // public for testing
+ public void ingestRecords() {
+ try {
+ if (hasAssignmentChanged) {
+ maybeWaitForPartitionAssignments();
}
+
+ log.trace("Polling consumer to receive remote log metadata topic
records");
+ final ConsumerRecords<byte[], byte[]> consumerRecords =
consumer.poll(Duration.ofMillis(pollTimeoutMs));
+ for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+ processConsumerRecord(record);
+ }
+ maybeMarkUserPartitionsAsReady();
+ } catch (final WakeupException ex) {
+ // ignore logging the error
+ isClosed = true;
+ closeConsumer();
+ } catch (final RetriableException ex) {
+ log.warn("Retriable error occurred while processing the records.
Retrying...", ex);
+ } catch (final Exception ex) {
+ isClosed = true;
+ log.error("Error occurred while processing the records", ex);
+ closeConsumer();
}
- try {
- consumer.close(Duration.ofSeconds(30));
- } catch (final Exception e) {
- log.error("Error encountered while closing the consumer", e);
+ }
+
+ private void closeConsumer() {
+ if (isInternalConsumerClosed.compareAndSet(false, true)) {
Review Comment:
While the AtomicBoolean makes this implementation correct, it seems more
complicated than the existing implementation.
I think if you made closeConsumer more visible and called it in the
ConsumerTaskTest, we could avoid needing to change this logic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]