[jira] [Commented] (KAFKA-1908) Split brain
[ https://issues.apache.org/jira/browse/KAFKA-1908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15800988#comment-15800988 ]

Alexey Ozeritskiy commented on KAFKA-1908:
------------------------------------------

Now I think this is not a problem. In the above scenario we get an isolated leader. We cannot write data via that leader (given an appropriate min.insync.replicas), non-local processes cannot read data from that leader, and local processes (MirrorMaker and so on) will do nothing because they cannot read up-to-date data. I think the ticket should be closed.

> Split brain
> -----------
>
> Key: KAFKA-1908
> URL: https://issues.apache.org/jira/browse/KAFKA-1908
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8.2.0
> Reporter: Alexey Ozeritskiy
>
> In some cases, there may be two leaders for one partition.
> Steps to reproduce:
> # We have 3 brokers, 1 partition with 3 replicas:
> {code}
> TopicAndPartition: [partition,0] Leader: 1 Replicas: [2,1,3] ISR: [1,2,3]
> {code}
> # The controller runs on broker 3
> # Let the Kafka port be 9092. We execute on broker 1:
> {code}
> iptables -A INPUT -p tcp --dport 9092 -j REJECT
> {code}
> # Initiate replica election
> # As a result:
> Broker 1:
> {code}
> TopicAndPartition: [partition,0] Leader: 1 Replicas: [2,1,3] ISR: [1,2,3]
> {code}
> Broker 2:
> {code}
> TopicAndPartition: [partition,0] Leader: 2 Replicas: [2,1,3] ISR: [1,2,3]
> {code}
> # Flush the iptables rules on broker 1
> Now we can produce messages to {code}[partition,0]{code}. Replica-1 will not receive new data. A consumer can read data from replica-1 or replica-2. When it reads from replica-1 it resets the offsets and then can read duplicates from replica-2.
> We saw this situation in our production cluster when it had network problems.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
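For reference, a minimal sketch of the settings this reasoning assumes (example values, not taken from the ticket): with 3 replicas, min.insync.replicas=2 on the broker or topic plus acks=all on the producer means an isolated leader that has lost its followers cannot accept writes.

{code}
# broker or topic-level setting (example value, not from the ticket)
min.insync.replicas=2
# producer setting: require acknowledgement from all in-sync replicas
acks=all
{code}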
[jira] [Commented] (KAFKA-1923) Negative offsets in replication-offset-checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-1923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772787#comment-15772787 ]

Alexey Ozeritskiy commented on KAFKA-1923:
------------------------------------------

The problem still exists (v0.10.1.1). I am trying to reproduce it. I've read the code and found that the only place with negative offsets is Replica.logEndOffset (for a remote Replica), but I don't understand how it can get into the high watermark.

> Negative offsets in replication-offset-checkpoint file
> ------------------------------------------------------
>
> Key: KAFKA-1923
> URL: https://issues.apache.org/jira/browse/KAFKA-1923
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8.2.0
> Reporter: Oleg Golovin
>
> Today was the second time we witnessed negative offsets in the replication-offset-checkpoint file. After a restart the node stops replicating some of its partitions.
> Unfortunately we can't reproduce it yet. But the two cases we encountered indicate a bug which should be addressed.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
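For context, replication-offset-checkpoint is a plain-text file; as far as I understand the format, it holds a version number, an entry count, and then one `topic partition offset` line per partition. A hypothetical file exhibiting the corruption described here would look like the last line below:

{code}
0
3
topicA 0 7123666
topicA 1 7128188
topicB 0 -1
{code}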
[jira] [Updated] (KAFKA-4205) NullPointerException in fetchOffsetsBefore
[ https://issues.apache.org/jira/browse/KAFKA-4205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-4205:
-------------------------------------
Status: Patch Available  (was: Open)

> NullPointerException in fetchOffsetsBefore
> ------------------------------------------
>
> Key: KAFKA-4205
> URL: https://issues.apache.org/jira/browse/KAFKA-4205
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.9.0.1
> Reporter: Andrew Grasso
> Assignee: Anton Karamanov
> Labels: reliability
> Fix For: 0.10.1.1
>
> We recently observed the following error in brokers running 0.9.0.1:
> A client saw an Unknown error code in response to an offset request for TOPICX, partition 0
> The server logs look like:
> {code}
> [2016-09-21 21:26:07,143] INFO Scheduling log segment 527235760 for log TOPICX-0 for deletion. (kafka.log.Log)
> [2016-09-21 21:26:07,144] ERROR [KafkaApi-13] Error while responding to offset request (kafka.server.KafkaApis)
> java.lang.NullPointerException
>         at kafka.server.KafkaApis.fetchOffsetsBefore(KafkaApis.scala:513)
>         at kafka.server.KafkaApis.fetchOffsets(KafkaApis.scala:501)
>         at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:461)
>         at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:452)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at kafka.server.KafkaApis.handleOffsetRequest(KafkaApis.scala:452)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>         at java.lang.Thread.run(Thread.java:745)
> [2016-09-21 21:27:07,143] INFO Deleting segment 527235760 from log TOPICX-0. (kafka.log.Log)
> [2016-09-21 21:27:07,263] INFO Deleting index /path/to/kafka/data/TOPICX-0/000527235760.index.deleted (kafka.log.OffsetIndex)
> {code}
> I suspect a race condition between {{Log.deleteSegment}} (which takes a lock on the log) and {{KafkaApis.fetchOffsetsBefore}}, which does not take any lock. In particular, line 513 in KafkaApis looks like:
> {code:title=KafkaApis.scala|borderStyle=solid}
> 510 private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
> 511   val segsArray = log.logSegments.toArray
> 512   var offsetTimeArray: Array[(Long, Long)] = null
> 513   val lastSegmentHasSize = segsArray.last.size > 0;
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
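To make the suspected race concrete: if a segment is deleted between building `segsArray` and dereferencing `segsArray.last`, the scan can hit torn-down segment state. One hypothetical mitigation (a sketch only, not the committed fix, reusing the names from the snippet above) is to retry the scan on a fresh snapshot instead of letting the NPE surface to the client as an Unknown error:

{code}
// Sketch: wrap the existing fetchOffsetsBefore (quoted above) in a retry.
// If a concurrent Log.deleteSegment tears down a segment mid-scan and the
// scan hits the NPE, take a new snapshot of log.logSegments and try again.
def fetchOffsetsWithRetry(log: Log, timestamp: Long, maxNumOffsets: Int,
                          attempts: Int = 3): Seq[Long] =
  try fetchOffsetsBefore(log, timestamp, maxNumOffsets)
  catch {
    case _: NullPointerException if attempts > 1 =>
      fetchOffsetsWithRetry(log, timestamp, maxNumOffsets, attempts - 1)
  }
{code}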
[jira] [Updated] (KAFKA-4205) NullPointerException in fetchOffsetsBefore
[ https://issues.apache.org/jira/browse/KAFKA-4205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-4205:
-------------------------------------
Assignee: Anton Karamanov

> NullPointerException in fetchOffsetsBefore
> ------------------------------------------
>
> Key: KAFKA-4205
> URL: https://issues.apache.org/jira/browse/KAFKA-4205
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.9.0.1
> Reporter: Andrew Grasso
> Assignee: Anton Karamanov
> Labels: reliability
> Fix For: 0.10.1.1
>
> We recently observed the following error in brokers running 0.9.0.1:
> A client saw an Unknown error code in response to an offset request for TOPICX, partition 0
> The server logs look like:
> {code}
> [2016-09-21 21:26:07,143] INFO Scheduling log segment 527235760 for log TOPICX-0 for deletion. (kafka.log.Log)
> [2016-09-21 21:26:07,144] ERROR [KafkaApi-13] Error while responding to offset request (kafka.server.KafkaApis)
> java.lang.NullPointerException
>         at kafka.server.KafkaApis.fetchOffsetsBefore(KafkaApis.scala:513)
>         at kafka.server.KafkaApis.fetchOffsets(KafkaApis.scala:501)
>         at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:461)
>         at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:452)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at kafka.server.KafkaApis.handleOffsetRequest(KafkaApis.scala:452)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>         at java.lang.Thread.run(Thread.java:745)
> [2016-09-21 21:27:07,143] INFO Deleting segment 527235760 from log TOPICX-0. (kafka.log.Log)
> [2016-09-21 21:27:07,263] INFO Deleting index /path/to/kafka/data/TOPICX-0/000527235760.index.deleted (kafka.log.OffsetIndex)
> {code}
> I suspect a race condition between {{Log.deleteSegment}} (which takes a lock on the log) and {{KafkaApis.fetchOffsetsBefore}}, which does not take any lock. In particular, line 513 in KafkaApis looks like:
> {code:title=KafkaApis.scala|borderStyle=solid}
> 510 private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
> 511   val segsArray = log.logSegments.toArray
> 512   var offsetTimeArray: Array[(Long, Long)] = null
> 513   val lastSegmentHasSize = segsArray.last.size > 0;
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-4205) NullPointerException in fetchOffsetsBefore
[ https://issues.apache.org/jira/browse/KAFKA-4205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15715179#comment-15715179 ]

Alexey Ozeritskiy commented on KAFKA-4205:
------------------------------------------

We see this on 0.10.1: approximately 20-30 errors per day on every broker.

> NullPointerException in fetchOffsetsBefore
> ------------------------------------------
>
> Key: KAFKA-4205
> URL: https://issues.apache.org/jira/browse/KAFKA-4205
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.9.0.1
> Reporter: Andrew Grasso
>
> We recently observed the following error in brokers running 0.9.0.1:
> A client saw an Unknown error code in response to an offset request for TOPICX, partition 0
> The server logs look like:
> {code}
> [2016-09-21 21:26:07,143] INFO Scheduling log segment 527235760 for log TOPICX-0 for deletion. (kafka.log.Log)
> [2016-09-21 21:26:07,144] ERROR [KafkaApi-13] Error while responding to offset request (kafka.server.KafkaApis)
> java.lang.NullPointerException
>         at kafka.server.KafkaApis.fetchOffsetsBefore(KafkaApis.scala:513)
>         at kafka.server.KafkaApis.fetchOffsets(KafkaApis.scala:501)
>         at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:461)
>         at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:452)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at kafka.server.KafkaApis.handleOffsetRequest(KafkaApis.scala:452)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>         at java.lang.Thread.run(Thread.java:745)
> [2016-09-21 21:27:07,143] INFO Deleting segment 527235760 from log TOPICX-0. (kafka.log.Log)
> [2016-09-21 21:27:07,263] INFO Deleting index /path/to/kafka/data/TOPICX-0/000527235760.index.deleted (kafka.log.OffsetIndex)
> {code}
> I suspect a race condition between {{Log.deleteSegment}} (which takes a lock on the log) and {{KafkaApis.fetchOffsetsBefore}}, which does not take any lock. In particular, line 513 in KafkaApis looks like:
> {code:title=KafkaApis.scala|borderStyle=solid}
> 510 private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
> 511   val segsArray = log.logSegments.toArray
> 512   var offsetTimeArray: Array[(Long, Long)] = null
> 513   val lastSegmentHasSize = segsArray.last.size > 0;
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit
[ https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658044#comment-15658044 ]

Alexey Ozeritskiy commented on KAFKA-4399:
------------------------------------------

For now, I think you can use the following config as a workaround:
{code}
# 100 days (100 * 24 * 60 = 144000 minutes)
offsets.retention.minutes=144000
# 1 hour (60 * 60 * 1000 = 3600000 ms)
offsets.retention.check.interval.ms=3600000
{code}

> Deadlock between cleanupGroupMetadata and offset commit
> --------------------------------------------------------
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.1.0
> Reporter: Alexey Ozeritskiy
> Priority: Blocker
> Attachments: deadlock-stack
>
> We have upgraded our clusters to 0.10.1.0 and hit a deadlock issue.
> We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but the patch did not help us and our stack traces are different. I think it is a different issue.
> Stack traces attached

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit
[ https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-4399:
-------------------------------------
Status: Patch Available  (was: Open)

> Deadlock between cleanupGroupMetadata and offset commit
> --------------------------------------------------------
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.1.0
> Reporter: Alexey Ozeritskiy
> Priority: Blocker
> Attachments: deadlock-stack
>
> We have upgraded our clusters to 0.10.1.0 and hit a deadlock issue.
> We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but the patch did not help us and our stack traces are different. I think it is a different issue.
> Stack traces attached

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit
[ https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658027#comment-15658027 ]

Alexey Ozeritskiy commented on KAFKA-4399:
------------------------------------------

The simplest way to fix it is to move the complicated code (i.e., appendMessagesToLeader) outside the lock. See the PR.

> Deadlock between cleanupGroupMetadata and offset commit
> --------------------------------------------------------
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.1.0
> Reporter: Alexey Ozeritskiy
> Priority: Blocker
> Attachments: deadlock-stack
>
> We have upgraded our clusters to 0.10.1.0 and hit a deadlock issue.
> We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but the patch did not help us and our stack traces are different. I think it is a different issue.
> Stack traces attached

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
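The shape of the fix, as I read it: never call into the log layer (which takes its own locks) while holding the group lock; instead, snapshot the work under the group lock and perform the append after releasing it. A minimal self-contained Scala sketch of that pattern (hypothetical names, not the actual Kafka code; appendToLog stands in for Partition.appendMessagesToLeader, which internally acquires the log/leader locks):

{code}
import java.util.concurrent.locks.ReentrantLock

final class GroupCache(appendToLog: Seq[String] => Unit) {
  private val lock = new ReentrantLock()
  private var expired = Vector.empty[String]

  def addExpired(key: String): Unit = {
    lock.lock()
    try expired :+= key finally lock.unlock()
  }

  // Deadlock-prone shape: calling appendToLog while `lock` is held can
  // deadlock against a thread that holds the log lock and wants `lock`
  // (the offset-commit path). Safe shape: snapshot under the lock,
  // append outside it.
  def cleanup(): Unit = {
    lock.lock()
    val toDelete =
      try { val d = expired; expired = Vector.empty; d }
      finally lock.unlock()
    if (toDelete.nonEmpty) appendToLog(toDelete) // no group lock held here
  }
}
{code}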
[jira] [Updated] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit
[ https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-4399:
-------------------------------------
Attachment: deadlock-stack

> Deadlock between cleanupGroupMetadata and offset commit
> --------------------------------------------------------
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.1.0
> Reporter: Alexey Ozeritskiy
> Priority: Blocker
> Attachments: deadlock-stack
>
> We have upgraded our clusters to 0.10.1.0 and hit a deadlock issue.
> We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but the patch did not help us and our stack traces are different. I think it is a different issue.
> Stack traces attached

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit
Alexey Ozeritskiy created KAFKA-4399:
----------------------------------------

Summary: Deadlock between cleanupGroupMetadata and offset commit
Key: KAFKA-4399
URL: https://issues.apache.org/jira/browse/KAFKA-4399
Project: Kafka
Issue Type: Bug
Affects Versions: 0.10.1.0
Reporter: Alexey Ozeritskiy
Priority: Blocker

We have upgraded our clusters to 0.10.1.0 and hit a deadlock issue.
We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but the patch did not help us and our stack traces are different. I think it is a different issue.
Stack traces attached

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (KAFKA-4319) AbstractFetcherManager shutdown speedup
[ https://issues.apache.org/jira/browse/KAFKA-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-4319:
-------------------------------------
Status: Patch Available  (was: Open)

https://github.com/apache/kafka/pull/2023

> AbstractFetcherManager shutdown speedup
> ----------------------------------------
>
> Key: KAFKA-4319
> URL: https://issues.apache.org/jira/browse/KAFKA-4319
> Project: Kafka
> Issue Type: Improvement
> Reporter: Alexey Ozeritskiy
>
> During shutdown, AbstractFetcherManager closes all worker threads sequentially, which slows the overall shutdown dramatically on huge clusters (approximately 15 minutes for 100 nodes, for example).
> This can be improved by shutting down in parallel: in a first stage AbstractFetcherManager can send the stop signal to all the workers, and then join each of them.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (KAFKA-4319) AbstractFetcherManager shutdown speedup
Alexey Ozeritskiy created KAFKA-4319:
----------------------------------------

Summary: AbstractFetcherManager shutdown speedup
Key: KAFKA-4319
URL: https://issues.apache.org/jira/browse/KAFKA-4319
Project: Kafka
Issue Type: Improvement
Reporter: Alexey Ozeritskiy

During shutdown, AbstractFetcherManager closes all worker threads sequentially, which slows the overall shutdown dramatically on huge clusters (approximately 15 minutes for 100 nodes, for example).
This can be improved by shutting down in parallel: in a first stage AbstractFetcherManager can send the stop signal to all the workers, and then join each of them.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
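A minimal sketch of the proposed two-phase shutdown. This assumes the ShutdownableThread-style API (initiateShutdown() sets a stop flag, awaitShutdown() joins the thread); the point is that the signalling phase completes for every worker before any join blocks, so the per-thread shutdown latencies overlap instead of adding up.

{code}
def closeAllFetchers(fetchers: Iterable[AbstractFetcherThread]): Unit = {
  // Phase 1: ask every worker to stop; cheap and non-blocking.
  fetchers.foreach(_.initiateShutdown())
  // Phase 2: wait for them; the waits now overlap with each other, so
  // total time is roughly the slowest worker, not the sum of all workers.
  fetchers.foreach(_.awaitShutdown())
}
{code}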
[jira] [Commented] (KAFKA-2063) Bound fetch response size (KIP-74)
[ https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15487885#comment-15487885 ]

Alexey Ozeritskiy commented on KAFKA-2063:
------------------------------------------

Thanks Andrey, I agree with you.

> Bound fetch response size (KIP-74)
> ----------------------------------
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jay Kreps
> Assignee: Andrey Neporada
> Fix For: 0.10.1.0
>
> Currently the only bound on the fetch response size is max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you also need to consume 1k partitions this means you may receive a 1GB response in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, and you only actually get the full fetch amount when you are behind and there is a full chunk of data ready. This means an application that seems to work fine will suddenly OOM when partitions shift or when the application falls behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, max_bytes, which would control the maximum data bytes we would include in the response.
> The implementation on the server side would grab data from each partition in the fetch request until it hit this limit, then send back just the data for the partitions that fit in the response. The implementation would need to start from a random position in the list of topics included in the fetch request to ensure that in case of backlog we fairly balance between partitions (to avoid first giving just the first partition until that is exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch request much less useful and we should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in KAFKA-598. The max_bytes setting now becomes the new limit that would need to be compared to the max message size. This can be much larger -- e.g. a 50MB max_bytes setting would be okay, whereas now if you set 50MB you may need to allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new field and we should do a KIP for it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
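The server-side behaviour described above (fill the response up to max_bytes, starting from a random partition for fairness under backlog) can be sketched in a few lines. This is an illustrative toy, not the actual KafkaApis code:

{code}
import scala.util.Random

final case class PartitionData(bytesAvailable: Int)

// Returns the indices of the partitions whose data fits in the response,
// scanning from a random starting point so that under backlog no single
// partition monopolizes every response.
def boundedFetch(parts: IndexedSeq[PartitionData], maxBytes: Int): Seq[Int] = {
  if (parts.isEmpty) return Seq.empty
  val start = Random.nextInt(parts.size)
  var budget = maxBytes
  val picked = scala.collection.mutable.Buffer.empty[Int]
  for (i <- parts.indices) {
    val idx = (start + i) % parts.size
    val take = math.min(parts(idx).bytesAvailable, budget)
    if (take > 0) { picked += idx; budget -= take }
  }
  picked.toSeq
}
{code}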
[jira] [Updated] (KAFKA-2063) Bound fetch response size (KIP-74)
[ https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-2063:
-------------------------------------
Assignee: Andrey Neporada  (was: Alexey Ozeritskiy)

> Bound fetch response size (KIP-74)
> ----------------------------------
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jay Kreps
> Assignee: Andrey Neporada
> Fix For: 0.10.1.0
>
> Currently the only bound on the fetch response size is max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you also need to consume 1k partitions this means you may receive a 1GB response in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, and you only actually get the full fetch amount when you are behind and there is a full chunk of data ready. This means an application that seems to work fine will suddenly OOM when partitions shift or when the application falls behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, max_bytes, which would control the maximum data bytes we would include in the response.
> The implementation on the server side would grab data from each partition in the fetch request until it hit this limit, then send back just the data for the partitions that fit in the response. The implementation would need to start from a random position in the list of topics included in the fetch request to ensure that in case of backlog we fairly balance between partitions (to avoid first giving just the first partition until that is exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch request much less useful and we should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in KAFKA-598. The max_bytes setting now becomes the new limit that would need to be compared to the max message size. This can be much larger -- e.g. a 50MB max_bytes setting would be okay, whereas now if you set 50MB you may need to allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new field and we should do a KIP for it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-2063) Bound fetch response size (KIP-74)
[ https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485357#comment-15485357 ]

Alexey Ozeritskiy edited comment on KAFKA-2063 at 9/12/16 9:37 PM:
-------------------------------------------------------------------

The approach of KAFKA-3979 is better for configurations with a very big max.message.bytes. For example, we have the following config on a cluster of 77 hosts:
{code}
replica.fetch.max.bytes=13500
replica.socket.receive.buffer.bytes=13500
message.max.bytes=134217728
socket.request.max.bytes=13500
{code}
With KIP-74 the process always needs 77*13500 bytes of memory. With KAFKA-3979 it is only 77*100 bytes on average. I think we should do something about that.

was (Author: aozeritsky):
The approah KAFKA-3979 is better for configurations with very big max.message.size. For example we have the following config on cluster of 77 hosts
{code}
replica.fetch.max.bytes=13500
replica.socket.receive.buffer.bytes=13500
message.max.bytes=134217728
socket.request.max.bytes=13500
{code}
For KIP-74 it is always needed 77*13500 bytes of memory for process. For KAFKA-3979 it is only 77*100 bytes in average. I think we should to do something with that.

> Bound fetch response size (KIP-74)
> ----------------------------------
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jay Kreps
> Assignee: Alexey Ozeritskiy
> Fix For: 0.10.1.0
>
> Currently the only bound on the fetch response size is max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you also need to consume 1k partitions this means you may receive a 1GB response in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, and you only actually get the full fetch amount when you are behind and there is a full chunk of data ready. This means an application that seems to work fine will suddenly OOM when partitions shift or when the application falls behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, max_bytes, which would control the maximum data bytes we would include in the response.
> The implementation on the server side would grab data from each partition in the fetch request until it hit this limit, then send back just the data for the partitions that fit in the response. The implementation would need to start from a random position in the list of topics included in the fetch request to ensure that in case of backlog we fairly balance between partitions (to avoid first giving just the first partition until that is exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch request much less useful and we should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in KAFKA-598. The max_bytes setting now becomes the new limit that would need to be compared to the max message size. This can be much larger -- e.g. a 50MB max_bytes setting would be okay, whereas now if you set 50MB you may need to allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new field and we should do a KIP for it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-2063) Bound fetch response size (KIP-74)
[ https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485357#comment-15485357 ]

Alexey Ozeritskiy commented on KAFKA-2063:
------------------------------------------

The approach of KAFKA-3979 is better for configurations with a very big max.message.size. For example, we have the following config on a cluster of 77 hosts:
{code}
replica.fetch.max.bytes=13500
replica.socket.receive.buffer.bytes=13500
message.max.bytes=134217728
socket.request.max.bytes=13500
{code}
With KIP-74 the process always needs 77*13500 bytes of memory. With KAFKA-3979 it is only 77*100 bytes on average. I think we should do something about that.

> Bound fetch response size (KIP-74)
> ----------------------------------
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jay Kreps
> Assignee: Alexey Ozeritskiy
> Fix For: 0.10.1.0
>
> Currently the only bound on the fetch response size is max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you also need to consume 1k partitions this means you may receive a 1GB response in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, and you only actually get the full fetch amount when you are behind and there is a full chunk of data ready. This means an application that seems to work fine will suddenly OOM when partitions shift or when the application falls behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, max_bytes, which would control the maximum data bytes we would include in the response.
> The implementation on the server side would grab data from each partition in the fetch request until it hit this limit, then send back just the data for the partitions that fit in the response. The implementation would need to start from a random position in the list of topics included in the fetch request to ensure that in case of backlog we fairly balance between partitions (to avoid first giving just the first partition until that is exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch request much less useful and we should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in KAFKA-598. The max_bytes setting now becomes the new limit that would need to be compared to the max message size. This can be much larger -- e.g. a 50MB max_bytes setting would be okay, whereas now if you set 50MB you may need to allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new field and we should do a KIP for it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Assigned] (KAFKA-2063) Bound fetch response size (KIP-74)
[ https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy reassigned KAFKA-2063:
----------------------------------------
Assignee: Alexey Ozeritskiy  (was: Andrey Neporada)

> Bound fetch response size (KIP-74)
> ----------------------------------
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jay Kreps
> Assignee: Alexey Ozeritskiy
> Fix For: 0.10.1.0
>
> Currently the only bound on the fetch response size is max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you also need to consume 1k partitions this means you may receive a 1GB response in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, and you only actually get the full fetch amount when you are behind and there is a full chunk of data ready. This means an application that seems to work fine will suddenly OOM when partitions shift or when the application falls behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, max_bytes, which would control the maximum data bytes we would include in the response.
> The implementation on the server side would grab data from each partition in the fetch request until it hit this limit, then send back just the data for the partitions that fit in the response. The implementation would need to start from a random position in the list of topics included in the fetch request to ensure that in case of backlog we fairly balance between partitions (to avoid first giving just the first partition until that is exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch request much less useful and we should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in KAFKA-598. The max_bytes setting now becomes the new limit that would need to be compared to the max message size. This can be much larger -- e.g. a 50MB max_bytes setting would be okay, whereas now if you set 50MB you may need to allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new field and we should do a KIP for it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-4071) Corrupted replication-offset-checkpoint leads to kafka server dysfunction
[ https://issues.apache.org/jira/browse/KAFKA-4071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15431056#comment-15431056 ]

Alexey Ozeritskiy commented on KAFKA-4071:
------------------------------------------

Thanks [~zanezhang]. Could you attach the replication-offset-checkpoint file? It would be interesting to see its original binary content.

> Corrupted replication-offset-checkpoint leads to kafka server dysfunction
> --------------------------------------------------------------------------
>
> Key: KAFKA-4071
> URL: https://issues.apache.org/jira/browse/KAFKA-4071
> Project: Kafka
> Issue Type: Bug
> Components: clients, offset manager
> Affects Versions: 0.9.0.1
> Environment: Red Hat Enterprise 6.7
> Reporter: Zane Zhang
> Priority: Critical
>
> For an unknown reason, [kafka data root]/replication-offset-checkpoint was corrupted. First Kafka reported a NumberFormatException in kafka server.out. And then it reported "error when handling request Name: FetchRequest; ..." ERRORs repeatedly (ERROR details below). As a result, clients could not read from or write to Kafka on several partitions until replication-offset-checkpoint was manually deleted.
> Can the Kafka broker handle this error and survive it?
> And what's the reason this file was corrupted? Only one file was corrupted and no noticeable disk failure was detected.
> ERROR [KafkaApi-7] error when handling request
> java.lang.NumberFormatException: For input string: " N?-; O"
>         at java.lang.NumberFormatException.forInputString(NumberFormatException.java:77)
>         at java.lang.Integer.parseInt(Integer.java:493)
>         at java.lang.Integer.parseInt(Integer.java:539)
>         at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
>         at scala.collection.immutable.StringOps.toInt(StringOps.scala:30)
>         at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:78)
>         at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:93)
>         at kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:173)
>         at kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:173)
>         at scala.collection.immutable.Set$Set2.foreach(Set.scala:111)
>         at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:173)
> ERROR [KafkaApi-7] error when handling request Name: FetchRequest; Version: 1; CorrelationId: 0; ClientId: ReplicaFetcherThread-1-7; ReplicaId: 6; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [prodTopicDal09E,166] -> PartitionFetchInfo(7123666,20971520),[prodTopicDal09E,118] -> PartitionFetchInfo(7128188,20971520),[prodTopicDal09E,238] ->

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
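On the question of whether the broker can survive this: one hardening sketch (hypothetical, not the shipped OffsetCheckpoint code) is to contain the parse failure and treat an unreadable checkpoint as absent, rather than letting the NumberFormatException escape into every request handler:

{code}
import java.io.File
import scala.io.Source
import scala.util.control.NonFatal

// Expected layout (version, entry count, then "topic partition offset"):
// a corrupted line makes toInt/toLong throw, which we turn into Map.empty.
def readCheckpointSafely(file: File): Map[(String, Int), Long] =
  try {
    val source = Source.fromFile(file)
    try {
      val lines = source.getLines().toVector
      val count = lines(1).trim.toInt
      lines.drop(2).take(count).map { line =>
        val Array(topic, partition, offset) = line.split("\\s+")
        (topic, partition.toInt) -> offset.toLong
      }.toMap
    } finally source.close()
  } catch {
    case NonFatal(_) => Map.empty // recover as if no checkpoint existed
  }
{code}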
[jira] [Updated] (KAFKA-4039) Exit Strategy: using exceptions instead of inline invocation of exit/halt
[ https://issues.apache.org/jira/browse/KAFKA-4039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-4039:
-------------------------------------
Attachment: deadlock-stack2

> Exit Strategy: using exceptions instead of inline invocation of exit/halt
> --------------------------------------------------------------------------
>
> Key: KAFKA-4039
> URL: https://issues.apache.org/jira/browse/KAFKA-4039
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.10.0.0
> Reporter: Maysam Yabandeh
> Attachments: deadlock-stack2
>
> The current practice is to directly invoke halt/exit right after the line that intends to terminate the execution. In the case of System.exit this could cause deadlocks if the thread invoking System.exit is holding a lock that will be requested by the shutdown hook threads that will be started by System.exit. An example is reported by [~aozeritsky] in KAFKA-3924. This would also make testing more difficult as it would require mocking static methods of the System and Runtime classes, which is not natively supported in Java.
> One alternative suggested [here|https://issues.apache.org/jira/browse/KAFKA-3924?focusedCommentId=15420269&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15420269] would be to throw some dedicated exceptions that will eventually invoke exit/halt:
> {quote} it would be great to move away from executing `System.exit` inline in favour of throwing an exception (two examples, but maybe we can find better names: FatalExitException and FatalHaltException) that is caught by some central code that then does the `System.exit` or `Runtime.getRuntime.halt`. This helps in a couple of ways:
> (1) Avoids issues with locks being held as in this issue
> (2) It makes it possible to abstract the action, which is very useful in tests. At the moment, we can't easily test for these conditions as they cause the whole test harness to exit. Worse, these conditions are sometimes triggered in the tests and it's unclear why.
> (3) We can have more consistent logging around these actions and possibly extended logging for tests
> {quote}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-1530) howto update continuously
[ https://issues.apache.org/jira/browse/KAFKA-1530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425454#comment-15425454 ]

Alexey Ozeritskiy commented on KAFKA-1530:
------------------------------------------

I think this ticket may be closed. unclean.leader.election.enable=false helps us.
Also, we've developed a tool, kafka-restarter, that restarts Kafka node by node and checks the ISR status, and a tool, fix-isr, that can fix the ISR after a cluster power failure.

> howto update continuously
> --------------------------
>
> Key: KAFKA-1530
> URL: https://issues.apache.org/jira/browse/KAFKA-1530
> Project: Kafka
> Issue Type: Wish
> Reporter: Stanislav Gilmulin
> Assignee: Guozhang Wang
> Priority: Minor
> Labels: operating_manual, performance
>
> Hi,
>
> Could I ask you a question about the Kafka update procedure? Is there a way to update the software that doesn't require service interruption or lead to data loss? We can't stop message brokering during the update as we have a strict SLA.
>
> Best regards
> Stanislav Gilmulin

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-4039) Exit Strategy: using exceptions instead of inline invocation of exit/halt
[ https://issues.apache.org/jira/browse/KAFKA-4039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424201#comment-15424201 ]

Alexey Ozeritskiy commented on KAFKA-4039:
------------------------------------------

Thanks, [~maysamyabandeh]. I'll be happy to test your patch.

> Exit Strategy: using exceptions instead of inline invocation of exit/halt
> --------------------------------------------------------------------------
>
> Key: KAFKA-4039
> URL: https://issues.apache.org/jira/browse/KAFKA-4039
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.10.0.0
> Reporter: Maysam Yabandeh
>
> The current practice is to directly invoke halt/exit right after the line that intends to terminate the execution. In the case of System.exit this could cause deadlocks if the thread invoking System.exit is holding a lock that will be requested by the shutdown hook threads that will be started by System.exit. An example is reported by [~aozeritsky] in KAFKA-3924. This would also make testing more difficult as it would require mocking static methods of the System and Runtime classes, which is not natively supported in Java.
> One alternative suggested [here|https://issues.apache.org/jira/browse/KAFKA-3924?focusedCommentId=15420269&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15420269] would be to throw some dedicated exceptions that will eventually invoke exit/halt:
> {quote} it would be great to move away from executing `System.exit` inline in favour of throwing an exception (two examples, but maybe we can find better names: FatalExitException and FatalHaltException) that is caught by some central code that then does the `System.exit` or `Runtime.getRuntime.halt`. This helps in a couple of ways:
> (1) Avoids issues with locks being held as in this issue
> (2) It makes it possible to abstract the action, which is very useful in tests. At the moment, we can't easily test for these conditions as they cause the whole test harness to exit. Worse, these conditions are sometimes triggered in the tests and it's unclear why.
> (3) We can have more consistent logging around these actions and possibly extended logging for tests
> {quote}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-4039) Exit Strategy: using exceptions instead of inline invocation of exit/halt
[ https://issues.apache.org/jira/browse/KAFKA-4039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423298#comment-15423298 ]

Alexey Ozeritskiy commented on KAFKA-4039:
------------------------------------------

Thanks [~maysamyabandeh]. I think we have to decide how to solve the problem or revert commit d1757c70a198014e85026c01a1a4ccab6a12da7d, because it introduces a more serious problem. In fact, my "solution" from KAFKA-3924 does not solve the problem because the shutdown hook waits for the scheduler and vice versa.

> Exit Strategy: using exceptions instead of inline invocation of exit/halt
> --------------------------------------------------------------------------
>
> Key: KAFKA-4039
> URL: https://issues.apache.org/jira/browse/KAFKA-4039
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.10.0.0
> Reporter: Maysam Yabandeh
>
> The current practice is to directly invoke halt/exit right after the line that intends to terminate the execution. In the case of System.exit this could cause deadlocks if the thread invoking System.exit is holding a lock that will be requested by the shutdown hook threads that will be started by System.exit. An example is reported by [~aozeritsky] in KAFKA-3924. This would also make testing more difficult as it would require mocking static methods of the System and Runtime classes, which is not natively supported in Java.
> One alternative suggested [here|https://issues.apache.org/jira/browse/KAFKA-3924?focusedCommentId=15420269&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15420269] would be to throw some dedicated exceptions that will eventually invoke exit/halt:
> {quote} it would be great to move away from executing `System.exit` inline in favour of throwing an exception (two examples, but maybe we can find better names: FatalExitException and FatalHaltException) that is caught by some central code that then does the `System.exit` or `Runtime.getRuntime.halt`. This helps in a couple of ways:
> (1) Avoids issues with locks being held as in this issue
> (2) It makes it possible to abstract the action, which is very useful in tests. At the moment, we can't easily test for these conditions as they cause the whole test harness to exit. Worse, these conditions are sometimes triggered in the tests and it's unclear why.
> (3) We can have more consistent logging around these actions and possibly extended logging for tests
> {quote}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
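The alternative quoted in the description can be made concrete with a tiny self-contained sketch (the name FatalExitException comes from the suggestion itself; everything else here is illustrative):

{code}
// Instead of calling System.exit inline (possibly while holding locks),
// throw; one central handler performs the exit after the stack unwinds
// and the locks are released. In tests the handler can be swapped out.
final class FatalExitException(val statusCode: Int, message: String)
  extends RuntimeException(message)

def runGuarded(exiter: Int => Unit = System.exit)(body: => Unit): Unit =
  try body
  catch {
    case e: FatalExitException => exiter(e.statusCode)
  }

// Usage sketch:
// runGuarded() { throw new FatalExitException(1, "truncation not allowed") }
{code}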
[jira] [Resolved] (KAFKA-3979) Optimize memory used by replication process by using adaptive fetch message size
[ https://issues.apache.org/jira/browse/KAFKA-3979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy resolved KAFKA-3979.
--------------------------------------
Resolution: Won't Fix

There is a better solution: KAFKA-2063

> Optimize memory used by replication process by using adaptive fetch message size
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-3979
> URL: https://issues.apache.org/jira/browse/KAFKA-3979
> Project: Kafka
> Issue Type: Improvement
> Components: replication
> Affects Versions: 0.10.0.0
> Reporter: Andrey Neporada
>
> The current replication process fetches messages in replica.fetch.max.bytes-sized chunks. Since replica.fetch.max.bytes should be bigger than max.message.bytes for replication to work, one can face big memory consumption for the replication process, especially for installations with a big number of partitions.
> The proposed solution is to try to fetch messages in smaller chunks (say replica.fetch.base.bytes). If we encounter a message bigger than the current fetch chunk, we increase the chunk (e.g. twofold) and retry. After replicating this bigger message, we shrink the fetch chunk size back until it reaches replica.fetch.base.bytes.
> replica.fetch.base.bytes should be chosen big enough not to affect throughput and to be bigger than most messages. However, it can be much less than replica.fetch.max.bytes.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-3924) Data loss due to halting when LEO is larger than leader's LEO
[ https://issues.apache.org/jira/browse/KAFKA-3924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420251#comment-15420251 ]

Alexey Ozeritskiy commented on KAFKA-3924:
------------------------------------------

IMHO the simplest way to solve the problem is to execute System.exit asynchronously:
{code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index ef602e4..ed00a73 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -175,10 +175,13 @@ class ReplicaFetcherThread(name: String,
       if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
         ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
         // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
-        fatal("Exiting because log truncation is not allowed for partition %s,".format(topicAndPartition) +
+        val msg = "Exiting because log truncation is not allowed for partition %s,".format(topicAndPartition) +
           " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
-          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
-        System.exit(1)
+          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset)
+        fatal(msg)
+
+        replicaMgr.scheduler.schedule("exit", () => System.exit(1))
+        throw new Exception(msg)
       }

       warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2b97783..6e6539b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -105,7 +105,7 @@ class ReplicaManager(val config: KafkaConfig,
                     time: Time,
                     jTime: JTime,
                     val zkUtils: ZkUtils,
-                    scheduler: Scheduler,
+                    val scheduler: Scheduler,
                     val logManager: LogManager,
                     val isShuttingDown: AtomicBoolean,
                     threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
{code}

> Data loss due to halting when LEO is larger than leader's LEO
> --------------------------------------------------------------
>
> Key: KAFKA-3924
> URL: https://issues.apache.org/jira/browse/KAFKA-3924
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.10.0.0
> Reporter: Maysam Yabandeh
> Fix For: 0.10.0.1
> Attachments: deadlock-stack
>
> Currently the follower broker panics when its LEO is larger than its leader's LEO, and assuming that this is an impossible state to reach, halts the process to prevent any further damage.
> {code}
> if (leaderEndOffset < replica.logEndOffset.messageOffset) {
>   // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
>   // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
>   // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
>   if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
>     ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
>     // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
>     fatal("...")
>     Runtime.getRuntime.halt(1)
>   }
> {code}
> Firstly this assumption is invalid and there are legitimate cases (examples below) where it could actually occur. Secondly, halting results in the broker losing its un-flushed data, and if multiple brokers halt simultaneously there is a chance that both the leader and the followers of a partition are among the halted brokers, which would result in permanent data loss.
> Given that this is a legit case, we suggest replacing the halt with a graceful shutdown to avoid propagating data loss to the entire cluster.
> Details:
> One legit case where this could actually occur is when a troubled broker shrinks its partitions right before crashing (KAFKA-3410 and KAFKA-3861). In this case the broker has lost some data but the controller still cannot elect the others as the leader. If the crashed broker comes back up, the controller elects it as the leader, and as a result all other brokers who are now following it halt since they have LEOs larger than that of the shrunk topics in the restarted broker. We actually had a case where bringing up a crashed broker simultaneously took down the entire cluster and, as explained above, this could result in data loss.
> The other legit case is when multiple brokers ungracefully shut down at the same time. In this case both the leader and the followers lose their un-flushed data but one of them has an HW larger than the other. The controller elects the one that comes back up sooner as the leader and if its LEO is less than its future follower's, the follower will halt (and probably lose more data). Simultaneous ungraceful shutdown could happen due to a hardware issue (e.g., rack power failure), operator errors, or a software issue (e.g., the case above that is further explained in KAFKA-3410 and KAFKA-3861 and causes simultaneous halts in multiple brokers)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (KAFKA-3924) Data loss due to halting when LEO is larger than leader's LEO
[ https://issues.apache.org/jira/browse/KAFKA-3924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-3924:
-------------------------------------
Attachment: deadlock-stack

> Data loss due to halting when LEO is larger than leader's LEO
> --------------------------------------------------------------
>
> Key: KAFKA-3924
> URL: https://issues.apache.org/jira/browse/KAFKA-3924
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.10.0.0
> Reporter: Maysam Yabandeh
> Fix For: 0.10.0.1
> Attachments: deadlock-stack
>
> Currently the follower broker panics when its LEO is larger than its leader's LEO, and assuming that this is an impossible state to reach, halts the process to prevent any further damage.
> {code}
> if (leaderEndOffset < replica.logEndOffset.messageOffset) {
>   // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
>   // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
>   // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
>   if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
>     ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
>     // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
>     fatal("...")
>     Runtime.getRuntime.halt(1)
>   }
> {code}
> Firstly this assumption is invalid and there are legitimate cases (examples below) where it could actually occur. Secondly, halting results in the broker losing its un-flushed data, and if multiple brokers halt simultaneously there is a chance that both the leader and the followers of a partition are among the halted brokers, which would result in permanent data loss.
> Given that this is a legit case, we suggest replacing the halt with a graceful shutdown to avoid propagating data loss to the entire cluster.
> Details:
> One legit case where this could actually occur is when a troubled broker shrinks its partitions right before crashing (KAFKA-3410 and KAFKA-3861). In this case the broker has lost some data but the controller still cannot elect the others as the leader. If the crashed broker comes back up, the controller elects it as the leader, and as a result all other brokers who are now following it halt since they have LEOs larger than that of the shrunk topics in the restarted broker. We actually had a case where bringing up a crashed broker simultaneously took down the entire cluster and, as explained above, this could result in data loss.
> The other legit case is when multiple brokers ungracefully shut down at the same time. In this case both the leader and the followers lose their un-flushed data but one of them has an HW larger than the other. The controller elects the one that comes back up sooner as the leader and if its LEO is less than its future follower's, the follower will halt (and probably lose more data). Simultaneous ungraceful shutdown could happen due to a hardware issue (e.g., rack power failure), operator errors, or a software issue (e.g., the case above that is further explained in KAFKA-3410 and KAFKA-3861 and causes simultaneous halts in multiple brokers)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-3924) Data loss due to halting when LEO is larger than leader's LEO
[ https://issues.apache.org/jira/browse/KAFKA-3924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15419873#comment-15419873 ]

Alexey Ozeritskiy commented on KAFKA-3924:
------------------------------------------

I got a deadlock with that patch. Stack traces:
{code}
"ReplicaFetcherThread-3-2" #112 prio=5 os_prio=0 tid=0x7f0acc10 nid=0xfd54f in Object.wait() [0x7f0b141d7000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1245)
        - locked <0x0003d8269bc8> (a kafka.Kafka$$anon$1)
        at java.lang.Thread.join(Thread.java:1319)
        at java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
        at java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
        at java.lang.Shutdown.runHooks(Shutdown.java:123)
        at java.lang.Shutdown.sequence(Shutdown.java:167)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - locked <0x0003d8106e88> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Runtime.exit(Runtime.java:109)
        at java.lang.System.exit(System.java:971)
        at kafka.server.ReplicaFetcherThread.handleOffsetOutOfRange(ReplicaFetcherThread.scala:179)
{code}
{code}
"Thread-2" #29 prio=5 os_prio=0 tid=0x7f0a70008000 nid=0xfecbf in Object.wait() [0x7f0b166e5000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1245)
        - locked <0x0003e5c46960> (a java.lang.Thread)
        at java.lang.Thread.join(Thread.java:1319)
        at kafka.server.KafkaRequestHandlerPool$$anonfun$shutdown$3.apply(KafkaRequestHandler.scala:92)
        at kafka.server.KafkaRequestHandlerPool$$anonfun$shutdown$3.apply(KafkaRequestHandler.scala:91)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at kafka.server.KafkaRequestHandlerPool.shutdown(KafkaRequestHandler.scala:91)
        at kafka.server.KafkaServer$$anonfun$shutdown$3.apply$mcV$sp(KafkaServer.scala:559)
        at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79)
        at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
        at kafka.utils.CoreUtils$.swallowWarn(CoreUtils.scala:51)
        at kafka.utils.Logging$class.swallow(Logging.scala:94)
        at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:51)
        at kafka.server.KafkaServer.shutdown(KafkaServer.scala:559)
        at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:49)
        at kafka.Kafka$$anon$1.run(Kafka.scala:63)
{code}
System.exit executes the hook in Thread-2 and joins it (first trace). Thread-2 joins ReplicaFetcherThread-3-2 (second trace). So they wait for each other forever.

> Data loss due to halting when LEO is larger than leader's LEO
> --------------------------------------------------------------
>
> Key: KAFKA-3924
> URL: https://issues.apache.org/jira/browse/KAFKA-3924
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.10.0.0
> Reporter: Maysam Yabandeh
> Fix For: 0.10.0.1
>
> Currently the follower broker panics when its LEO is larger than its leader's LEO, and assuming that this is an impossible state to reach, halts the process to prevent any further damage.
> {code}
> if (leaderEndOffset < replica.logEndOffset.messageOffset) {
>   // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
>   // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
>   // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
>   if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
>     ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
>     // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
>     fatal("...")
>     Runtime.getRuntime.halt(1)
>   }
> {code}
> Firstly this assumption is invalid and there are legitimate cases (examples below) where it could actually occur. Secondly, halting results in the broker losing its un-flushed data, and if multiple brokers halt simultaneously there is a chance that both the leader and the followers of a partition are among the halted brokers, which would result in permanent data loss.
> Given that this is a legit case, we suggest replacing the halt with a graceful shutdown to avoid propagating data loss to the entire cluster.
> Details:
> One legit case where this could actually occur is when a troubled broker shrinks its partitions right before crashing (KAFKA-3410 and KAFKA-3861). In this case the broker has lost some data but the controller still cannot elect the others as the leader. If the crashed broker comes back up, the controller elects it as the leader, and as a result all other brokers who are now following it halt since they have LEOs larger than that of the shrunk topics in the restarted broker. We actually had a case where bringing up a crashed broker simultaneously took down the entire cluster and, as explained above, this could result in data loss.
> The other legit case is when multiple brokers ungracefully shut down at the same time. In this case both the leader and the followers lose their un-flushed data but one of them has an HW larger than the other. The controller elects the one that comes back up sooner as the leader and if its LEO is less than its future follower's, the follower will halt (and probably lose more data). Simultaneous ungraceful shutdown could happen due to a hardware issue (e.g., rack power failure), operator errors, or a software issue (e.g., the case above that is further explained in KAFKA-3410 and KAFKA-3861 and causes simultaneous halts in multiple brokers)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
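The deadlock pattern in these traces reduces to a few lines; here is a minimal self-contained reproduction (illustrative, not Kafka code):

{code}
// System.exit blocks until all shutdown hooks finish; the hook blocks
// joining `worker`; but `worker` is the thread sitting inside
// System.exit. Each waits on the other forever.
object ExitDeadlockDemo {
  def main(args: Array[String]): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit = System.exit(1)
    }, "worker")
    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
      def run(): Unit = worker.join()
    }, "hook"))
    worker.start()
  }
}
{code}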
[jira] [Updated] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging
[ https://issues.apache.org/jira/browse/KAFKA-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-3997:
-------------------------------------
Issue Type: Improvement  (was: Bug)

> Halting because log truncation is not allowed and suspicious logging
> ---------------------------------------------------------------------
>
> Key: KAFKA-3997
> URL: https://issues.apache.org/jira/browse/KAFKA-3997
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
> Reporter: Alexey Ozeritskiy
> Attachments: KAFKA-3997.patch
>
> When a follower wants to truncate a partition and truncation is not allowed, it prints the following message:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}
> It is difficult to understand which partition it is.
> I suggest logging the partition here instead of the topic. For example:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for partition [rt3.fol--yabs-rt--bs-hit-log,0], Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging
[ https://issues.apache.org/jira/browse/KAFKA-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-3997: - Description: When a follower wants to truncate a partition and it is not allowed, it prints the following message: {noformat} [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) {noformat} It is difficult to understand which partition it is. I suggest logging the partition here instead of the topic. For example: {noformat} [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for partition [rt3.fol--yabs-rt--bs-hit-log,0], Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) {noformat} was: When a follower wants to truncate a partition and it is not allowed, it print the following message: {noformat} [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) {noformat} It is difficult to understand which partition it is. I suggest logging the partition here instead of the topic. For example: {noformat} [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for partition [rt3.fol--yabs-rt--bs-hit-log,0], Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) {noformat} > Halting because log truncation is not allowed and suspicious logging > > > Key: KAFKA-3997 > URL: https://issues.apache.org/jira/browse/KAFKA-3997 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0 >Reporter: Alexey Ozeritskiy > Attachments: KAFKA-3997.patch > > > When a follower wants to truncate a partition and it is not allowed, it prints the > following message: > {noformat} > [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because > log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current > leader 19's latest offset 50260815 is less > than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) > {noformat} > It is difficult to understand which partition it is. > I suggest logging the partition here instead of the topic. For example: > {noformat} > [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because > log truncation is not allowed for partition [rt3.fol--yabs-rt--bs-hit-log,0], > Current leader 19's latest offset 50260815 is less > than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging
[ https://issues.apache.org/jira/browse/KAFKA-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-3997: - Description: When a follower wants to truncate a partition and it is not allowed, it prints the following message: {noformat} [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) {noformat} It is difficult to understand which partition it is. I suggest logging the partition here instead of the topic. For example: {noformat} [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for partition [rt3.fol--yabs-rt--bs-hit-log,0], Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) {noformat} was: When a follower wants to truncate a partition and it is not allowed, it prints the following message: {noformat} [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) {noformat} It is difficult to understand which partition it is. I suggest logging the partition here instead of the topic. For example: {noformat} [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log,0], Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) {noformat} > Halting because log truncation is not allowed and suspicious logging > > > Key: KAFKA-3997 > URL: https://issues.apache.org/jira/browse/KAFKA-3997 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0 >Reporter: Alexey Ozeritskiy > > When a follower wants to truncate a partition and it is not allowed, it prints the > following message: > {noformat} > [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because > log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current > leader 19's latest offset 50260815 is less > than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) > {noformat} > It is difficult to understand which partition it is. > I suggest logging the partition here instead of the topic. For example: > {noformat} > [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because > log truncation is not allowed for partition [rt3.fol--yabs-rt--bs-hit-log,0], > Current leader 19's latest offset 50260815 is less > than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging
[ https://issues.apache.org/jira/browse/KAFKA-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-3997: - Attachment: KAFKA-3997.patch > Halting because log truncation is not allowed and suspicious logging > > > Key: KAFKA-3997 > URL: https://issues.apache.org/jira/browse/KAFKA-3997 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0 >Reporter: Alexey Ozeritskiy > Attachments: KAFKA-3997.patch > > > When a follower wants to truncate a partition and it is not allowed, it prints the > following message: > {noformat} > [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because > log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current > leader 19's latest offset 50260815 is less > than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) > {noformat} > It is difficult to understand which partition it is. > I suggest logging the partition here instead of the topic. For example: > {noformat} > [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because > log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log,0], > Current leader 19's latest offset 50260815 is less > than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging
[ https://issues.apache.org/jira/browse/KAFKA-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-3997: - Status: Patch Available (was: Open) > Halting because log truncation is not allowed and suspicious logging > > > Key: KAFKA-3997 > URL: https://issues.apache.org/jira/browse/KAFKA-3997 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.0, 0.9.0.1, 0.9.0.0 >Reporter: Alexey Ozeritskiy > Attachments: KAFKA-3997.patch > > > When a follower wants to truncate a partition and it is not allowed, it prints the > following message: > {noformat} > [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because > log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current > leader 19's latest offset 50260815 is less > than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) > {noformat} > It is difficult to understand which partition it is. > I suggest logging the partition here instead of the topic. For example: > {noformat} > [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because > log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log,0], > Current leader 19's latest offset 50260815 is less > than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging
[ https://issues.apache.org/jira/browse/KAFKA-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-3997: - Description: When a follower wants to truncate a partition and it is not allowed, it prints the following message: {noformat} [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) {noformat} It is difficult to understand which partition it is. I suggest logging the partition here instead of the topic. For example: {noformat} [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log,0], Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) {noformat} was: When a follower wants to truncate a partition and it is not allowed, it prints the following message: {noformat} [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) {noformat} It is difficult to understand which partition it is. I suggest logging the partition here instead of the topic. For example: {noformat} [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log, 0], Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) {noformat} > Halting because log truncation is not allowed and suspicious logging > > > Key: KAFKA-3997 > URL: https://issues.apache.org/jira/browse/KAFKA-3997 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0 >Reporter: Alexey Ozeritskiy > > When a follower wants to truncate a partition and it is not allowed, it prints the > following message: > {noformat} > [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because > log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current > leader 19's latest offset 50260815 is less > than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) > {noformat} > It is difficult to understand which partition it is. > I suggest logging the partition here instead of the topic. For example: > {noformat} > [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because > log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log,0], > Current leader 19's latest offset 50260815 is less > than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging
[ https://issues.apache.org/jira/browse/KAFKA-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-3997: - Description: When a follower wants to truncate a partition and it is not allowed, it prints the following message: {noformat} [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) {noformat} It is difficult to understand which partition it is. I suggest logging the partition here instead of the topic. For example: {noformat} [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log, 0], Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) {noformat} was: When a follower wants to truncate a partition and it is not allowed, it prints the following message: {{ [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) }} It is difficult to understand which partition it is. I suggest logging the partition here instead of the topic. For example: {{ [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log, 0], Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) }} > Halting because log truncation is not allowed and suspicious logging > > > Key: KAFKA-3997 > URL: https://issues.apache.org/jira/browse/KAFKA-3997 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0 >Reporter: Alexey Ozeritskiy > > When a follower wants to truncate a partition and it is not allowed, it prints the > following message: > {noformat} > [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because > log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current > leader 19's latest offset 50260815 is less > than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) > {noformat} > It is difficult to understand which partition it is. > I suggest logging the partition here instead of the topic. For example: > {noformat} > [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because > log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log, 0], > Current leader 19's latest offset 50260815 is less > than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging
Alexey Ozeritskiy created KAFKA-3997: Summary: Halting because log truncation is not allowed and suspicious logging Key: KAFKA-3997 URL: https://issues.apache.org/jira/browse/KAFKA-3997 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.10.0.0, 0.9.0.1, 0.9.0.0 Reporter: Alexey Ozeritskiy When a follower wants to truncate a partition and it is not allowed, it prints the following message: {{ [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) }} It is difficult to understand which partition it is. I suggest logging the partition here instead of the topic. For example: {{ [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log, 0], Current leader 19's latest offset 50260815 is less than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread) }} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3693) Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up
[ https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15381452#comment-15381452 ] Alexey Ozeritskiy commented on KAFKA-3693: -- It seems this issue is related to KAFKA-2178. > Race condition between highwatermark-checkpoint thread and > handleLeaderAndIsrRequest at broker start-up > --- > > Key: KAFKA-3693 > URL: https://issues.apache.org/jira/browse/KAFKA-3693 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.1 >Reporter: Maysam Yabandeh > > Upon broker start-up, a race between the highwatermark-checkpoint thread (writing the > replication-offset-checkpoint file) and the handleLeaderAndIsrRequest thread > (reading from it) causes the highwatermark for some partitions to be reset to > 0. In the good case, this causes the replica to truncate its entire log to 0 > and hence to fetch terabytes of data from the lead broker, which > sometimes leads to hours of downtime. We observed bad cases where the > reset offset can propagate to the recovery-point-offset-checkpoint file, making a > lead broker truncate the file. This seems to have the potential to lead to > data loss if the truncation happens at both follower and leader brokers. > This is the particular faulty scenario manifested in our tests: > # The broker restarts and receives LeaderAndIsr from the controller > # The LeaderAndIsr message however does not contain all the partitions (probably > because other brokers were churning at the same time) > # becomeLeaderOrFollower calls getOrCreatePartition and updates the > allPartitions with the partitions included in the LeaderAndIsr message {code} > def getOrCreatePartition(topic: String, partitionId: Int): Partition = { > var partition = allPartitions.get((topic, partitionId)) > if (partition == null) { > allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, > partitionId, time, this)) > {code} > # replication-offset-checkpoint jumps in, taking a snapshot of (the partial) > allReplicas' high watermark into the replication-offset-checkpoint file {code} > def checkpointHighWatermarks() { > val replicas = > allPartitions.values.map(_.getReplica(config.brokerId)).collect{case > Some(replica) => replica}{code} hence rewriting the previous highwatermarks. > # Later becomeLeaderOrFollower calls makeLeaders and makeFollowers which read > the (now partial) file through Partition::getOrCreateReplica {code} > val checkpoint = > replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath) > val offsetMap = checkpoint.read > if (!offsetMap.contains(TopicAndPartition(topic, partitionId))) > info("No checkpointed highwatermark is found for partition > [%s,%d]".format(topic, partitionId)) > {code} > We are not entirely sure whether the initial LeaderAndIsr message including a > subset of partitions is critical in making this race condition manifest or > not. But it is an important detail since it clarifies that a solution based > on not letting the highwatermark-checkpoint thread jump in the middle of > processing a LeaderAndIsr message would not suffice. > The solution we are thinking of is to force-initialize allPartitions with the > partitions listed in the replication-offset-checkpoint file (and perhaps the > recovery-point-offset-checkpoint file too) when a server starts. > Thoughts? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
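The proposal at the end (force-initialize allPartitions from the checkpoint file at startup) can be sketched as follows. This is a toy Scala illustration with assumed types: in the real broker allPartitions maps to Partition objects, which are simplified here to highwatermark values.
{code}
import scala.collection.mutable

case class TopicAndPartition(topic: String, partition: Int) // stand-in for the Kafka type

// Simplification: partitions map straight to their checkpointed highwatermarks.
def seedFromCheckpoint(checkpointed: Map[TopicAndPartition, Long],
                       allPartitions: mutable.Map[TopicAndPartition, Long]): Unit = {
  // Seed every previously checkpointed partition before any LeaderAndIsr
  // request is processed, so a concurrent checkpointHighWatermarks snapshot
  // can never observe (and persist) a smaller partition set than the file
  // already contains.
  for ((tp, hw) <- checkpointed)
    allPartitions.getOrElseUpdate(tp, hw)
}
{code}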
[jira] [Commented] (KAFKA-1543) Changing replication factor
[ https://issues.apache.org/jira/browse/KAFKA-1543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708037#comment-14708037 ] Alexey Ozeritskiy commented on KAFKA-1543: -- I think the refactoring should be a separate patch, and maybe a separate jira issue. Changing replication factor --- Key: KAFKA-1543 URL: https://issues.apache.org/jira/browse/KAFKA-1543 Project: Kafka Issue Type: Improvement Reporter: Alexey Ozeritskiy Assignee: Alexander Pakulov Attachments: can-change-replication.patch It is difficult to change the replication factor by manually editing the json config. I propose adding an option to the kafka-reassign-partitions.sh command to automatically create the json config. Example of usage {code} kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor --topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate output {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
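For context, the file that kafka-reassign-partitions.sh consumes is the standard reassignment JSON, so raising the replication factor amounts to generating longer replica lists for each partition. A hand-written example of the desired output (topic name and broker ids are illustrative):
{code}
{"version": 1,
 "partitions": [
   {"topic": "mytopic", "partition": 0, "replicas": [1, 2, 3]},
   {"topic": "mytopic", "partition": 1, "replicas": [2, 3, 4]}
 ]}
{code}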
[jira] [Commented] (KAFKA-1543) Changing replication factor
[ https://issues.apache.org/jira/browse/KAFKA-1543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708039#comment-14708039 ] Alexey Ozeritskiy commented on KAFKA-1543: -- Are you sure that your approach will work w/o a cluster restart? AddPartitionsListener only handles new partitions, not new replicas. Changing replication factor --- Key: KAFKA-1543 URL: https://issues.apache.org/jira/browse/KAFKA-1543 Project: Kafka Issue Type: Improvement Reporter: Alexey Ozeritskiy Assignee: Alexander Pakulov Attachments: can-change-replication.patch It is difficult to change the replication factor by manually editing the json config. I propose adding an option to the kafka-reassign-partitions.sh command to automatically create the json config. Example of usage {code} kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor --topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate output {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2178) Loss of highwatermarks on incorrect cluster shutdown/restart
[ https://issues.apache.org/jira/browse/KAFKA-2178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-2178: - Status: Patch Available (was: Open) Loss of highwatermarks on incorrect cluster shutdown/restart Key: KAFKA-2178 URL: https://issues.apache.org/jira/browse/KAFKA-2178 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy Attachments: KAFKA-2178.patch ReplicaManager flushes highwatermarks only for partitions which it received from the Controller. If the Controller sends an incomplete list of partitions, ReplicaManager will write an incomplete list of highwatermarks. As a result one can lose a lot of data during an incorrect broker restart. We got this situation in real life on our cluster. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2178) Loss of highwatermarks on incorrect cluster shutdown/restart
[ https://issues.apache.org/jira/browse/KAFKA-2178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-2178: - Attachment: KAFKA-2178.patch Loss of highwatermarks on incorrect cluster shutdown/restart Key: KAFKA-2178 URL: https://issues.apache.org/jira/browse/KAFKA-2178 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy Attachments: KAFKA-2178.patch ReplicaManager flushes highwatermarks only for partitions which it received from the Controller. If the Controller sends an incomplete list of partitions, ReplicaManager will write an incomplete list of highwatermarks. As a result one can lose a lot of data during an incorrect broker restart. We got this situation in real life on our cluster. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2178) Loss of highwatermarks on incorrect cluster shutdown/restart
Alexey Ozeritskiy created KAFKA-2178: Summary: Loss of highwatermarks on incorrect cluster shutdown/restart Key: KAFKA-2178 URL: https://issues.apache.org/jira/browse/KAFKA-2178 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy Attachments: KAFKA-2178.patch ReplicaManager flushes highwatermarks only for partitions which it received from the Controller. If the Controller sends an incomplete list of partitions, ReplicaManager will write an incomplete list of highwatermarks. As a result one can lose a lot of data during an incorrect broker restart. We got this situation in real life on our cluster. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
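A toy illustration of the failure mode (plain Scala with string keys, not Kafka code): if the checkpoint writer snapshots only the Controller-supplied subset, every other partition's highwatermark silently disappears from the file, whereas merging with the previous file contents would preserve them.
{code}
object PartialCheckpointLoss extends App {
  val onDisk   = Map("topic-0" -> 100L, "topic-1" -> 200L) // previous checkpoint file
  val fromCtrl = Map("topic-0" -> 105L)                    // partial view from the Controller

  val overwritten = fromCtrl           // what a plain rewrite produces
  val merged      = onDisk ++ fromCtrl // what a merge would preserve

  assert(!overwritten.contains("topic-1")) // topic-1's highwatermark is lost,
                                           // so it is treated as 0 on restart
  assert(merged("topic-1") == 200L)        // the merge keeps it
}
{code}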
[jira] [Updated] (KAFKA-2174) Wrong TopicMetadata deserialization
[ https://issues.apache.org/jira/browse/KAFKA-2174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-2174: - Attachment: KAFKA-2174.patch Wrong TopicMetadata deserialization --- Key: KAFKA-2174 URL: https://issues.apache.org/jira/browse/KAFKA-2174 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy Attachments: KAFKA-2174.patch TopicMetadata.readFrom assumes that the ByteBuffer always contains the full set of partitions, but this is not true. On incomplete metadata we will get java.lang.ArrayIndexOutOfBoundsException: {code} java.lang.ArrayIndexOutOfBoundsException: 47 at kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36) at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31) at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.Range.foreach(Range.scala:141) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31) {code} We sometimes get these exceptions on broker restarts (kill -TERM, controlled.shutdown.enable=false). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2174) Wrong TopicMetadata deserialization
[ https://issues.apache.org/jira/browse/KAFKA-2174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-2174: - Status: Patch Available (was: Open) Wrong TopicMetadata deserialization --- Key: KAFKA-2174 URL: https://issues.apache.org/jira/browse/KAFKA-2174 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy Attachments: KAFKA-2174.patch TopicMetadata.readFrom assumes that the ByteBuffer always contains the full set of partitions, but this is not true. On incomplete metadata we will get java.lang.ArrayIndexOutOfBoundsException: {code} java.lang.ArrayIndexOutOfBoundsException: 47 at kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36) at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31) at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.Range.foreach(Range.scala:141) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31) {code} We sometimes get these exceptions on broker restarts (kill -TERM, controlled.shutdown.enable=false). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2174) Wrong TopicMetadata deserialization
Alexey Ozeritskiy created KAFKA-2174: Summary: Wrong TopicMetadata deserialization Key: KAFKA-2174 URL: https://issues.apache.org/jira/browse/KAFKA-2174 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy TopicMetadata.readFrom assumes that the ByteBuffer always contains the full set of partitions, but this is not true. On incomplete metadata we will get java.lang.ArrayIndexOutOfBoundsException: {code} java.lang.ArrayIndexOutOfBoundsException: 47 at kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36) at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31) at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.Range.foreach(Range.scala:141) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31) {code} We sometimes get these exceptions on broker restarts (kill -TERM, controlled.shutdown.enable=false). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
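The exception is consistent with readFrom sizing an array by the claimed partition count and then indexing it by partitionId, which overflows as soon as the buffer carries only a subset of partitions. A defensive sketch (assumed shapes modelled loosely on the 0.8.2-era decoder, not the actual patch) keys the result by partitionId instead:
{code}
import java.nio.ByteBuffer

case class PartitionMeta(partitionId: Int) // stand-in for kafka.api.PartitionMetadata

// readOne stands in for PartitionMetadata.readFrom(buffer, brokers).
def readPartitionsMetadata(buffer: ByteBuffer,
                           readOne: ByteBuffer => PartitionMeta): Map[Int, PartitionMeta] = {
  val claimed = buffer.getInt() // number of partitions the header claims
  // Keying by partitionId tolerates gaps: an entry for partition 47 no
  // longer requires 48 array slots to exist, which is where the
  // ArrayIndexOutOfBoundsException above comes from.
  (0 until claimed).map { _ =>
    val pm = readOne(buffer)
    pm.partitionId -> pm
  }.toMap
}
{code}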
[jira] [Updated] (KAFKA-2164) ReplicaFetcherThread: suspicious log message on reset offset
[ https://issues.apache.org/jira/browse/KAFKA-2164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-2164: - Status: Patch Available (was: Open) ReplicaFetcherThread: suspicious log message on reset offset Key: KAFKA-2164 URL: https://issues.apache.org/jira/browse/KAFKA-2164 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy Attachments: KAFKA-2164.patch If log.logEndOffset < leaderStartOffset, the follower resets its offset and prints the following: {code} [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for partition [topic,11] reset its fetch offset from 49322124 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread) [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [topic,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread) {code} I think the right message should be: {code} [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for partition [rt3.iva--yandex--access-log,11] reset its fetch offset from 54369274 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread) [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [rt3.iva--yandex--access-log,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread) {code} This occurs because ReplicaFetcherThread resets the offset and then prints the log message. Possible solution: {code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index b31b432..181cbc1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
      * Roll out a new log at the follower with the start offset equal to the current leader's start offset
      */
     val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
-    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
     warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
       .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
+    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
     leaderStartOffset
   }
 }
{code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2164) ReplicaFetcherThread: suspicious log message on reset offset
[ https://issues.apache.org/jira/browse/KAFKA-2164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-2164: - Attachment: KAFKA-2164.patch ReplicaFetcherThread: suspicious log message on reset offset Key: KAFKA-2164 URL: https://issues.apache.org/jira/browse/KAFKA-2164 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy Attachments: KAFKA-2164.patch If log.logEndOffset < leaderStartOffset, the follower resets its offset and prints the following: {code} [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for partition [topic,11] reset its fetch offset from 49322124 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread) [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [topic,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread) {code} I think the right message should be: {code} [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for partition [rt3.iva--yandex--access-log,11] reset its fetch offset from 54369274 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread) [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [rt3.iva--yandex--access-log,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread) {code} This occurs because ReplicaFetcherThread resets the offset and then prints the log message. Possible solution: {code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index b31b432..181cbc1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
      * Roll out a new log at the follower with the start offset equal to the current leader's start offset
      */
     val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
-    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
     warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
       .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
+    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
     leaderStartOffset
   }
 }
{code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2165) ReplicaFetcherThread: data loss on unknown exception
[ https://issues.apache.org/jira/browse/KAFKA-2165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-2165: - Status: Patch Available (was: Open) ReplicaFetcherThread: data loss on unknown exception Key: KAFKA-2165 URL: https://issues.apache.org/jira/browse/KAFKA-2165 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy Attachments: KAFKA-2165.patch Sometimes in our cluster some replica gets out of the isr. Then the broker re-downloads the partition from the beginning. We got the following messages in logs: {code} # The leader: [2015-03-25 11:11:07,796] ERROR [Replica Manager on Broker 21]: Error when processing fetch request for partition [topic,11] offset 54369274 from follower with correlation id 2634499. Possible cause: Request for offset 54369274 but we only have log segments in the range 49322124 to 54369273. (kafka.server.ReplicaManager) {code} {code} # The follower: [2015-03-25 11:11:08,816] WARN [ReplicaFetcherThread-0-21], Replica 31 for partition [topic,11] reset its fetch offset from 49322124 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread) [2015-03-25 11:11:08,816] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [topic,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread) {code} This occurs because we update fetchOffset [here|https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L124] and then try to process the message. If any exception except OffsetOutOfRangeCode occurs, we get an unsynchronized fetchOffset and replica.logEndOffset. On the next fetch iteration we can get fetchOffset > replica.logEndOffset == leaderEndOffset and OffsetOutOfRangeCode. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2165) ReplicaFetcherThread: data loss on unknown exception
[ https://issues.apache.org/jira/browse/KAFKA-2165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-2165: - Attachment: KAFKA-2165.patch ReplicaFetcherThread: data loss on unknown exception Key: KAFKA-2165 URL: https://issues.apache.org/jira/browse/KAFKA-2165 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy Attachments: KAFKA-2165.patch Sometimes in our cluster some replica gets out of the isr. Then the broker re-downloads the partition from the beginning. We got the following messages in logs: {code} # The leader: [2015-03-25 11:11:07,796] ERROR [Replica Manager on Broker 21]: Error when processing fetch request for partition [topic,11] offset 54369274 from follower with correlation id 2634499. Possible cause: Request for offset 54369274 but we only have log segments in the range 49322124 to 54369273. (kafka.server.ReplicaManager) {code} {code} # The follower: [2015-03-25 11:11:08,816] WARN [ReplicaFetcherThread-0-21], Replica 31 for partition [topic,11] reset its fetch offset from 49322124 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread) [2015-03-25 11:11:08,816] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [topic,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread) {code} This occurs because we update fetchOffset [here|https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L124] and then try to process the message. If any exception except OffsetOutOfRangeCode occurs, we get an unsynchronized fetchOffset and replica.logEndOffset. On the next fetch iteration we can get fetchOffset > replica.logEndOffset == leaderEndOffset and OffsetOutOfRangeCode. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2165) ReplicaFetcherThread: data loss on unknown exception
Alexey Ozeritskiy created KAFKA-2165: Summary: ReplicaFetcherThread: data loss on unknown exception Key: KAFKA-2165 URL: https://issues.apache.org/jira/browse/KAFKA-2165 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy Sometimes in our cluster some replica gets out of the isr. Then the broker re-downloads the partition from the beginning. We got the following messages in logs: {code} # The leader: [2015-03-25 11:11:07,796] ERROR [Replica Manager on Broker 21]: Error when processing fetch request for partition [topic,11] offset 54369274 from follower with correlation id 2634499. Possible cause: Request for offset 54369274 but we only have log segments in the range 49322124 to 54369273. (kafka.server.ReplicaManager) {code} {code} # The follower: [2015-03-25 11:11:08,816] WARN [ReplicaFetcherThread-0-21], Replica 31 for partition [topic,11] reset its fetch offset from 49322124 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread) [2015-03-25 11:11:08,816] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [topic,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread) {code} This occurs because we update fetchOffset [here|https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L124] and then try to process the message. If any exception except OffsetOutOfRangeCode occurs, we get an unsynchronized fetchOffset and replica.logEndOffset. On the next fetch iteration we can get fetchOffset > replica.logEndOffset == leaderEndOffset and OffsetOutOfRangeCode. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
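The described problem boils down to an ordering rule: advance the cached fetch offset only after the fetched data has been applied to the log. A minimal sketch with toy types (the real code is in AbstractFetcherThread; names here are illustrative):
{code}
import scala.collection.mutable

// Toy key: "topic,partition". The process block may throw; any broker-side
// error other than OffsetOutOfRange surfaces here as an exception.
def applyFetchedData(fetchOffsets: mutable.Map[String, Long],
                     partition: String,
                     newOffset: Long)(process: => Unit): Unit = {
  process                                // if this throws, the cached offset is untouched
  fetchOffsets.put(partition, newOffset) // advance only after a successful append
}
{code}
With this ordering, a failed append leaves fetchOffset equal to replica.logEndOffset, so the next iteration simply retries instead of drifting past the log end and triggering a spurious out-of-range reset.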
[jira] [Updated] (KAFKA-2164) ReplicaFetcherThread: suspicious log message on reset offset
[ https://issues.apache.org/jira/browse/KAFKA-2164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-2164: - Description: If log.logEndOffset < leaderStartOffset, the follower resets its offset and prints the following: {code} [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for partition [topic,11] reset its fetch offset from 49322124 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread) [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [topic,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread) {code} I think the right message should be: {code} [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for partition [topic,11] reset its fetch offset from 54369274 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread) [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [topic,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread) {code} This occurs because ReplicaFetcherThread resets the offset and then prints the log message. Possible solution: {code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index b31b432..181cbc1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
      * Roll out a new log at the follower with the start offset equal to the current leader's start offset
      */
     val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
-    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
     warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
       .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
+    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
     leaderStartOffset
   }
 }
{code} was: If log.logEndOffset < leaderStartOffset, the follower resets its offset and prints the following: {code} [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for partition [topic,11] reset its fetch offset from 49322124 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread) [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [topic,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread) {code} I think the right message should be: {code} [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for partition [rt3.iva--yandex--access-log,11] reset its fetch offset from 54369274 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread) [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [rt3.iva--yandex--access-log,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread) {code} This occurs because ReplicaFetcherThread resets the offset and then prints the log message. Possible solution: {code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index b31b432..181cbc1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
      * Roll out a new log at the follower with the start offset equal to the current leader's start offset
      */
     val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
-    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
     warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
       .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
+    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
     leaderStartOffset
   }
 }
{code} ReplicaFetcherThread: suspicious log message on reset offset Key: KAFKA-2164 URL: https://issues.apache.org/jira/browse/KAFKA-2164 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy Attachments: KAFKA-2164.patch If log.logEndOffset < leaderStartOffset, the follower resets its offset and prints the following: {code} [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for partition [topic,11] reset its fetch offset from 49322124 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread) [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [topic,11] out of range; reset offset to
[jira] [Created] (KAFKA-2133) Deadlock in DeleteTopicsThread
Alexey Ozeritskiy created KAFKA-2133: Summary: Deadlock in DeleteTopicsThread Key: KAFKA-2133 URL: https://issues.apache.org/jira/browse/KAFKA-2133 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy Priority: Critical Controller hangs after deleting multiple topics. jstack: 1. delete-topics-thread acquired controllerLock and waiting for blocking queue: {code} delete-topics-thread-2 prio=10 tid=0x7f3a8d4e4000 nid=0x6924 waiting on condition [0x7f3507684000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x00047196e738 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349) at kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57) - locked 0x00045eab3078 (a java.lang.Object) at kafka.controller.KafkaController.sendRequest(KafkaController.scala:668) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:299) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:291) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:291) at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:976) at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:303) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:424) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:396) at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153) at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:396) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:390) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) {code} 2. 
Controller-2-to-broker waiting for controllerLock and cannot take messages from blocking queue: {code} Controller-2-to-broker-3-send-thread prio=10 tid=0x7f3a8d4a3000 nid=0x64d1 waiting on condition [0x7f3507786000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x000468babde8 (a java.util.concurrent.locks.ReentrantLock$NonfairSync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197) at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214) at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290) at kafka.utils.Utils$.inLock(Utils.scala:533) at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$deleteTopicStopReplicaCallback(TopicDeletionManager.scala:371) at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:338) at
[jira] [Commented] (KAFKA-2133) Deadlock in DeleteTopicsThread
[ https://issues.apache.org/jira/browse/KAFKA-2133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14502028#comment-14502028 ] Alexey Ozeritskiy commented on KAFKA-2133: -- We've set it to 128. I think that the default of Int.MaxValue is dangerous due to possible memory issues. Deadlock in DeleteTopicsThread -- Key: KAFKA-2133 URL: https://issues.apache.org/jira/browse/KAFKA-2133 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy Priority: Critical Controller hangs after deleting multiple topics. jstack: 1. delete-topics-thread acquired controllerLock and waiting for blocking queue: {code} delete-topics-thread-2 prio=10 tid=0x7f3a8d4e4000 nid=0x6924 waiting on condition [0x7f3507684000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x00047196e738 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349) at kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57) - locked 0x00045eab3078 (a java.lang.Object) at kafka.controller.KafkaController.sendRequest(KafkaController.scala:668) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:299) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:291) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:291) at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:976) at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:303) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:424) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:396) at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153) at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:396) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:390) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) {code} 2.
Controller-2-to-broker waiting for controllerLock and cannot take messages from blocking queue: {code} Controller-2-to-broker-3-send-thread prio=10 tid=0x7f3a8d4a3000 nid=0x64d1 waiting on condition [0x7f3507786000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x000468babde8 (a java.util.concurrent.locks.ReentrantLock$NonfairSync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197) at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214) at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290) at kafka.utils.Utils$.inLock(Utils.scala:533) at
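The capacity being discussed is that of the controller channel's blocking queue. The lock-plus-bounded-queue cycle from the stack traces is easy to reproduce in miniature (plain Scala, nothing Kafka-specific): one thread holds a lock and blocks in put() on a full queue, while the only consumer needs that same lock inside its per-message callback.
{code}
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.locks.ReentrantLock

object QueueLockDeadlock {
  val lock  = new ReentrantLock()
  val queue = new LinkedBlockingQueue[Int](1) // bounded, like a capped controller queue

  def main(args: Array[String]): Unit = {
    val producer = new Thread(new Runnable { // plays the delete-topics-thread
      override def run(): Unit = {
        lock.lock() // holds "controllerLock" ...
        try { queue.put(1); queue.put(2); queue.put(3) } // ... and blocks on the full queue
        finally lock.unlock()
      }
    })
    val consumer = new Thread(new Runnable { // plays the Controller-to-broker send thread
      override def run(): Unit = {
        val msg = queue.take()
        lock.lock() // the callback needs "controllerLock" and waits forever
        try println(s"handled $msg")
        finally lock.unlock()
        // never loops back to drain the queue
      }
    })
    producer.start(); Thread.sleep(50); consumer.start()
  }
}
{code}
An unbounded queue (the Int.MaxValue default) avoids the deadlock because put() never blocks, but at the cost of unbounded memory; a bounded queue such as the 128 above keeps memory in check but makes it essential never to hold controllerLock across a put().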
[jira] [Resolved] (KAFKA-2133) Deadlock in DeleteTopicsThread
[ https://issues.apache.org/jira/browse/KAFKA-2133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy resolved KAFKA-2133. -- Resolution: Fixed Deadlock in DeleteTopicsThread -- Key: KAFKA-2133 URL: https://issues.apache.org/jira/browse/KAFKA-2133 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy Priority: Critical Controller hangs after deleting multiple topics. jstack: 1. delete-topics-thread acquired controllerLock and waiting for blocking queue: {code} delete-topics-thread-2 prio=10 tid=0x7f3a8d4e4000 nid=0x6924 waiting on condition [0x7f3507684000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x00047196e738 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349) at kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57) - locked 0x00045eab3078 (a java.lang.Object) at kafka.controller.KafkaController.sendRequest(KafkaController.scala:668) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:299) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:291) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:291) at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:976) at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:303) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:424) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:396) at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153) at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:396) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:390) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) {code} 2. 
Controller-2-to-broker waiting for controllerLock and cannot take messages from blocking queue: {code} Controller-2-to-broker-3-send-thread prio=10 tid=0x7f3a8d4a3000 nid=0x64d1 waiting on condition [0x7f3507786000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x000468babde8 (a java.util.concurrent.locks.ReentrantLock$NonfairSync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197) at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214) at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290) at kafka.utils.Utils$.inLock(Utils.scala:533) at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$deleteTopicStopReplicaCallback(TopicDeletionManager.scala:371)
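The deadlock above reduces to a lock-plus-bounded-queue cycle: one thread blocks on put() into a full queue while holding the controller lock, and the only thread that could drain the queue needs that same lock in its callback before it can take(). A minimal standalone sketch of the pattern (illustrative Scala, not Kafka code; the capacity-1 queue stands in for a full request queue):

{code}
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.locks.ReentrantLock

object DeadlockSketch {
  val controllerLock = new ReentrantLock()
  val sendQueue = new ArrayBlockingQueue[String](1)

  def main(args: Array[String]): Unit = {
    sendQueue.put("pending") // the queue is already full

    // Plays delete-topics-thread: takes the lock, then blocks on put().
    val deleter = new Thread(new Runnable {
      def run(): Unit = {
        controllerLock.lock()
        try sendQueue.put("StopReplica") // blocks forever: nobody can drain the queue
        finally controllerLock.unlock()
      }
    })

    // Plays the Controller-to-broker send thread: its callback needs the lock
    // before it can take() from the queue, but the lock is held by the deleter.
    val sender = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(100)     // let the deleter grab the lock first
        controllerLock.lock() // blocks forever
        try sendQueue.take()
        finally controllerLock.unlock()
      }
    })

    deleter.start(); sender.start()
  }
}
{code}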
[jira] [Commented] (KAFKA-1908) Split brain
[ https://issues.apache.org/jira/browse/KAFKA-1908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14347063#comment-14347063 ] Alexey Ozeritskiy commented on KAFKA-1908: -- Hi all, the following is our scenario. We use a custom consumer which works on the broker hosts and always consumes leader partitions from localhost. The consumer reads data and pushes it to a 3rd-party system. We send a metadata request to localhost and don't use the zk data. We use zk locks to guarantee that we read a single partition in one process. Sometimes we release the locks and the consumer can begin to consume data from the broken host and reset offsets. Split brain --- Key: KAFKA-1908 URL: https://issues.apache.org/jira/browse/KAFKA-1908 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2.0 Reporter: Alexey Ozeritskiy In some cases, there may be two leaders for one partition. Steps to reproduce: # We have 3 brokers, 1 partition with 3 replicas: {code} TopicAndPartition: [partition,0]Leader: 1 Replicas: [2,1,3] ISR: [1,2,3] {code} # controller works on broker 3 # let the kafka port be 9092. We execute on broker 1: {code} iptables -A INPUT -p tcp --dport 9092 -j REJECT {code} # Initiate replica election # As a result: Broker 1: {code} TopicAndPartition: [partition,0]Leader: 1 Replicas: [2,1,3] ISR: [1,2,3] {code} Broker 2: {code} TopicAndPartition: [partition,0]Leader: 2 Replicas: [2,1,3] ISR: [1,2,3] {code} # Flush the iptables rules on broker 1 Now we can produce messages to {code}[partition,0]{code}. Replica-1 will not receive new data. A consumer can read data from replica-1 or replica-2. When it reads from replica-1 it resets the offsets and then can read duplicates from replica-2. We saw this situation in our production cluster when it had network problems. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
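For context, the zk-lock pattern mentioned here can be sketched with Apache Curator's InterProcessMutex (an assumption for illustration; the actual ZK client used by the reporter is not shown): one lock znode per topic-partition, and only the holder consumes that partition from the local broker.

{code}
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.locks.InterProcessMutex
import org.apache.curator.retry.ExponentialBackoffRetry

object PartitionLockSketch {
  def main(args: Array[String]): Unit = {
    val zk = CuratorFrameworkFactory.newClient("localhost:2181",
      new ExponentialBackoffRetry(1000, 3))
    zk.start()

    // Hypothetical lock path: one znode per topic-partition.
    val lock = new InterProcessMutex(zk, "/consumer-locks/partition-0")
    lock.acquire()
    try {
      // consume partition 0 from localhost and push to the 3rd-party system
    } finally {
      // After release, another process (possibly on the isolated broker)
      // can take the lock and start consuming stale data, as described above.
      lock.release()
    }
  }
}
{code}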
[jira] [Created] (KAFKA-1908) Split brain
Alexey Ozeritskiy created KAFKA-1908: Summary: Split brain Key: KAFKA-1908 URL: https://issues.apache.org/jira/browse/KAFKA-1908 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Alexey Ozeritskiy In some cases, there may be two leaders for one partition. Steps to reproduce: # We have 3 brokers, 1 partition with 3 replicas: {code} TopicAndPartition: [partition,0]Leader: 1 Replicas: [2,1,3] ISR: [1,2,3] {code} # controller works on broker 3 # let the kafka port be 9092. We execute on broker 1: {code} iptables -A INPUT -p tcp --dport 9092 -j REJECT {code} # Initiate replica election # As a result: Broker 1: {code} TopicAndPartition: [partition,0]Leader: 1 Replicas: [2,1,3] ISR: [1,2,3] {code} Broker 2: {code} TopicAndPartition: [partition,0]Leader: 2 Replicas: [2,1,3] ISR: [1,2,3] {code} # Flush the iptables rules on broker 1 Now we can produce messages to {code}[partition,0]{code}. Replica-1 will not receive new data. A consumer can read data from replica-1 or replica-2. When it reads from replica-1 it resets the offsets and then can read duplicates from replica-2. We saw this situation in our production cluster when it had network problems. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
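A split brain like this is visible from a client by asking each broker for metadata directly and comparing the leaders they report; a sketch against the 0.8-era kafka.javaapi classes (host names are illustrative, and the null-leader check is omitted for brevity):

{code}
import kafka.javaapi.TopicMetadataRequest
import kafka.javaapi.consumer.SimpleConsumer
import scala.collection.JavaConverters._

object LeaderCheck {
  def leaderOf(host: String): Int = {
    val consumer = new SimpleConsumer(host, 9092, 10000, 64 * 1024, "leader-check")
    try {
      val resp = consumer.send(new TopicMetadataRequest(List("partition").asJava))
      resp.topicsMetadata.asScala.head.partitionsMetadata.asScala.head.leader.id
    } finally consumer.close()
  }

  def main(args: Array[String]): Unit = {
    // After the iptables experiment the brokers disagree:
    println(leaderOf("broker1")) // prints 1
    println(leaderOf("broker2")) // prints 2
  }
}
{code}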
[jira] [Commented] (KAFKA-1804) Kafka network thread lacks top exception handler
[ https://issues.apache.org/jira/browse/KAFKA-1804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14287713#comment-14287713 ] Alexey Ozeritskiy commented on KAFKA-1804: -- The last time we saw the bug was during a restart of the network switch on a cluster of 20 machines. kafka-network-threads died on more than half of the machines. As a result, the cluster became unavailable. We are trying to find the specific steps that reproduce the problem. Kafka network thread lacks top exception handler Key: KAFKA-1804 URL: https://issues.apache.org/jira/browse/KAFKA-1804 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Oleg Golovin Priority: Critical We have faced the problem that some kafka network threads may fail, so that jstack attached to the Kafka process showed fewer threads than we had defined in our Kafka configuration. This leads to API requests processed by this thread getting stuck without a response. There were no error messages in the log regarding the thread failure. We have examined the Kafka code and found that there is no top-level try-catch block in the network thread code, which could at least log possible errors. Could you add a top-level try-catch block for the network thread, which should recover the network thread in case of exception? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1804) Kafka network thread lacks top exception handler
[ https://issues.apache.org/jira/browse/KAFKA-1804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1804: - Priority: Critical (was: Major) Kafka network thread lacks top exception handler Key: KAFKA-1804 URL: https://issues.apache.org/jira/browse/KAFKA-1804 Project: Kafka Issue Type: Bug Reporter: Oleg Golovin Priority: Critical We have faced the problem that some kafka network threads may fail, so that jstack attached to the Kafka process showed fewer threads than we had defined in our Kafka configuration. This leads to API requests processed by this thread getting stuck without a response. There were no error messages in the log regarding the thread failure. We have examined the Kafka code and found that there is no top-level try-catch block in the network thread code, which could at least log possible errors. Could you add a top-level try-catch block for the network thread, which should recover the network thread in case of exception? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1804) Kafka network thread lacks top exception handler
[ https://issues.apache.org/jira/browse/KAFKA-1804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14279396#comment-14279396 ] Alexey Ozeritskiy edited comment on KAFKA-1804 at 1/16/15 7:44 AM: --- We've written a simple patch for kafka-network-thread:

{code:java}
override def run(): Unit = {
  try {
    iteration() // = the original run()
  } catch {
    case e: Throwable =>
      error("ERROR IN NETWORK THREAD: %s".format(e), e)
      Runtime.getRuntime.halt(1)
  }
}
{code}

and got the following trace:

{code}
[2015-01-15 23:04:08,537] ERROR ERROR IN NETWORK THREAD: java.util.NoSuchElementException: None.get (kafka.network.Processor)
java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:313)
	at scala.None$.get(Option.scala:311)
	at kafka.network.ConnectionQuotas.dec(SocketServer.scala:544)
	at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
	at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
	at kafka.network.Processor.close(SocketServer.scala:394)
	at kafka.network.Processor.processNewResponses(SocketServer.scala:426)
	at kafka.network.Processor.iteration(SocketServer.scala:328)
	at kafka.network.Processor.run(SocketServer.scala:381)
	at java.lang.Thread.run(Thread.java:745)
{code}

was (Author: aozeritsky): We've written a simple patch for kafka-network-thread:

{code:java}
override def run(): Unit = {
  try {
    original_run()
  } catch {
    case e: Throwable =>
      error("ERROR IN NETWORK THREAD: %s".format(e), e)
      Runtime.getRuntime.halt(1)
  }
}
{code}

and got the following trace:

{code}
[2015-01-15 23:04:08,537] ERROR ERROR IN NETWORK THREAD: java.util.NoSuchElementException: None.get (kafka.network.Processor)
java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:313)
	at scala.None$.get(Option.scala:311)
	at kafka.network.ConnectionQuotas.dec(SocketServer.scala:544)
	at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
	at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
	at kafka.network.Processor.close(SocketServer.scala:394)
	at kafka.network.Processor.processNewResponses(SocketServer.scala:426)
	at kafka.network.Processor.iteration(SocketServer.scala:328)
	at kafka.network.Processor.run(SocketServer.scala:381)
	at java.lang.Thread.run(Thread.java:745)
{code}

Kafka network thread lacks top exception handler Key: KAFKA-1804 URL: https://issues.apache.org/jira/browse/KAFKA-1804 Project: Kafka Issue Type: Bug Reporter: Oleg Golovin We have faced the problem that some kafka network threads may fail, so that jstack attached to the Kafka process showed fewer threads than we had defined in our Kafka configuration. This leads to API requests processed by this thread getting stuck without a response. There were no error messages in the log regarding the thread failure. We have examined the Kafka code and found that there is no top-level try-catch block in the network thread code, which could at least log possible errors. Could you add a top-level try-catch block for the network thread, which should recover the network thread in case of exception? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
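An alternative to patching run() directly, with the same fail-fast effect, is a process-wide handler; a hedged sketch (this is not the committed fix):

{code}
object NetworkThreadGuard {
  def install(): Unit = {
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      def uncaughtException(t: Thread, e: Throwable): Unit = {
        // Log first so the failure is visible, then halt instead of
        // silently losing the thread and leaving requests unanswered.
        System.err.println("ERROR IN THREAD %s: %s".format(t.getName, e))
        Runtime.getRuntime.halt(1)
      }
    })
  }
}
{code}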
[jira] [Commented] (KAFKA-1644) Inherit FetchResponse from RequestOrResponse
[ https://issues.apache.org/jira/browse/KAFKA-1644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14158114#comment-14158114 ] Alexey Ozeritskiy commented on KAFKA-1644: -- This patch simplifies writing clients. Now we have to use the following ugly code:

{code:java}
if (id == ResponseKeys.FetchKey) {
  val response = FetchResponse.readFrom(contentBuffer)
  listener ! FetchAnswer(response)
} else {
  val response = ResponseKeys.deserializerForKey(id)(contentBuffer)
  listener ! Answer(response)
}
{code}

Inherit FetchResponse from RequestOrResponse Key: KAFKA-1644 URL: https://issues.apache.org/jira/browse/KAFKA-1644 Project: Kafka Issue Type: Bug Reporter: Anton Karamanov Assignee: Anton Karamanov Attachments: 0001-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch Unlike all other Kafka API responses {{FetchResponse}} is not a subclass of RequestOrResponse, which requires handling it as a special case while processing responses. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
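Once {{FetchResponse}} shares the common parent, the special case could collapse into the single generic path; a hypothetical fragment reusing the names from the snippet above (assuming deserializerForKey then also covers the fetch key):

{code:java}
val response = ResponseKeys.deserializerForKey(id)(contentBuffer)
listener ! Answer(response)
{code}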
[jira] [Updated] (KAFKA-1644) Inherit FetchResponse from RequestOrResponse
[ https://issues.apache.org/jira/browse/KAFKA-1644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1644: - Issue Type: Improvement (was: Bug) Inherit FetchResponse from RequestOrResponse Key: KAFKA-1644 URL: https://issues.apache.org/jira/browse/KAFKA-1644 Project: Kafka Issue Type: Improvement Reporter: Anton Karamanov Assignee: Anton Karamanov Attachments: 0001-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch Unlike all other Kafka API responses {{FetchResponse}} is not a subclass of RequestOrResponse, which requires handling it as a special case while processing responses. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067872#comment-14067872 ] Alexey Ozeritskiy commented on KAFKA-1414: -- Anton, your patch is not working for me. I set log.io.parallelism=24 and see only one active disk, after some period of time (1 min) I see two active disks. With my primitive patch (with parArray) kafka uses all 24 disks at once at startup. Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After hard reset due to power failure broker takes way too much time recovering unflushed segments in a single thread. This could be easiliy improved launching multiple threads (one per data dirrectory, assuming that typically each data directory is on a dedicated drive). Localy we trie this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to scala, so do not take it literally: {code} /** * Recover and load all logs in the given data directories */ private def loadLogs(dirs: Seq[File]) { val threads : Array[Thread] = new Array[Thread](dirs.size) var i: Int = 0 val me = this for(dir - dirs) { val thread = new Thread( new Runnable { def run() { val recoveryPoints = me.recoveryPointCheckpoints(dir).read /* load the logs */ val subDirs = dir.listFiles() if(subDirs != null) { val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) if(cleanShutDownFile.exists()) info(Found clean shutdown file. Skipping recovery for all logs in data directory '%s'.format(dir.getAbsolutePath)) for(dir - subDirs) { if(dir.isDirectory) { info(Loading log ' + dir.getName + ') val topicPartition = Log.parseTopicPartitionName(dir.getName) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time) val previous = addLogWithLock(topicPartition, log) if(previous != null) throw new IllegalArgumentException(Duplicate log directories found: %s, %s!.format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } cleanShutDownFile.delete() } } }) thread.start() threads(i) = thread i = i + 1 } for(thread - threads) { thread.join() } } def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { logCreationOrDeletionLock synchronized { this.logs.put(topicPartition, log) } } {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
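For comparison, the parArray approach can also be pinned to an explicit parallelism level rather than relying on the default pool size; a sketch against Scala 2.10-era parallel collections (dirs and loadDir are the names from the ParArray patch further down this thread):

{code}
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

val par = dirs.toParArray
// Pin the pool to one worker per data directory (24 disks here).
par.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(24))
par.foreach(dir => loadDir(dir))
{code}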
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067917#comment-14067917 ] Alexey Ozeritskiy commented on KAFKA-1414: -- You must create an instance of Runnable for each directory (this.logDirs). You create an instance for each kafka topic-partition. Yor aproach will not work. Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After hard reset due to power failure broker takes way too much time recovering unflushed segments in a single thread. This could be easiliy improved launching multiple threads (one per data dirrectory, assuming that typically each data directory is on a dedicated drive). Localy we trie this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to scala, so do not take it literally: {code} /** * Recover and load all logs in the given data directories */ private def loadLogs(dirs: Seq[File]) { val threads : Array[Thread] = new Array[Thread](dirs.size) var i: Int = 0 val me = this for(dir - dirs) { val thread = new Thread( new Runnable { def run() { val recoveryPoints = me.recoveryPointCheckpoints(dir).read /* load the logs */ val subDirs = dir.listFiles() if(subDirs != null) { val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) if(cleanShutDownFile.exists()) info(Found clean shutdown file. Skipping recovery for all logs in data directory '%s'.format(dir.getAbsolutePath)) for(dir - subDirs) { if(dir.isDirectory) { info(Loading log ' + dir.getName + ') val topicPartition = Log.parseTopicPartitionName(dir.getName) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time) val previous = addLogWithLock(topicPartition, log) if(previous != null) throw new IllegalArgumentException(Duplicate log directories found: %s, %s!.format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } cleanShutDownFile.delete() } } }) thread.start() threads(i) = thread i = i + 1 } for(thread - threads) { thread.join() } } def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { logCreationOrDeletionLock synchronized { this.logs.put(topicPartition, log) } } {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Comment Edited] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067917#comment-14067917 ] Alexey Ozeritskiy edited comment on KAFKA-1414 at 7/20/14 12:48 PM: You must create an instance of Runnable for each directory (this.logDirs). You create an instance for each kafka topic-partition. Your aproach will not work. was (Author: aozeritsky): You must create an instance of Runnable for each directory (this.logDirs). You create an instance for each kafka topic-partition. Yor aproach will not work. Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After hard reset due to power failure broker takes way too much time recovering unflushed segments in a single thread. This could be easiliy improved launching multiple threads (one per data dirrectory, assuming that typically each data directory is on a dedicated drive). Localy we trie this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to scala, so do not take it literally: {code} /** * Recover and load all logs in the given data directories */ private def loadLogs(dirs: Seq[File]) { val threads : Array[Thread] = new Array[Thread](dirs.size) var i: Int = 0 val me = this for(dir - dirs) { val thread = new Thread( new Runnable { def run() { val recoveryPoints = me.recoveryPointCheckpoints(dir).read /* load the logs */ val subDirs = dir.listFiles() if(subDirs != null) { val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) if(cleanShutDownFile.exists()) info(Found clean shutdown file. Skipping recovery for all logs in data directory '%s'.format(dir.getAbsolutePath)) for(dir - subDirs) { if(dir.isDirectory) { info(Loading log ' + dir.getName + ') val topicPartition = Log.parseTopicPartitionName(dir.getName) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time) val previous = addLogWithLock(topicPartition, log) if(previous != null) throw new IllegalArgumentException(Duplicate log directories found: %s, %s!.format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } cleanShutDownFile.delete() } } }) thread.start() threads(i) = thread i = i + 1 } for(thread - threads) { thread.join() } } def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { logCreationOrDeletionLock synchronized { this.logs.put(topicPartition, log) } } {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Comment Edited] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067917#comment-14067917 ] Alexey Ozeritskiy edited comment on KAFKA-1414 at 7/20/14 12:54 PM: You must create an instance of Runnable for each directory (this.logDirs). You create an instance for each topic-partition. Your aproach will not work. was (Author: aozeritsky): You must create an instance of Runnable for each directory (this.logDirs). You create an instance for each kafka topic-partition. Your aproach will not work. Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After hard reset due to power failure broker takes way too much time recovering unflushed segments in a single thread. This could be easiliy improved launching multiple threads (one per data dirrectory, assuming that typically each data directory is on a dedicated drive). Localy we trie this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to scala, so do not take it literally: {code} /** * Recover and load all logs in the given data directories */ private def loadLogs(dirs: Seq[File]) { val threads : Array[Thread] = new Array[Thread](dirs.size) var i: Int = 0 val me = this for(dir - dirs) { val thread = new Thread( new Runnable { def run() { val recoveryPoints = me.recoveryPointCheckpoints(dir).read /* load the logs */ val subDirs = dir.listFiles() if(subDirs != null) { val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) if(cleanShutDownFile.exists()) info(Found clean shutdown file. Skipping recovery for all logs in data directory '%s'.format(dir.getAbsolutePath)) for(dir - subDirs) { if(dir.isDirectory) { info(Loading log ' + dir.getName + ') val topicPartition = Log.parseTopicPartitionName(dir.getName) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time) val previous = addLogWithLock(topicPartition, log) if(previous != null) throw new IllegalArgumentException(Duplicate log directories found: %s, %s!.format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } cleanShutDownFile.delete() } } }) thread.start() threads(i) = thread i = i + 1 } for(thread - threads) { thread.join() } } def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { logCreationOrDeletionLock synchronized { this.logs.put(topicPartition, log) } } {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1551) Configuration example errors
[ https://issues.apache.org/jira/browse/KAFKA-1551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1551: - Description: A Production Server Config (http://kafka.apache.org/documentation.html#prodconfig) contains an error: {code} # ZK configuration zk.connection.timeout.ms=6000 zk.sync.time.ms=2000 {code} Should be {code} # ZK configuration zookeeper.connection.timeout.ms=6000 zookeeper.sync.time.ms=2000 {code} was: A Production Server Config contains an error: {code} # ZK configuration zk.connection.timeout.ms=6000 zk.sync.time.ms=2000 {code} Should be {code} # ZK configuration zookeeper.connection.timeout.ms=6000 zookeeper.sync.time.ms=2000 {code} Configuration example errors Key: KAFKA-1551 URL: https://issues.apache.org/jira/browse/KAFKA-1551 Project: Kafka Issue Type: Bug Components: website Reporter: Alexey Ozeritskiy A Production Server Config (http://kafka.apache.org/documentation.html#prodconfig) contains an error: {code} # ZK configuration zk.connection.timeout.ms=6000 zk.sync.time.ms=2000 {code} Should be {code} # ZK configuration zookeeper.connection.timeout.ms=6000 zookeeper.sync.time.ms=2000 {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (KAFKA-1543) Changing replication factor
Alexey Ozeritskiy created KAFKA-1543: Summary: Changing replication factor Key: KAFKA-1543 URL: https://issues.apache.org/jira/browse/KAFKA-1543 Project: Kafka Issue Type: Improvement Reporter: Alexey Ozeritskiy Attachments: can-change-replication.patch It is difficult to change replication factor by manual editing json config. I propose to add a key to kafka-reassign-partitions.sh command to automatically create json config. Example of usage %% kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor --topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate output %% -- This message was sent by Atlassian JIRA (v6.2#6252)
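For one topic, the JSON such a key would have to emit is the standard reassignment format; a sketch that spreads a target replication factor round-robin over the broker list (topic name, partition count, and assignment policy are illustrative, not the attached patch):

{code}
object GenReassignment {
  def main(args: Array[String]): Unit = {
    val brokers = Vector(1, 2, 3, 4) // --broker-list 1,2,3,4
    val replicas = 3                 // --replicas 3
    val json = (0 until 8).map { p =>
      // Assign each partition `replicas` consecutive brokers, wrapping around.
      val rs = (0 until replicas).map(i => brokers((p + i) % brokers.size)).mkString(",")
      """{"topic":"mytopic","partition":%d,"replicas":[%s]}""".format(p, rs)
    }.mkString("""{"version":1,"partitions":[""", ",", "]}")
    println(json)
  }
}
{code}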
[jira] [Updated] (KAFKA-1543) Changing replication factor
[ https://issues.apache.org/jira/browse/KAFKA-1543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1543: - Attachment: can-change-replication.patch Changing replication factor --- Key: KAFKA-1543 URL: https://issues.apache.org/jira/browse/KAFKA-1543 Project: Kafka Issue Type: Improvement Reporter: Alexey Ozeritskiy Attachments: can-change-replication.patch It is difficult to change replication factor by manual editing json config. I propose to add a key to kafka-reassign-partitions.sh command to automatically create json config. Example of usage {code} kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor --topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate output {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1543) Changing replication factor
[ https://issues.apache.org/jira/browse/KAFKA-1543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1543: - Description: It is difficult to change replication factor by manual editing json config. I propose to add a key to kafka-reassign-partitions.sh command to automatically create json config. Example of usage {code} kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor --topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate output {code} was: It is difficult to change replication factor by manual editing json config. I propose to add a key to kafka-reassign-partitions.sh command to automatically create json config. Example of usage %% kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor --topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate output %% Changing replication factor --- Key: KAFKA-1543 URL: https://issues.apache.org/jira/browse/KAFKA-1543 Project: Kafka Issue Type: Improvement Reporter: Alexey Ozeritskiy Attachments: can-change-replication.patch It is difficult to change replication factor by manual editing json config. I propose to add a key to kafka-reassign-partitions.sh command to automatically create json config. Example of usage {code} kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor --topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate output {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1543) Changing replication factor
[ https://issues.apache.org/jira/browse/KAFKA-1543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1543: - Status: Patch Available (was: Open) Changing replication factor --- Key: KAFKA-1543 URL: https://issues.apache.org/jira/browse/KAFKA-1543 Project: Kafka Issue Type: Improvement Reporter: Alexey Ozeritskiy Attachments: can-change-replication.patch It is difficult to change replication factor by manual editing json config. I propose to add a key to kafka-reassign-partitions.sh command to automatically create json config. Example of usage {code} kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor --topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate output {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14061957#comment-14061957 ] Alexey Ozeritskiy commented on KAFKA-1414: -- Anton, you forget to use new config option (log.recovery.threads). Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After hard reset due to power failure broker takes way too much time recovering unflushed segments in a single thread. This could be easiliy improved launching multiple threads (one per data dirrectory, assuming that typically each data directory is on a dedicated drive). Localy we trie this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to scala, so do not take it literally: {code} /** * Recover and load all logs in the given data directories */ private def loadLogs(dirs: Seq[File]) { val threads : Array[Thread] = new Array[Thread](dirs.size) var i: Int = 0 val me = this for(dir - dirs) { val thread = new Thread( new Runnable { def run() { val recoveryPoints = me.recoveryPointCheckpoints(dir).read /* load the logs */ val subDirs = dir.listFiles() if(subDirs != null) { val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) if(cleanShutDownFile.exists()) info(Found clean shutdown file. Skipping recovery for all logs in data directory '%s'.format(dir.getAbsolutePath)) for(dir - subDirs) { if(dir.isDirectory) { info(Loading log ' + dir.getName + ') val topicPartition = Log.parseTopicPartitionName(dir.getName) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time) val previous = addLogWithLock(topicPartition, log) if(previous != null) throw new IllegalArgumentException(Duplicate log directories found: %s, %s!.format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } cleanShutDownFile.delete() } } }) thread.start() threads(i) = thread i = i + 1 } for(thread - threads) { thread.join() } } def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { logCreationOrDeletionLock synchronized { this.logs.put(topicPartition, log) } } {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
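Wiring the option in would look roughly like this (a sketch; the props accessor and the dirs/loadDir names are assumptions based on the patches discussed in this thread):

{code}
import java.util.concurrent.Executors

// log.recovery.threads read from the broker properties (assumed accessor).
val numRecoveryThreads = props.getInt("log.recovery.threads", 1)
val pool = Executors.newFixedThreadPool(numRecoveryThreads)
// One recovery job per data directory, bounded by the configured pool size.
val jobs = dirs.map(dir => pool.submit(new Runnable { def run(): Unit = loadDir(dir) }))
jobs.foreach(_.get())
pool.shutdown()
{code}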
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1414: - Attachment: parallel-dir-loading-trunk-threadpool.patch Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After hard reset due to power failure broker takes way too much time recovering unflushed segments in a single thread. This could be easiliy improved launching multiple threads (one per data dirrectory, assuming that typically each data directory is on a dedicated drive). Localy we trie this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to scala, so do not take it literally: {code} /** * Recover and load all logs in the given data directories */ private def loadLogs(dirs: Seq[File]) { val threads : Array[Thread] = new Array[Thread](dirs.size) var i: Int = 0 val me = this for(dir - dirs) { val thread = new Thread( new Runnable { def run() { val recoveryPoints = me.recoveryPointCheckpoints(dir).read /* load the logs */ val subDirs = dir.listFiles() if(subDirs != null) { val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) if(cleanShutDownFile.exists()) info(Found clean shutdown file. Skipping recovery for all logs in data directory '%s'.format(dir.getAbsolutePath)) for(dir - subDirs) { if(dir.isDirectory) { info(Loading log ' + dir.getName + ') val topicPartition = Log.parseTopicPartitionName(dir.getName) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time) val previous = addLogWithLock(topicPartition, log) if(previous != null) throw new IllegalArgumentException(Duplicate log directories found: %s, %s!.format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } cleanShutDownFile.delete() } } }) thread.start() threads(i) = thread i = i + 1 } for(thread - threads) { thread.join() } } def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { logCreationOrDeletionLock synchronized { this.logs.put(topicPartition, log) } } {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14056358#comment-14056358 ] Alexey Ozeritskiy commented on KAFKA-1414: -- 1. Ok 2. If any thread fails we get ExecutionException at {code} jobs.foreach(_.get()) {code} We can get the original exception by calling e.getCause() and rethrow it. Is this ok ? Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After hard reset due to power failure broker takes way too much time recovering unflushed segments in a single thread. This could be easiliy improved launching multiple threads (one per data dirrectory, assuming that typically each data directory is on a dedicated drive). Localy we trie this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to scala, so do not take it literally: {code} /** * Recover and load all logs in the given data directories */ private def loadLogs(dirs: Seq[File]) { val threads : Array[Thread] = new Array[Thread](dirs.size) var i: Int = 0 val me = this for(dir - dirs) { val thread = new Thread( new Runnable { def run() { val recoveryPoints = me.recoveryPointCheckpoints(dir).read /* load the logs */ val subDirs = dir.listFiles() if(subDirs != null) { val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) if(cleanShutDownFile.exists()) info(Found clean shutdown file. Skipping recovery for all logs in data directory '%s'.format(dir.getAbsolutePath)) for(dir - subDirs) { if(dir.isDirectory) { info(Loading log ' + dir.getName + ') val topicPartition = Log.parseTopicPartitionName(dir.getName) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time) val previous = addLogWithLock(topicPartition, log) if(previous != null) throw new IllegalArgumentException(Duplicate log directories found: %s, %s!.format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } cleanShutDownFile.delete() } } }) thread.start() threads(i) = thread i = i + 1 } for(thread - threads) { thread.join() } } def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { logCreationOrDeletionLock synchronized { this.logs.put(topicPartition, log) } } {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
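Unwrapping the cause at the join point, as suggested, would be (sketch, reusing the jobs value from the threadpool patch):

{code}
import java.util.concurrent.ExecutionException

try {
  jobs.foreach(_.get()) // get() wraps a worker failure in ExecutionException
} catch {
  case e: ExecutionException => throw e.getCause // rethrow the original exception
}
{code}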
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1414: - Attachment: parallel-dir-loading-trunk.patch parallel-dir-loading-0.8.patch Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk.patch After hard reset due to power failure broker takes way too much time recovering unflushed segments in a single thread. This could be easiliy improved launching multiple threads (one per data dirrectory, assuming that typically each data directory is on a dedicated drive). Localy we trie this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to scala, so do not take it literally: {code} /** * Recover and load all logs in the given data directories */ private def loadLogs(dirs: Seq[File]) { val threads : Array[Thread] = new Array[Thread](dirs.size) var i: Int = 0 val me = this for(dir - dirs) { val thread = new Thread( new Runnable { def run() { val recoveryPoints = me.recoveryPointCheckpoints(dir).read /* load the logs */ val subDirs = dir.listFiles() if(subDirs != null) { val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) if(cleanShutDownFile.exists()) info(Found clean shutdown file. Skipping recovery for all logs in data directory '%s'.format(dir.getAbsolutePath)) for(dir - subDirs) { if(dir.isDirectory) { info(Loading log ' + dir.getName + ') val topicPartition = Log.parseTopicPartitionName(dir.getName) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time) val previous = addLogWithLock(topicPartition, log) if(previous != null) throw new IllegalArgumentException(Duplicate log directories found: %s, %s!.format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } cleanShutDownFile.delete() } } }) thread.start() threads(i) = thread i = i + 1 } for(thread - threads) { thread.join() } } def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { logCreationOrDeletionLock synchronized { this.logs.put(topicPartition, log) } } {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1414: - Affects Version/s: (was: 0.8.1) 0.9.0 0.8.2 0.8.1.1 Status: Patch Available (was: Open) Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1, 0.8.2, 0.9.0 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk.patch After hard reset due to power failure broker takes way too much time recovering unflushed segments in a single thread. This could be easiliy improved launching multiple threads (one per data dirrectory, assuming that typically each data directory is on a dedicated drive). Localy we trie this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to scala, so do not take it literally: {code} /** * Recover and load all logs in the given data directories */ private def loadLogs(dirs: Seq[File]) { val threads : Array[Thread] = new Array[Thread](dirs.size) var i: Int = 0 val me = this for(dir - dirs) { val thread = new Thread( new Runnable { def run() { val recoveryPoints = me.recoveryPointCheckpoints(dir).read /* load the logs */ val subDirs = dir.listFiles() if(subDirs != null) { val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) if(cleanShutDownFile.exists()) info(Found clean shutdown file. Skipping recovery for all logs in data directory '%s'.format(dir.getAbsolutePath)) for(dir - subDirs) { if(dir.isDirectory) { info(Loading log ' + dir.getName + ') val topicPartition = Log.parseTopicPartitionName(dir.getName) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time) val previous = addLogWithLock(topicPartition, log) if(previous != null) throw new IllegalArgumentException(Duplicate log directories found: %s, %s!.format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } cleanShutDownFile.delete() } } }) thread.start() threads(i) = thread i = i + 1 } for(thread - threads) { thread.join() } } def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { logCreationOrDeletionLock synchronized { this.logs.put(topicPartition, log) } } {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13980971#comment-13980971 ] Alexey Ozeritskiy commented on KAFKA-1414: -- What to do with exceptions? Maybe we should use ParArray here?

{code}
private def loadLogs(dirs: Seq[File]) {
  dirs.toParArray.foreach(dir => loadDir(dir))
}

private def loadDir(dir: File) {
  val recoveryPoints = this.recoveryPointCheckpoints(dir).read
  /* load the logs */
  val subDirs = dir.listFiles()
  if(subDirs != null) {
    val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
    if(cleanShutDownFile.exists())
      info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
    for(dir <- subDirs) {
      if(dir.isDirectory) {
        info("Loading log '" + dir.getName + "'")
        val topicPartition = Log.parseTopicPartitionName(dir.getName)
        val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
        val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
        val previous = addLogWithLock(topicPartition, log)
        if(previous != null)
          throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
      }
    }
    cleanShutDownFile.delete()
  }
}

private def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code}

Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps After hard reset due to power failure broker takes way too much time recovering unflushed segments in a single thread. This could be easiliy improved launching multiple threads (one per data dirrectory, assuming that typically each data directory is on a dedicated drive). Localy we trie this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to scala, so do not take it literally: {code} /** * Recover and load all logs in the given data directories */ private def loadLogs(dirs: Seq[File]) { val threads : Array[Thread] = new Array[Thread](dirs.size) var i: Int = 0 val me = this for(dir - dirs) { val thread = new Thread( new Runnable { def run() { val recoveryPoints = me.recoveryPointCheckpoints(dir).read /* load the logs */ val subDirs = dir.listFiles() if(subDirs != null) { val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) if(cleanShutDownFile.exists()) info(Found clean shutdown file. 
Skipping recovery for all logs in data directory '%s'.format(dir.getAbsolutePath)) for(dir - subDirs) { if(dir.isDirectory) { info(Loading log ' + dir.getName + ') val topicPartition = Log.parseTopicPartitionName(dir.getName) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time) val previous = addLogWithLock(topicPartition, log) if(previous != null) throw new IllegalArgumentException(Duplicate log directories found: %s, %s!.format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } cleanShutDownFile.delete() } } }) thread.start() threads(i) = thread i = i + 1 } for(thread - threads) { thread.join() } } def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { logCreationOrDeletionLock synchronized { this.logs.put(topicPartition, log) } } {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1322) slow startup after unclean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1322: - Resolution: Won't Fix Status: Resolved (was: Patch Available) ok slow startup after unclean shutdown --- Key: KAFKA-1322 URL: https://issues.apache.org/jira/browse/KAFKA-1322 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1 Reporter: Alexey Ozeritskiy Priority: Critical Attachments: kafka.patch Kafka 0.8.1 checks all segments on unclean shutdown. 0.8.0 checks only the latest segment. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1322) slow startup after unclean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13944376#comment-13944376 ] Alexey Ozeritskiy commented on KAFKA-1322: -- Can't reproduce now. I had unclean shutdowns after upgrade 0.8-0.8.1 and after running out of disk space. slow startup after unclean shutdown --- Key: KAFKA-1322 URL: https://issues.apache.org/jira/browse/KAFKA-1322 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1 Reporter: Alexey Ozeritskiy Priority: Critical Attachments: kafka.patch Kafka 0.8.1 checks all segments on unclean shutdown. 0.8.0 checks only the latest segment. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (KAFKA-1322) slow start on unclean shutdown
Alexey Ozeritskiy created KAFKA-1322: Summary: slow start on unclean shutdown Key: KAFKA-1322 URL: https://issues.apache.org/jira/browse/KAFKA-1322 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1 Reporter: Alexey Ozeritskiy Priority: Critical Kafka 0.8.1 checks all segments on unclean shutdown. 0.8.0 checks only the latest segment. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1322) slow start on unclean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1322: - Status: Patch Available (was: Open) slow start on unclean shutdown -- Key: KAFKA-1322 URL: https://issues.apache.org/jira/browse/KAFKA-1322 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1 Reporter: Alexey Ozeritskiy Priority: Critical Kafka 0.8.1 checks all segments on unclean shutdown. 0.8.0 checks only the latest segment. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1322) slow start on unclean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1322: - Attachment: kafka.patch slow start on unclean shutdown -- Key: KAFKA-1322 URL: https://issues.apache.org/jira/browse/KAFKA-1322 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1 Reporter: Alexey Ozeritskiy Priority: Critical Attachments: kafka.patch Kafka 0.8.1 checks all segments on unclean shutdown. 0.8.0 checks only the latest segment. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1322) slow startup after unclean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1322: - Summary: slow startup after unclean shutdown (was: slow start on unclean shutdown) slow startup after unclean shutdown --- Key: KAFKA-1322 URL: https://issues.apache.org/jira/browse/KAFKA-1322 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1 Reporter: Alexey Ozeritskiy Priority: Critical Attachments: kafka.patch Kafka 0.8.1 checks all segments on unclean shutdown. 0.8.0 checks only the latest segment. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock
[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13692816#comment-13692816 ] Alexey Ozeritskiy commented on KAFKA-937: - This patch works, thanks. ConsumerFetcherThread can deadlock -- Key: KAFKA-937 URL: https://issues.apache.org/jira/browse/KAFKA-937 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: kafka-937_ConsumerOffsetChecker.patch, kafka-937_delta.patch, kafka-937.patch We have the following access pattern that can introduce a deadlock. AbstractFetcherThread.processPartitionsWithError() - ConsumerFetcherThread.processPartitionsWithError() - ConsumerFetcherManager.addPartitionsWithError() wait for lock - LeaderFinderThread holding lock while calling AbstractFetcherManager.shutdownIdleFetcherThreads() - AbstractFetcherManager calling fetcher.shutdown, which needs to wait until AbstractFetcherThread.processPartitionsWithError() completes. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock
[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13692418#comment-13692418 ] Alexey Ozeritskiy commented on KAFKA-937: - https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blobdiff;f=core/src/main/scala/kafka/consumer/SimpleConsumer.scala;h=1c283280873eef597018f2f0a5ddfec942356c18;hp=bdeee9174a32a02209d769c18a0337ade0356e99;hb=5bd33c1517bb2e7734166dc3e787ac90a4ef8f86;hpb=640026467cf705fbcf6fd6bcada058b18a95bff5 ConsumerFetcherThread can deadlock -- Key: KAFKA-937 URL: https://issues.apache.org/jira/browse/KAFKA-937 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: kafka-937_delta.patch, kafka-937.patch We have the following access pattern that can introduce a deadlock. AbstractFetcherThread.processPartitionsWithError() - ConsumerFetcherThread.processPartitionsWithError() - ConsumerFetcherManager.addPartitionsWithError() wait for lock - LeaderFinderThread holding lock while calling AbstractFetcherManager.shutdownIdleFetcherThreads() - AbstractFetcherManager calling fetcher.shutdown, which needs to wait until AbstractFetcherThread.processPartitionsWithError() completes. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Comment Edited] (KAFKA-937) ConsumerFetcherThread can deadlock
[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13692418#comment-13692418 ] Alexey Ozeritskiy edited comment on KAFKA-937 at 6/24/13 9:44 PM: -- This patch touches SimpleConsumer. proof: https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blobdiff;f=core/src/main/scala/kafka/consumer/SimpleConsumer.scala;h=1c283280873eef597018f2f0a5ddfec942356c18;hp=bdeee9174a32a02209d769c18a0337ade0356e99;hb=5bd33c1517bb2e7734166dc3e787ac90a4ef8f86;hpb=640026467cf705fbcf6fd6bcada058b18a95bff5 was (Author: aozeritsky): https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blobdiff;f=core/src/main/scala/kafka/consumer/SimpleConsumer.scala;h=1c283280873eef597018f2f0a5ddfec942356c18;hp=bdeee9174a32a02209d769c18a0337ade0356e99;hb=5bd33c1517bb2e7734166dc3e787ac90a4ef8f86;hpb=640026467cf705fbcf6fd6bcada058b18a95bff5 ConsumerFetcherThread can deadlock -- Key: KAFKA-937 URL: https://issues.apache.org/jira/browse/KAFKA-937 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: kafka-937_delta.patch, kafka-937.patch We have the following access pattern that can introduce a deadlock. AbstractFetcherThread.processPartitionsWithError() - ConsumerFetcherThread.processPartitionsWithError() - ConsumerFetcherManager.addPartitionsWithError() wait for lock - LeaderFinderThread holding lock while calling AbstractFetcherManager.shutdownIdleFetcherThreads() - AbstractFetcherManager calling fetcher.shutdown, which needs to wait until AbstractFetcherThread.processPartitionsWithError() completes. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira