kkonstantine commented on a change in pull request #8554:
URL: https://github.com/apache/kafka/pull/8554#discussion_r417055199
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -281,9 +281,18 @@ private void readToLogEnd() {
Iterator<Map.Entry<TopicPartition, Long>> it =
endOffsets.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TopicPartition, Long> entry = it.next();
- if (consumer.position(entry.getKey()) >= entry.getValue())
+ TopicPartition topicPartition = entry.getKey();
+ Long endOffset = entry.getValue();
+ long lastConsumedOffset = consumer.position(topicPartition);
+ if (lastConsumedOffset >= endOffset) {
+ log.trace("Reached end offset {} for {}", endOffset,
topicPartition);
it.remove();
- else {
+ } else {
+ log.trace(
+ "Behind end offset {} for {}; last-consumed offset is
{}",
+ endOffset,
+ topicPartition,
+ lastConsumedOffset);
Review comment:
```suggestion
log.trace("Behind end offset {} for {}; last-consumed
offset is {}",
endOffset, topicPartition, lastConsumedOffset);
```
nit: multiline calls don't need to be on their own line in AK and tab is
equal to 4 spaces (here we need 2 tabs)
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -281,9 +281,18 @@ private void readToLogEnd() {
Iterator<Map.Entry<TopicPartition, Long>> it =
endOffsets.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TopicPartition, Long> entry = it.next();
- if (consumer.position(entry.getKey()) >= entry.getValue())
+ TopicPartition topicPartition = entry.getKey();
+ Long endOffset = entry.getValue();
Review comment:
unboxing will happen in the comparison in the `if` branch anyways, so
probably better to do it early declaring the type `long` here.
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -281,9 +281,18 @@ private void readToLogEnd() {
Iterator<Map.Entry<TopicPartition, Long>> it =
endOffsets.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TopicPartition, Long> entry = it.next();
- if (consumer.position(entry.getKey()) >= entry.getValue())
+ TopicPartition topicPartition = entry.getKey();
+ Long endOffset = entry.getValue();
+ long lastConsumedOffset = consumer.position(topicPartition);
+ if (lastConsumedOffset >= endOffset) {
+ log.trace("Reached end offset {} for {}", endOffset,
topicPartition);
Review comment:
given that the previous messages say "Reading to ..." maybe it would
make sense to say:
```suggestion
log.trace("Read to end offset {} for {}", endOffset,
topicPartition);
```
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -281,9 +281,18 @@ private void readToLogEnd() {
Iterator<Map.Entry<TopicPartition, Long>> it =
endOffsets.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TopicPartition, Long> entry = it.next();
- if (consumer.position(entry.getKey()) >= entry.getValue())
+ TopicPartition topicPartition = entry.getKey();
+ Long endOffset = entry.getValue();
+ long lastConsumedOffset = consumer.position(topicPartition);
+ if (lastConsumedOffset >= endOffset) {
+ log.trace("Reached end offset {} for {}", endOffset,
topicPartition);
it.remove();
- else {
+ } else {
+ log.trace(
+ "Behind end offset {} for {}; last-consumed offset is
{}",
+ endOffset,
+ topicPartition,
+ lastConsumedOffset);
Review comment:
Similar to the above, seeing a message that says `read` might be easier
to read in context than `consumed`.
How about:
`Behind end offset {} for {}; last-read offset is {}`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]