dajac commented on code in PR #14870:
URL: https://github.com/apache/kafka/pull/14870#discussion_r1411843255
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -620,8 +621,9 @@ public boolean cleanupExpiredOffsets(String groupId,
List<Record> records) {
});
if (!expiredPartitions.isEmpty()) {
- log.info("[GroupId {}] Expiring offsets of partitions
(allOffsetsExpired={}): {}",
- groupId, allOffsetsExpired, String.join(", ",
expiredPartitions));
+ log.info("[GroupId {}] Expiring {} offsets (allOffsetsExpired={})
in {} milliseconds.",
+ groupId, expiredPartitions.size(), allOffsetsExpired,
time.milliseconds() - startMs);
+ log.debug("[GroupId {}] Expired partitions: {}", groupId,
String.join(", ", expiredPartitions));
Review Comment:
Instead of this one, I wonder if we should just add a debug message for
every expired offset at L613. What do you think?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -620,8 +621,9 @@ public boolean cleanupExpiredOffsets(String groupId,
List<Record> records) {
});
if (!expiredPartitions.isEmpty()) {
- log.info("[GroupId {}] Expiring offsets of partitions
(allOffsetsExpired={}): {}",
- groupId, allOffsetsExpired, String.join(", ",
expiredPartitions));
+ log.info("[GroupId {}] Expiring {} offsets (allOffsetsExpired={})
in {} milliseconds.",
Review Comment:
I don't really get the value of adding the time here. What's the reasoning
behind it? However, I think that it would make sense to add a log in
`cleanupGroupMetadata` which contains the time taken and the number of records
generated.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorTimer.java:
##########
@@ -52,6 +52,20 @@ interface TimeoutOperation<T, U> {
*/
void schedule(String key, long delay, TimeUnit unit, boolean retry,
TimeoutOperation<T, U> operation);
+ /**
+ * Add an operation to the timer. If an operation with the same key
+ * already exists, replace it with the new operation.
+ *
+ * @param key The key to identify this operation.
+ * @param delay The delay to wait before expiring.
+ * @param unit The delay unit.
+ * @param retry A boolean indicating whether the operation should
+ * be retried on failure.
+ * @param retryBackoff The delay when rescheduled on retry.
+ * @param operation The operation to perform upon expiration.
+ */
+ void schedule(String key, long delay, TimeUnit unit, boolean retry, long
retryBackoff, TimeoutOperation<T, U> operation);
Review Comment:
Does it rely on the `unit`?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -281,6 +281,18 @@ public void schedule(
TimeUnit unit,
boolean retry,
TimeoutOperation<Void, U> operation
+ ) {
+ schedule(key, delay, unit, retry, 500, operation);
+ }
+
+ @Override
+ public void schedule(
+ String key,
+ long delay,
+ TimeUnit unit,
+ boolean retry,
+ long retryBackoff,
Review Comment:
What's the unit of this? Does it rely on `unit`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]