hachikuji commented on code in PR #15007:
URL: https://github.com/apache/kafka/pull/15007#discussion_r1427463801
##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -645,6 +648,29 @@ public void run() throws Exception {
             }
         }

+    private BufferingBatchConsumer<ApiMessageAndVersion> buildMigrationBatchConsumer(
+        MigrationManifest.Builder manifestBuilder
+    ) {
+        return new BufferingBatchConsumer<>(batch -> {
+            try {
+                if (log.isTraceEnabled()) {
+                    batch.forEach(apiMessageAndVersion ->
+                        log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
+                }
+                CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);

Review Comment:
   Does the consumer here have any expectation on atomicity of the records? I am trying to figure out how the batching applies at the Raft layer. Would you expect the batches to be preserved in the log?

##########
metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java:
##########
@@ -91,24 +101,42 @@ public long durationMs() {
         return TimeUnit.NANOSECONDS.toMillis(durationNanos);
     }

+    public double avgBatchDurationMs() {
+        if (totalBatches == 0) {
+            return -1;
+        }
+        return 1.0 * TimeUnit.NANOSECONDS.toMillis(totalBatchDurationsNs) / totalBatches;
+    }
+
+    public double avgBatchSize() {
+        if (totalBatches == 0) {
+            return -1;
+        }
+        return 1.0 * totalBatchSizes / totalBatches;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         MigrationManifest that = (MigrationManifest) o;
         return totalRecords == that.totalRecords &&
             totalBatches == that.totalBatches &&
+            totalBatchDurationsNs == that.totalBatchDurationsNs &&
+            totalBatchSizes == that.totalBatchSizes &&
             durationNanos == that.durationNanos &&
             recordTypeCounts.equals(that.recordTypeCounts);
     }

     @Override
     public int hashCode() {
-        return Objects.hash(totalRecords, totalBatches, durationNanos, recordTypeCounts);
+        return Objects.hash(totalRecords, totalBatches, totalBatchDurationsNs, totalBatchSizes, durationNanos, recordTypeCounts);
     }

     public String toString() {
-        return String.format("%d records were generated in %d ms across %d batches. The record types were %s",
-            totalRecords, durationMs(), totalBatches, recordTypeCounts);
+        return String.format(
+            "%d records were generated in %d ms across %d batches. The average batch size was %.2f " +

Review Comment:
   The "average batch size" might be a little ambiguous. Maybe we could say "records/batch" or something like that? I also wonder whether the size in bytes would be interesting, but perhaps we can get that from the Raft metrics.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
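This sketch illustrates the atomicity question in the first review comment and the "records/batch" wording suggested in the second. It shows a buffering consumer that merges small incoming batches until a minimum size is reached. This is not Kafka's `BufferingBatchConsumer`, because the diff does not show its constructor arguments or flush behavior. The names `BufferingSketch`, `minBatchSize`, `flush()` and `avgRecordsPerBatch()` are hypothetical, and the "flush once the buffer reaches `minBatchSize`" policy is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, NOT Kafka's BufferingBatchConsumer: records from incoming
// batches are accumulated and handed downstream once at least minBatchSize records
// are buffered. The boundaries of the incoming batches are therefore not preserved.
final class BufferingSketch<T> implements Consumer<List<T>> {
    private final Consumer<List<T>> delegate;
    private final int minBatchSize;
    private List<T> buffer = new ArrayList<>();
    private long totalBatches = 0; // batches handed to the delegate
    private long totalRecords = 0; // records handed to the delegate

    BufferingSketch(Consumer<List<T>> delegate, int minBatchSize) {
        if (minBatchSize < 1) {
            throw new IllegalArgumentException("minBatchSize must be >= 1");
        }
        this.delegate = delegate;
        this.minBatchSize = minBatchSize;
    }

    @Override
    public void accept(List<T> batch) {
        buffer.addAll(batch);
        if (buffer.size() >= minBatchSize) {
            flush();
        }
    }

    // Hands any remaining buffered records downstream; the caller must invoke this
    // once after the last batch, or trailing records are never delivered.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> out = buffer;
        buffer = new ArrayList<>();
        totalBatches++;
        totalRecords += out.size();
        delegate.accept(out);
    }

    // Average records per emitted batch ("records/batch"), or -1 when no batch was
    // emitted, mirroring the sentinel used by avgBatchSize() in the diff.
    double avgRecordsPerBatch() {
        return totalBatches == 0 ? -1 : (double) totalRecords / totalBatches;
    }
}
```

Suppose three incoming batches of 3 records each arrive with `minBatchSize = 5`. The delegate then sees one batch of 6 records and, after the final `flush()`, one batch of 3, for 4.5 records/batch. Any expectation that the original batch boundaries survive into the log would not hold under a policy like this.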