jiafu1115 commented on code in PR #20913:
URL: https://github.com/apache/kafka/pull/20913#discussion_r3259505787
##########
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java:
##########
@@ -937,6 +946,68 @@ List<EnrichedLogSegment> candidateLogSegments(UnifiedLog
log, Long fromOffset, L
return candidateLogSegments;
}
+ private boolean delayCopy(LogConfig logConfig, LogSegment previousSeg,
long currentTimeMs, long totalLogSize, long cumulativeSize) {
+ if (logConfig == null) {
+ return false;
+ }
+
+ long copyLagMs = logConfig.remoteCopyLagMs();
+ long copyLagBytes = logConfig.remoteCopyLagBytes();
+ if (logger.isTraceEnabled()) {
+ logger.trace("delayCopy check for segment {}: copyLagMs={},
copyLagBytes={}, currentTimeMs={}, totalLogSize={}, cumulativeSize={},
sizeLagBytes={}",
+ previousSeg, copyLagMs, copyLagBytes, currentTimeMs,
totalLogSize, cumulativeSize, totalLogSize - cumulativeSize);
+ }
+
+ if (copyLagMs == 0 || copyLagBytes == 0) {
+ return false;
+ }
+
+ boolean needCheckCopyLagMs = copyLagMs > 0;
+ boolean needCheckCopyLagBytes = copyLagBytes > 0;
+
+ // When no lag delay is enabled, upload immediately.
+ if (!needCheckCopyLagMs && !needCheckCopyLagBytes) {
Review Comment:
@kamalcph
thanks for your review. I already split the test to new dedicated test class
and refactor the code according to part of suggestions. All tests passed except
flaky tests.
for this question. we can discuss it here:
(1) about both -1
In your code approach, if the effective localRetentionMs and
localRetentionSize both are -1
then effectiveCopyLagBytes and effectiveCopyLagMs both are Long.MAX_VALUE
then the upload won't be done.
it mean if one topic which want to save forever in local. it won't be upload
any segment. right?
in my code approach. it will be upload at once.
(2) if I can move your followed code into logConfig.remoteCopyLagBytes()()'
inner implement instead of here.
```
long effectiveCopyLagBytes = logConfig.remoteCopyLagBytes();
// derive the value from local retention size when copyLagBytes configured
to -1
if (effectiveCopyLagBytes == -1L) {
effectiveCopyLagBytes = logConfig.localRetentionBytes();
// if the local retention size is configured to infinite, then
configure copyLagBytes as infinite
if (effectiveCopyLagBytes == -1L) {
effectiveCopyLagBytes = Long.MAX_VALUE;
}
}
```
After these two questions discussed. I can go ahead for improvement.
Thanks a lot!
##########
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java:
##########
@@ -937,6 +946,68 @@ List<EnrichedLogSegment> candidateLogSegments(UnifiedLog
log, Long fromOffset, L
return candidateLogSegments;
}
+ private boolean delayCopy(LogConfig logConfig, LogSegment previousSeg,
long currentTimeMs, long totalLogSize, long cumulativeSize) {
+ if (logConfig == null) {
+ return false;
+ }
+
+ long copyLagMs = logConfig.remoteCopyLagMs();
+ long copyLagBytes = logConfig.remoteCopyLagBytes();
+ if (logger.isTraceEnabled()) {
+ logger.trace("delayCopy check for segment {}: copyLagMs={},
copyLagBytes={}, currentTimeMs={}, totalLogSize={}, cumulativeSize={},
sizeLagBytes={}",
+ previousSeg, copyLagMs, copyLagBytes, currentTimeMs,
totalLogSize, cumulativeSize, totalLogSize - cumulativeSize);
+ }
+
+ if (copyLagMs == 0 || copyLagBytes == 0) {
+ return false;
+ }
+
+ boolean needCheckCopyLagMs = copyLagMs > 0;
+ boolean needCheckCopyLagBytes = copyLagBytes > 0;
+
+ // When no lag delay is enabled, upload immediately.
+ if (!needCheckCopyLagMs && !needCheckCopyLagBytes) {
Review Comment:
@kamalcph
thanks for your review. I already split the test to new dedicated test class
and refactor the code according to part of suggestions. All tests passed except
flaky tests.
for this question. we can discuss it here:
(1) about both -1
In your code approach, if the effective localRetentionMs and
localRetentionSize both are -1
then effectiveCopyLagBytes and effectiveCopyLagMs both are Long.MAX_VALUE
then the upload won't be done.
it mean if one topic which want to save forever in local. it won't be upload
any segment. right?
in my code approach. it will be upload at once.
(2) if I can move your followed code into logConfig.remoteCopyLagBytes()'
inner implement instead of here.
```
long effectiveCopyLagBytes = logConfig.remoteCopyLagBytes();
// derive the value from local retention size when copyLagBytes configured
to -1
if (effectiveCopyLagBytes == -1L) {
effectiveCopyLagBytes = logConfig.localRetentionBytes();
// if the local retention size is configured to infinite, then
configure copyLagBytes as infinite
if (effectiveCopyLagBytes == -1L) {
effectiveCopyLagBytes = Long.MAX_VALUE;
}
}
```
After these two questions discussed. I can go ahead for improvement.
Thanks a lot!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]