ijuma commented on code in PR #14034: URL: https://github.com/apache/kafka/pull/14034#discussion_r1439152043
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -1441,9 +1442,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, * (if there is one). It returns true iff the segment is deletable. * @return the segments ready to be deleted */ - private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = { - def isSegmentEligibleForDeletion(nextSegmentOpt: Option[LogSegment], upperBoundOffset: Long): Boolean = { - val allowDeletionDueToLogStartOffsetIncremented = nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset + private[log] def deletableSegments(predicate: (LogSegment, Optional[LogSegment]) => Boolean): Iterable[LogSegment] = { Review Comment: We should change this to return `java.util.Collection[LogSegment]` to avoid unnecessary conversions. ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -1511,7 +1512,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } localLog.checkIfMemoryMappedBufferClosed() // remove the segments for lookups - localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason) + localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava, true, reason) Review Comment: This conversion can be avoided if we make the change to the method signature suggested above. 
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -1564,8 +1565,10 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private def deleteLogStartOffsetBreachedSegments(): Int = { - def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - nextSegmentOpt.exists(_.baseOffset <= (if (remoteLogEnabled()) localLogStartOffset() else logStartOffset)) + def shouldDelete(segment: LogSegment, nextSegmentOpt: Optional[LogSegment]): Boolean = { + if (nextSegmentOpt.isPresent) + nextSegmentOpt.get().baseOffset <= (if (remoteLogEnabled()) localLogStartOffset() else logStartOffset) + else false Review Comment: Nit: you can do `nextSegmentOpt.isPresent && nextSegmentOpt.get...` ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -1245,7 +1246,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = { Review Comment: We should change this to return `java.util.Collection` or `java.util.List` to avoid unnecessary conversions. ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -2265,11 +2268,12 @@ object UnifiedLog extends Logging { def deleteProducerSnapshots(): Unit = { LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, - s"Error while deleting producer state snapshots for $topicPartition in dir $parentDir") { + s"Error while deleting producer state snapshots for $topicPartition in dir $parentDir", { snapshotsToDelete.foreach { snapshot => snapshot.deleteIfExists() } - } + return; Review Comment: Hmm, it is a bit odd that a `return` with no value is required in Scala code. Is this right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org