[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-09-02 Thread Stephane Maarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16601552#comment-16601552
 ] 

Stephane Maarek commented on KAFKA-7278:


[~lindong] Unfortunately, this does not fix 
https://issues.apache.org/jira/browse/KAFKA-1194
I'll post the details there... 

> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined and the time Log.replaceSegments() is called. If there is a 
> concurrent async deletion of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist, and the Kafka 
> server may shut down the log directory due to NoSuchFileException.
> This is likely the root cause of 
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given this understanding of the problem, we should be able to fix the issue by 
> only deleting a segment if it can still be found in Log.segments.
>  
>  
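The fix described above can be sketched outside Kafka: under the log lock, filter the possibly stale oldSegments list against the live segments map before scheduling any deletion. This is a minimal Java sketch with hypothetical names (the real fix filters `oldSegments` by `segments.containsKey(seg.baseOffset)` inside `replaceSegments()` in Scala); it is an illustration of the guard, not the actual broker code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

public class ReplaceSegmentsSketch {

    // Mirrors the guard: keep only the old segments whose base offset is still
    // present in the live segments map; anything already removed (e.g. by
    // retention) must not be scheduled for deletion a second time.
    static List<Long> filterExisting(Map<Long, String> segments, List<Long> oldSegmentOffsets) {
        List<Long> existing = new ArrayList<>();
        for (Long baseOffset : oldSegmentOffsets) {
            if (segments.containsKey(baseOffset))
                existing.add(baseOffset);
        }
        return existing;
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<Long, String> segments = new ConcurrentSkipListMap<>();
        segments.put(0L, "segment1");
        segments.put(100L, "segment2");

        // A concurrent retention pass deleted segment1 (base offset 0) after the
        // cleaner chose its oldSegments but before replaceSegments() ran:
        segments.remove(0L);

        // Only segment2 should still be passed to asyncDeleteSegment():
        System.out.println(filterExisting(segments, List.of(0L, 100L))); // prints [100]
    }
}
```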



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-31 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598948#comment-16598948
 ] 

Dong Lin commented on KAFKA-7278:
-

[~niob] The stacktrace in that Jira looks similar to the issue fixed here, so 
there is a good chance that we have fixed that issue as well.

 



[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-31 Thread Christoph Schmidt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598540#comment-16598540
 ] 

Christoph Schmidt commented on KAFKA-7278:
--

This ticket has raised questions in the comments of the ancient KAFKA-1194 
(which has three old PRs) - does it by chance fix that issue, too? The root 
cause over there is that renaming a log file while it is still open blows up 
under Windows, with the only available workaround being to completely disable 
the log cleaner.



[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587106#comment-16587106
 ] 

ASF GitHub Bot commented on KAFKA-7278:
---

lindong28 closed pull request #5535: Cherry-pick KAFKA-7278; replaceSegments() 
should not call asyncDeleteSegment() for segments which have been removed from 
segments list
URL: https://github.com/apache/kafka/pull/5535
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8b62918bc97..9b423ba5933 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1608,7 +1608,9 @@ class Log(@volatile var dir: File,
   }
 
   /**
-   * Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
+   * Perform an asynchronous delete on the given file.
+   *
+   * This method assumes that the file exists and the method is not thread-safe.
    *
    * This method does not need to convert IOException (thrown from changeFileSuffixes) to KafkaStorageException because
    * it is either called before all logs are loaded or the caller will catch and handle IOException
@@ -1655,6 +1657,8 @@ class Log(@volatile var dir: File,
    */
   private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) {
     lock synchronized {
+      val existingOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset))
+
       checkIfMemoryMappedBufferClosed()
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
@@ -1663,7 +1667,7 @@ class Log(@volatile var dir: File,
       addSegment(newSegment)
 
       // delete the old files
-      for (seg <- oldSegments) {
+      for (seg <- existingOldSegments) {
         // remove the index entry
         if (seg.baseOffset != newSegment.baseOffset)
           segments.remove(seg.baseOffset)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index ae949bf6b85..f6001e9f375 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.nio._
 import java.nio.file.Paths
 import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import kafka.common._
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
@@ -88,6 +89,74 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(expectedBytesRead, stats.bytesRead)
   }
 
+  @Test
+  def testCleanSegmentsWithConcurrentSegmentDeletion(): Unit = {
+    val deleteStartLatch = new CountDownLatch(1)
+    val deleteCompleteLatch = new CountDownLatch(1)
+
+    // Construct a log instance. The replaceSegments() method of the log instance is overridden so that
+    // it waits for another thread to execute deleteOldSegments()
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024 : java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
+    val topicPartition = Log.parseTopicPartitionName(dir)
+    val producerStateManager = new ProducerStateManager(topicPartition, dir)
+    val log = new Log(dir,
+                      config = LogConfig.fromProps(logConfig.originals, logProps),
+                      logStartOffset = 0L,
+                      recoveryPoint = 0L,
+                      scheduler = time.scheduler,
+                      brokerTopicStats = new BrokerTopicStats, time,
+                      maxProducerIdExpirationMs = 60 * 60 * 1000,
+                      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+                      topicPartition = topicPartition,
+                      producerStateManager = producerStateManager,
+                      logDirFailureChannel = new LogDirFailureChannel(10)) {
+      override def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
+        deleteStartLatch.countDown()
+        if (!deleteCompleteLatch.await(5000, TimeUnit.MILLISECONDS)) {
+          throw new IllegalStateException("Log segment deletion timed out")
+        }
+        super.replaceSegments(newSegment, oldSegments, isRecoveredSwapFile)
+      }
+    }
+
+    // Start a thread which execute log.deleteOldSegments() right before replaceSegments() is executed
+    val t = new Thread() {

[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586362#comment-16586362
 ] 

ASF GitHub Bot commented on KAFKA-7278:
---

lindong28 opened a new pull request #5535: Cherry-pick KAFKA-7278; 
replaceSegments() should not call asyncDeleteSegment() for segments which have 
been removed from segments list
URL: https://github.com/apache/kafka/pull/5535
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586320#comment-16586320
 ] 

ASF GitHub Bot commented on KAFKA-7278:
---

hachikuji closed pull request #5491: KAFKA-7278; replaceSegments() should not 
call asyncDeleteSegment() for segments which have been removed from segments 
list
URL: https://github.com/apache/kafka/pull/5491
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e4be8fcc43d..afe151d69b6 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1740,7 +1740,9 @@ class Log(@volatile var dir: File,
   }
 
   /**
-   * Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
+   * Perform an asynchronous delete on the given file.
+   *
+   * This method assumes that the file exists and the method is not thread-safe.
    *
    * This method does not need to convert IOException (thrown from changeFileSuffixes) to KafkaStorageException because
    * it is either called before all logs are loaded or the caller will catch and handle IOException
@@ -1791,10 +1793,13 @@ class Log(@volatile var dir: File,
    * @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash
    */
   private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) {
-    val sortedNewSegments = newSegments.sortBy(_.baseOffset)
-    val sortedOldSegments = oldSegments.sortBy(_.baseOffset)
-
     lock synchronized {
+      val sortedNewSegments = newSegments.sortBy(_.baseOffset)
+      // Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments
+      // but before this method is executed. We want to filter out those segments to avoid calling asyncDeleteSegment()
+      // multiple times for the same segment.
+      val sortedOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset)).sortBy(_.baseOffset)
+
       checkIfMemoryMappedBufferClosed()
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index b351311b329..0240707ca3b 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -21,6 +21,7 @@ import java.io.{File, RandomAccessFile}
 import java.nio._
 import java.nio.file.Paths
 import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import kafka.common._
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
@@ -89,6 +90,74 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(expectedBytesRead, stats.bytesRead)
   }
 
+  @Test
+  def testCleanSegmentsWithConcurrentSegmentDeletion(): Unit = {
+    val deleteStartLatch = new CountDownLatch(1)
+    val deleteCompleteLatch = new CountDownLatch(1)
+
+    // Construct a log instance. The replaceSegments() method of the log instance is overridden so that
+    // it waits for another thread to execute deleteOldSegments()
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024 : java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
+    val topicPartition = Log.parseTopicPartitionName(dir)
+    val producerStateManager = new ProducerStateManager(topicPartition, dir)
+    val log = new Log(dir,
+                      config = LogConfig.fromProps(logConfig.originals, logProps),
+                      logStartOffset = 0L,
+                      recoveryPoint = 0L,
+                      scheduler = time.scheduler,
+                      brokerTopicStats = new BrokerTopicStats, time,
+                      maxProducerIdExpirationMs = 60 * 60 * 1000,
+                      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+                      topicPartition = topicPartition,
+                      producerStateManager = producerStateManager,
+                      logDirFailureChannel = new LogDirFailureChannel(10)) {
+      override def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
+        deleteStartLatch.countDown()
+        if (!deleteCompleteLatch.await(5000, TimeUnit.MILLISECONDS)) {

[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577735#comment-16577735
 ] 

ASF GitHub Bot commented on KAFKA-7278:
---

lindong28 opened a new pull request #5491: KAFKA-7278; replaceSegments() should 
not call asyncDeleteSegment() for segments which have been removed from 
segments list
URL: https://github.com/apache/kafka/pull/5491
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-12 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577707#comment-16577707
 ] 

Dong Lin commented on KAFKA-7278:
-

Yes, `segment.changeFileSuffixes("", Log.DeletedFileSuffix)` is executed while 
the lock is held. But the lock is released between steps 2), 3) and 4) in the 
example sequence provided above.



[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-12 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577706#comment-16577706
 ] 

Dong Lin commented on KAFKA-7278:
-

[~ijuma] The exception is probably thrown from `segment.changeFileSuffixes("", 
Log.DeletedFileSuffix)`. Below is the stacktrace from the discussion of 
https://issues.apache.org/jira/browse/KAFKA-6188.

{code}
[2018-05-07 16:53:06,721] ERROR Failed to clean up log for __consumer_offsets-24 in dir /tmp/kafka-logs due to IOException (kafka.server.LogDirFailureChannel)
java.nio.file.NoSuchFileException: /tmp/kafka-logs/__consumer_offsets-24/.log
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
    at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
    at java.nio.file.Files.move(Files.java:1395)
    at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
    at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
    at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
    at kafka.log.Log.asyncDeleteSegment(Log.scala:1601)
    at kafka.log.Log.$anonfun$replaceSegments$1(Log.scala:1653)
    at kafka.log.Log.$anonfun$replaceSegments$1$adapted(Log.scala:1648)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at kafka.log.Log.replaceSegments(Log.scala:1648)
    at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535)
    at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462)
    at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
    at kafka.log.Cleaner.clean(LogCleaner.scala:438)
    at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
    at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
    Suppressed: java.nio.file.NoSuchFileException: /tmp/kafka-logs/__consumer_offsets-24/.log -> /tmp/kafka-logs/__consumer_offsets-24/.log.deleted
        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
        at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)
        at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
        at java.nio.file.Files.move(Files.java:1395)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)
        ... 16 more
[2018-05-07 16:53:06,725] INFO [ReplicaManager broker=0] Stopping serving replicas in dir /tmp/kafka-logs (kafka.server.ReplicaManager)
[2018-05-07 16:53:06,762] INFO Stopping serving logs in dir /tmp/kafka-logs (kafka.log.LogManager)
[2018-05-07 16:53:07,032] ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed (kafka.log.LogManager)
{code}
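The frame right below the Kafka code in the stacktrace is Files.move(), reached via Utils.atomicMoveWithFallback() from FileRecords.renameTo(). A standalone sketch (not Kafka code; the file names are made up) of why the rename, not the delete, is the fragile step: Files.deleteIfExists() tolerates a missing file, while Files.move() throws NoSuchFileException once the source is gone:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

public class RenameMissingFile {
    public static void main(String[] args) throws IOException {
        Path missing = Path.of("no-such-segment.log");
        Path target = Path.of("no-such-segment.log.deleted");

        // Files.deleteIfExists() is safe on a missing file: it just returns false.
        System.out.println(Files.deleteIfExists(missing)); // prints false

        // Files.move() (used by changeFileSuffixes via atomicMoveWithFallback)
        // throws NoSuchFileException when the source is already gone.
        try {
            Files.move(missing, target);
        } catch (NoSuchFileException e) {
            System.out.println("NoSuchFileException: " + e.getFile());
        }
    }
}
```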



[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-12 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577484#comment-16577484
 ] 

Ismael Juma commented on KAFKA-7278:


Can you please clarify which part of the code is the problem?

{code}
private def asyncDeleteSegment(segment: LogSegment) {
  segment.changeFileSuffixes("", Log.DeletedFileSuffix)
  def deleteSeg() {
    info(s"Deleting segment ${segment.baseOffset}")
    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
      segment.deleteIfExists()
    }
  }
  scheduler.schedule("delete-file", deleteSeg _, delay = config.fileDeleteDelayMs)
}
{code}

`segment.deleteIfExists()` should not throw an exception if the file doesn't 
exist (this is the code that I changed some time ago). The rest executes with 
the lock held. That's why I suspected you were seeing an issue that has since 
been fixed. But I might be missing something.
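The quoted delete path is rename-now, delete-later. A non-Kafka sketch of that pattern (hypothetical names; the real code uses KafkaScheduler and config.fileDeleteDelayMs) makes the two phases explicit and shows that only the rename can fail on a missing file:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AsyncDeleteSketch {

    // Phase 1 (synchronous; done under the log lock in Kafka): rename the segment
    // file to *.deleted. This is the step that throws if the file is already gone.
    // Phase 2 (asynchronous, after a delay): physically delete the renamed file.
    static void asyncDeleteSegment(Path segmentFile, long delayMs,
                                   ScheduledExecutorService scheduler) throws IOException {
        Path deleted = segmentFile.resolveSibling(segmentFile.getFileName() + ".deleted");
        Files.move(segmentFile, deleted);
        scheduler.schedule(() -> {
            try {
                Files.deleteIfExists(deleted); // safe even if removed meanwhile
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        Path f = Files.createTempFile("segment", ".log");
        asyncDeleteSegment(f, 10, scheduler);
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(Files.exists(f)); // prints false: the file was renamed away
    }
}
```

The delayed physical delete is what allows in-flight readers to finish with the renamed file before it disappears.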



[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-11 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577335#comment-16577335
 ] 

Dong Lin commented on KAFKA-7278:
-

[~ijuma] Yeah, the latest code in trunk seems to have this issue. The following 
sequence of events may happen:

1) There are segment1, segment2 and segment3 for a given partition.

2) The LogCleaner decides to merge segment1 and segment2 into segment3 and will 
call Log.replaceSegments(..., oldSegments=[segment1, segment2]).

3) Log retention is triggered and Log.deleteSegment(segment=segment1) is called 
and executed. This renames the files for segment1 in the log directory.

4) Log.replaceSegments(oldSegments=[segment1, segment2]) is executed and 
Log.asyncDeleteSegment(segment1) is executed, which fails to find the files for 
segment1 and throws IOException.
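The interleaving above is a check-then-act race: the cleaner snapshots its old segments outside the lock, retention deletes one of them, and the cleaner then acts on the stale snapshot. A minimal, non-Kafka sketch with hypothetical names that forces this exact ordering with a latch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;

public class StaleSnapshotRace {

    // Offsets from the snapshot that are no longer in the live map: acting on
    // these is what produced the NoSuchFileException (the proposed guard skips them).
    static List<Long> staleOffsets(Map<Long, String> segments, List<Long> snapshot) {
        List<Long> stale = new ArrayList<>();
        for (Long offset : snapshot)
            if (!segments.containsKey(offset))
                stale.add(offset);
        return stale;
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentSkipListMap<Long, String> segments = new ConcurrentSkipListMap<>();
        segments.put(0L, "segment1");
        segments.put(100L, "segment2");

        // Step 2: the cleaner snapshots the segments it intends to replace.
        List<Long> oldSegments = List.copyOf(segments.keySet());

        // Step 3: a retention thread deletes segment1 before replaceSegments() runs.
        CountDownLatch retentionDone = new CountDownLatch(1);
        new Thread(() -> {
            segments.remove(0L);
            retentionDone.countDown();
        }).start();
        retentionDone.await();

        // Step 4: the snapshot is now stale; deleting offset 0 again would fail.
        System.out.println(staleOffsets(segments, oldSegments)); // prints [0]
    }
}
```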

 



[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-11 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577280#comment-16577280
 ] 

Ismael Juma commented on KAFKA-7278:


Is this the case in trunk? I remember we fixed one issue like what you 
describe, but there could be more.
