ijuma commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1439152043


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1441,9 +1442,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    *                  (if there is one). It returns true iff the segment is 
deletable.
    * @return the segments ready to be deleted
    */
-  private[log] def deletableSegments(predicate: (LogSegment, 
Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
-    def isSegmentEligibleForDeletion(nextSegmentOpt: Option[LogSegment], 
upperBoundOffset: Long): Boolean = {
-      val allowDeletionDueToLogStartOffsetIncremented = 
nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset
+  private[log] def deletableSegments(predicate: (LogSegment, 
Optional[LogSegment]) => Boolean): Iterable[LogSegment] = {

Review Comment:
   We should change this to return `java.util.Collection[LogSegment]` to avoid 
unnecessary conversions.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1511,7 +1512,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         }
         localLog.checkIfMemoryMappedBufferClosed()
         // remove the segments for lookups
-        localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, 
reason)
+        localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava,  
true, reason)

Review Comment:
   This conversion can be avoided if we make the change to the method signature 
suggested above.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1564,8 +1565,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
-    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-      nextSegmentOpt.exists(_.baseOffset <= (if (remoteLogEnabled()) 
localLogStartOffset() else logStartOffset))
+    def shouldDelete(segment: LogSegment, nextSegmentOpt: 
Optional[LogSegment]): Boolean = {
+      if (nextSegmentOpt.isPresent)
+        nextSegmentOpt.get().baseOffset <= (if (remoteLogEnabled()) 
localLogStartOffset() else logStartOffset)
+      else false

Review Comment:
   Nit: you can do `nextSegmentOpt.isPresent && nextSegmentOpt.get...`



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1245,7 +1246,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private[log] def collectAbortedTransactions(startOffset: Long, 
upperBoundOffset: Long): List[AbortedTxn] = {

Review Comment:
   We should change this to return `java.util.Collection` or `java.util.List` 
to avoid unnecessary conversions.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2265,11 +2268,12 @@ object UnifiedLog extends Logging {
     def deleteProducerSnapshots(): Unit = {
       LocalLog.maybeHandleIOException(logDirFailureChannel,
         parentDir,
-        s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir") {
+        s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir", {
         snapshotsToDelete.foreach { snapshot =>
           snapshot.deleteIfExists()
         }
-      }
+          return;

Review Comment:
   Hmm, it is a bit odd that a `return` with no value is required in Scala 
code. Is this right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to