[jira] [Commented] (KAFKA-1908) Split brain

2017-01-05 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15800988#comment-15800988
 ] 

Alexey Ozeritskiy commented on KAFKA-1908:
--

Now I think this is not a problem. In the above scenario we get an isolated 
leader. We cannot write data via that leader (given an appropriate 
min.insync.replicas), non-local processes cannot read data from that leader, 
and local processes (MirrorMaker and so on) will do nothing because they cannot 
read up-to-date data.
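
For illustration, a minimal config sketch (not from this ticket) of the settings that make the isolated leader reject writes:
{code}
# broker/topic side: a write needs at least 2 in-sync replicas to succeed
min.insync.replicas=2
# producer side: require acknowledgement from all in-sync replicas
acks=all
{code}
With these settings a produce request sent to a leader that is cut off from the other replicas fails with a NotEnoughReplicas error instead of being committed only locally.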

I think the ticket should be closed.

> Split brain
> ---
>
> Key: KAFKA-1908
> URL: https://issues.apache.org/jira/browse/KAFKA-1908
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.0
>Reporter: Alexey Ozeritskiy
>
> In some cases, there may be two leaders for one partition.
> Steps to reproduce:
> # We have 3 brokers, 1 partition with 3 replicas:
> {code}
> TopicAndPartition: [partition,0]Leader: 1   Replicas: [2,1,3]   
> ISR: [1,2,3]
> {code} 
> # The controller runs on broker 3
> # Let the Kafka port be 9092. We execute on broker 1:
> {code}
> iptables -A INPUT -p tcp --dport 9092 -j REJECT
> {code}
> # Initiate replica election
> # As a result:
> Broker 1:
> {code}
> TopicAndPartition: [partition,0]Leader: 1   Replicas: [2,1,3]   
> ISR: [1,2,3]
> {code}
> Broker 2:
> {code}
> TopicAndPartition: [partition,0]Leader: 2   Replicas: [2,1,3]   
> ISR: [1,2,3]
> {code}
> # Flush the iptables rules on broker 1
> Now we can produce messages to {code}[partition,0]{code}. Replica-1 will not 
> receive new data. A consumer can read data from replica-1 or replica-2. When 
> it reads from replica-1 it resets the offsets and can then read duplicates 
> from replica-2.
> We saw this situation in our production cluster when it had network problems.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1923) Negative offsets in replication-offset-checkpoint file

2016-12-23 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772787#comment-15772787
 ] 

Alexey Ozeritskiy commented on KAFKA-1923:
--

The problem still exists (v0.10.1.1). I am trying to reproduce it. I've read the 
code and found that the only place with negative offsets is Replica.logEndOffset 
(for a remote Replica), but I don't understand how it can get into the high 
watermark.
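
For what it's worth, a small diagnostic sketch (assuming the usual checkpoint layout of a version line, an entry-count line, and then one "topic partition offset" triple per line) that reports entries with a negative offset:
{code}
// Hedged diagnostic sketch, not Kafka code: scan a replication-offset-checkpoint
// file and print every entry whose offset is negative.
import scala.io.Source
import scala.util.Try

object CheckpointScan {
  def main(args: Array[String]): Unit = {
    val lines = Source.fromFile(args(0)).getLines().toVector
    lines.drop(2).foreach { line =>               // skip the version and entry-count lines
      line.trim.split("\\s+") match {
        case Array(topic, partition, offset)
            if Try(offset.toLong).toOption.exists(_ < 0) =>
          println(s"negative high watermark: $topic-$partition -> $offset")
        case _ => // well-formed or empty line, nothing to report
      }
    }
  }
}
{code}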

> Negative offsets in replication-offset-checkpoint file
> --
>
> Key: KAFKA-1923
> URL: https://issues.apache.org/jira/browse/KAFKA-1923
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.0
>Reporter: Oleg Golovin
>
> Today was the second time we witnessed negative offsets in the 
> replication-offset-checkpoint file. After a restart the node stops replicating 
> some of its partitions.
> Unfortunately we can't reproduce it yet. But the two cases we encountered 
> indicate a bug which should be addressed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4205) NullPointerException in fetchOffsetsBefore

2016-12-02 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-4205:
-
Status: Patch Available  (was: Open)

> NullPointerException in fetchOffsetsBefore
> --
>
> Key: KAFKA-4205
> URL: https://issues.apache.org/jira/browse/KAFKA-4205
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andrew Grasso
>Assignee: Anton Karamanov
>  Labels: reliability
> Fix For: 0.10.1.1
>
>
> We recently observed the following error in brokers running 0.9.0.1:
> A client saw an Unknown error code in response to an offset request for 
> TOPICX, partition 0
> The server logs look like:
> {code}
> [2016-09-21 21:26:07,143] INFO Scheduling log segment 527235760 for log 
> TOPICX-0 for deletion. (kafka.log.Log)
> [2016-09-21 21:26:07,144] ERROR [KafkaApi-13] Error while responding to 
> offset request (kafka.server.KafkaApis)
> java.lang.NullPointerException
> at kafka.server.KafkaApis.fetchOffsetsBefore(KafkaApis.scala:513)
> at kafka.server.KafkaApis.fetchOffsets(KafkaApis.scala:501)
> at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:461)
> at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:452)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at kafka.server.KafkaApis.handleOffsetRequest(KafkaApis.scala:452)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> [2016-09-21 21:27:07,143] INFO Deleting segment 527235760 from log TOPICX-0. 
> (kafka.log.Log)
> [2016-09-21 21:27:07,263] INFO Deleting index 
> /path/to/kafka/data/TOPICX-0/000527235760.index.deleted 
> (kafka.log.OffsetIndex)
> {code}
> I suspect a race condition between {{Log.deleteSegment}} (which takes a lock 
> on the log) and {{KafkaApis.fetchOffsetsBefore}}, which does not take any 
> lock. In particular, line 513 in KafkaApis looks like:
> {code:title=KafkaApis.scala|borderStyle=solid}
> 510  private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: 
> Int): Seq[Long] = {
> 511val segsArray = log.logSegments.toArray
> 512var offsetTimeArray: Array[(Long, Long)] = null
> 513val lastSegmentHasSize = segsArray.last.size > 0;
> {code}
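
For illustration only, a self-contained sketch (hypothetical names, not Kafka code) of the kind of race suspected above, where the reader snapshots the segment list but the last segment is invalidated by a concurrent delete:
{code}
// Hedged sketch: a deleter invalidates the last "segment" while a reader,
// analogous to fetchOffsetsBefore, dereferences it from its snapshot.
import java.util.concurrent.ConcurrentSkipListMap

final class Segment(val baseOffset: Long) {
  @volatile private var index: Array[Long] = Array(baseOffset) // nulled on delete
  def size: Long = index.length.toLong                         // NPE once deleted
  def delete(): Unit = { index = null }
}

object FetchOffsetsRace {
  val segments = new ConcurrentSkipListMap[Long, Segment]()

  // Snapshotting the map does not protect against a segment being
  // invalidated after the snapshot was taken.
  def lastSegmentHasSize: Boolean = {
    val segsArray = segments.values.toArray(Array.empty[Segment])
    segsArray.last.size > 0
  }

  def main(args: Array[String]): Unit = {
    segments.put(0L, new Segment(0L))
    val deleter = new Thread(() => segments.values.forEach(_.delete()))
    deleter.start()
    try println(s"lastSegmentHasSize = $lastSegmentHasSize")
    catch { case e: NullPointerException => println(s"hit the race: $e") }
    deleter.join()
  }
}
{code}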



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4205) NullPointerException in fetchOffsetsBefore

2016-12-02 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-4205:
-
Assignee: Anton Karamanov

> NullPointerException in fetchOffsetsBefore
> --
>
> Key: KAFKA-4205
> URL: https://issues.apache.org/jira/browse/KAFKA-4205
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andrew Grasso
>Assignee: Anton Karamanov
>  Labels: reliability
> Fix For: 0.10.1.1
>
>
> We recently observed the following error in brokers running 0.9.0.1:
> A client saw an Unknown error code in response to an offset request for 
> TOPICX, partition 0
> The server logs look like:
> {code}
> [2016-09-21 21:26:07,143] INFO Scheduling log segment 527235760 for log 
> TOPICX-0 for deletion. (kafka.log.Log)
> [2016-09-21 21:26:07,144] ERROR [KafkaApi-13] Error while responding to 
> offset request (kafka.server.KafkaApis)
> java.lang.NullPointerException
> at kafka.server.KafkaApis.fetchOffsetsBefore(KafkaApis.scala:513)
> at kafka.server.KafkaApis.fetchOffsets(KafkaApis.scala:501)
> at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:461)
> at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:452)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at kafka.server.KafkaApis.handleOffsetRequest(KafkaApis.scala:452)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> [2016-09-21 21:27:07,143] INFO Deleting segment 527235760 from log TOPICX-0. 
> (kafka.log.Log)
> [2016-09-21 21:27:07,263] INFO Deleting index 
> /path/to/kafka/data/TOPICX-0/000527235760.index.deleted 
> (kafka.log.OffsetIndex)
> {code}
> I suspect a race condition between {{Log.deleteSegment}} (which takes a lock 
> on the log) and {{KafkaApis.fetchOffsetsBefore}}, which does not take any 
> lock. In particular, line 513 in KafkaApis looks like:
> {code:title=KafkaApis.scala|borderStyle=solid}
> 510  private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: 
> Int): Seq[Long] = {
> 511val segsArray = log.logSegments.toArray
> 512var offsetTimeArray: Array[(Long, Long)] = null
> 513val lastSegmentHasSize = segsArray.last.size > 0;
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4205) NullPointerException in fetchOffsetsBefore

2016-12-02 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15715179#comment-15715179
 ] 

Alexey Ozeritskiy commented on KAFKA-4205:
--

We see this on 0.10.1: ~20-30 errors per day on every broker.

> NullPointerException in fetchOffsetsBefore
> --
>
> Key: KAFKA-4205
> URL: https://issues.apache.org/jira/browse/KAFKA-4205
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andrew Grasso
>
> We recently observed the following error in brokers running 0.9.0.1:
> A client saw an Unknown error code in response to an offset request for 
> TOPICX, partition 0
> The server logs look like:
> {code}
> [2016-09-21 21:26:07,143] INFO Scheduling log segment 527235760 for log 
> TOPICX-0 for deletion. (kafka.log.Log)
> [2016-09-21 21:26:07,144] ERROR [KafkaApi-13] Error while responding to 
> offset request (kafka.server.KafkaApis)
> java.lang.NullPointerException
> at kafka.server.KafkaApis.fetchOffsetsBefore(KafkaApis.scala:513)
> at kafka.server.KafkaApis.fetchOffsets(KafkaApis.scala:501)
> at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:461)
> at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:452)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at kafka.server.KafkaApis.handleOffsetRequest(KafkaApis.scala:452)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> [2016-09-21 21:27:07,143] INFO Deleting segment 527235760 from log TOPICX-0. 
> (kafka.log.Log)
> [2016-09-21 21:27:07,263] INFO Deleting index 
> /path/to/kafka/data/TOPICX-0/000527235760.index.deleted 
> (kafka.log.OffsetIndex)
> {code}
> I suspect a race condition between {{Log.deleteSegment}} (which takes a lock 
> on the log) and {{KafkaApis.fetchOffsetsBefore}}, which does not take any 
> lock. In particular, line 513 in KafkaApis looks like:
> {code:title=KafkaApis.scala|borderStyle=solid}
> 510  private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: 
> Int): Seq[Long] = {
> 511val segsArray = log.logSegments.toArray
> 512var offsetTimeArray: Array[(Long, Long)] = null
> 513val lastSegmentHasSize = segsArray.last.size > 0;
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit

2016-11-11 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15658044#comment-15658044
 ] 

Alexey Ozeritskiy commented on KAFKA-4399:
--

For now I think you can use the following config as a workaround:
{code}
# 100 days (100 * 24 * 60 = 144000 minutes)
offsets.retention.minutes=144000
# 1 hour (3600000 ms)
offsets.retention.check.interval.ms=3600000
{code}

> Deadlock between cleanupGroupMetadata and offset commit
> ---
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Alexey Ozeritskiy
>Priority: Blocker
> Attachments: deadlock-stack
>
>
> We have upgraded our clusters to 0.10.1.0 and got a deadlock issue.
> We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but 
> the patch did not help us and our stack traces are different. I think it is a different issue.
> Stack traces attached



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit

2016-11-11 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-4399:
-
Status: Patch Available  (was: Open)

> Deadlock between cleanupGroupMetadata and offset commit
> ---
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Alexey Ozeritskiy
>Priority: Blocker
> Attachments: deadlock-stack
>
>
> We have upgraded our clusters to 0.10.1.0 and got a deadlock issue.
> We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but 
> the patch did not help us and our stack traces are different. I think it is a different issue.
> Stack traces attached



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit

2016-11-11 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15658027#comment-15658027
 ] 

Alexey Ozeritskiy commented on KAFKA-4399:
--

The simplest way to fix it is to move the complicated code (i.e. 
appendMessagesToLeader) outside the lock. See the PR.
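
For illustration, a hedged, self-contained sketch of that pattern (hypothetical names, not the actual PR): decide what to delete under the lock, but do the expensive append only after the lock is released, so an offset commit waiting for the same lock cannot deadlock against the cleanup:
{code}
object CleanupSketch {
  private val groupLock = new Object
  private var offsets = Map("group-1" -> 42L, "group-2" -> 7L)

  // Stands in for the expensive log append done by appendMessagesToLeader.
  private def appendTombstones(groups: Seq[String]): Unit =
    groups.foreach(g => println(s"append tombstone for $g"))

  def cleanupGroupMetadata(expired: String => Boolean): Unit = {
    // Under the lock: only cheap in-memory bookkeeping.
    val tombstones = groupLock.synchronized {
      val dead = offsets.keys.filter(expired).toSeq
      offsets = offsets -- dead
      dead
    }
    // Outside the lock: the slow append.
    if (tombstones.nonEmpty) appendTombstones(tombstones)
  }

  def main(args: Array[String]): Unit = cleanupGroupMetadata(_ == "group-2")
}
{code}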

> Deadlock between cleanupGroupMetadata and offset commit
> ---
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Alexey Ozeritskiy
>Priority: Blocker
> Attachments: deadlock-stack
>
>
> We have upgraded our clusters to 0.10.1.0 and got a deadlock issue.
> We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but 
> the patch did not help us and our stack traces are different. I think it is a different issue.
> Stack traces attached



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit

2016-11-11 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-4399:
-
Attachment: deadlock-stack

> Deadlock between cleanupGroupMetadata and offset commit
> ---
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Alexey Ozeritskiy
>Priority: Blocker
> Attachments: deadlock-stack
>
>
> We have upgraded our clusters to 0.10.1.0 and got a deadlock issue.
> We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but 
> the patch did not help us and our stack traces are different. I think it is a different issue.
> Stack traces attached



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit

2016-11-11 Thread Alexey Ozeritskiy (JIRA)
Alexey Ozeritskiy created KAFKA-4399:


 Summary: Deadlock between cleanupGroupMetadata and offset commit
 Key: KAFKA-4399
 URL: https://issues.apache.org/jira/browse/KAFKA-4399
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.0
Reporter: Alexey Ozeritskiy
Priority: Blocker


We have upgraded our clusters to 0.10.1.0 and got a deadlock issue.
We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but 
the patch did not help us and our stack traces are different. I think it is a different issue.

Stack traces attached



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4319) AbstractFetcherManager shutdown speedup

2016-10-19 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-4319:
-
Status: Patch Available  (was: Open)

https://github.com/apache/kafka/pull/2023

> AbstractFetcherManager shutdown speedup
> ---
>
> Key: KAFKA-4319
> URL: https://issues.apache.org/jira/browse/KAFKA-4319
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Alexey Ozeritskiy
>
> During the shutdown process, AbstractFetcherManager closes all worker threads 
> sequentially, which slows down the shutdown dramatically on huge clusters 
> (approximately 15 minutes for 100 nodes, for example).
> This can be improved with a parallel mode: in the first stage 
> AbstractFetcherManager can send the stop signal to all the workers and then 
> join all the worker threads.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4319) AbstractFetcherManager shutdown speedup

2016-10-19 Thread Alexey Ozeritskiy (JIRA)
Alexey Ozeritskiy created KAFKA-4319:


 Summary: AbstractFetcherManager shutdown speedup
 Key: KAFKA-4319
 URL: https://issues.apache.org/jira/browse/KAFKA-4319
 Project: Kafka
  Issue Type: Improvement
Reporter: Alexey Ozeritskiy


During the shutdown process, AbstractFetcherManager closes all worker threads 
sequentially, which slows down the shutdown dramatically on huge clusters 
(approximately 15 minutes for 100 nodes, for example).

This can be improved with a parallel mode: in the first stage 
AbstractFetcherManager can send the stop signal to all the workers and then 
join all the worker threads, as sketched below.
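
A hedged, self-contained sketch of that two-phase shutdown (hypothetical thread class, not the actual patch):
{code}
// Signal every fetcher first, then join them all, so total shutdown time is
// roughly one thread's stop time instead of the sum over all threads.
import java.util.concurrent.atomic.AtomicBoolean

final class FetcherThread(name: String) extends Thread(name) {
  private val running = new AtomicBoolean(true)
  def initiateShutdown(): Unit = running.set(false)   // phase 1: signal, do not wait
  override def run(): Unit = while (running.get()) Thread.sleep(10)
}

object ShutdownSketch {
  def closeAllFetchers(fetchers: Seq[FetcherThread]): Unit = {
    fetchers.foreach(_.initiateShutdown()) // tell everyone to stop
    fetchers.foreach(_.join())             // phase 2: wait for all of them
  }

  def main(args: Array[String]): Unit = {
    val fetchers = (1 to 4).map(i => new FetcherThread(s"fetcher-$i"))
    fetchers.foreach(_.start())
    closeAllFetchers(fetchers)
    println("all fetchers stopped")
  }
}
{code}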



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2063) Bound fetch response size (KIP-74)

2016-09-13 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15487885#comment-15487885
 ] 

Alexey Ozeritskiy commented on KAFKA-2063:
--

Thanks Andrey, I agree with you.

> Bound fetch response size (KIP-74)
> --
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Andrey Neporada
> Fix For: 0.10.1.0
>
>
> Currently the only bound on the fetch response size is 
> max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose 
> max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you 
> also need to consume 1k partitions this means you may receive a 1GB response 
> in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, 
> and you only actually get the full fetch amount when you are behind and there 
> is a full chunk of data ready. This means an application that seems to work 
> fine will suddenly OOM when partitions shift or when the application falls 
> behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, 
> max_bytes which would control the maximum data bytes we would include in the 
> response.
> The implementation on the server side would grab data from each partition in 
> the fetch request until it hit this limit, then send back just the data for 
> the partitions that fit in the response. The implementation would need to 
> start from a random position in the list of topics included in the fetch 
> request to ensure that in a case of backlog we fairly balance between 
> partitions (to avoid first giving just the first partition until that is 
> exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch 
> request much less useful and we  should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in 
> KAFKA-598. The max_bytes setting now becomes the new limit that would need to 
> be compared to max_message size. This can be much larger--e.g. setting a 50MB 
> max_bytes setting would be okay, whereas now if you set 50MB you may need to 
> allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new 
> field and we should do a KIP for it.
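
For illustration, a hedged sketch (hypothetical types, not the broker code) of the selection logic described above: start at a random partition and include partition data until the request-level max_bytes budget is used up:
{code}
import scala.util.Random

object BoundedFetchSketch {
  final case class PartitionData(partition: Int, bytes: Int)

  def selectForResponse(available: Vector[PartitionData], maxBytes: Int): Vector[PartitionData] = {
    if (available.isEmpty) return Vector.empty
    val start = Random.nextInt(available.size)      // random start for fairness under backlog
    var budget = maxBytes
    val picked = Vector.newBuilder[PartitionData]
    for (i <- available.indices) {
      val pd = available((start + i) % available.size)
      if (pd.bytes <= budget) { picked += pd; budget -= pd.bytes } // keep only what still fits
    }
    picked.result()
  }

  def main(args: Array[String]): Unit = {
    val data = Vector.tabulate(10)(p => PartitionData(p, 1024 * 1024)) // ten 1 MB partitions
    println(selectForResponse(data, 4 * 1024 * 1024).map(_.partition)) // roughly 4 fit in a 4 MB budget
  }
}
{code}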



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2063) Bound fetch response size (KIP-74)

2016-09-12 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2063:
-
Assignee: Andrey Neporada  (was: Alexey Ozeritskiy)

> Bound fetch response size (KIP-74)
> --
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Andrey Neporada
> Fix For: 0.10.1.0
>
>
> Currently the only bound on the fetch response size is 
> max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose 
> max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you 
> also need to consume 1k partitions this means you may receive a 1GB response 
> in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, 
> and you only actually get the full fetch amount when you are behind and there 
> is a full chunk of data ready. This means an application that seems to work 
> fine will suddenly OOM when partitions shift or when the application falls 
> behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, 
> max_bytes which would control the maximum data bytes we would include in the 
> response.
> The implementation on the server side would grab data from each partition in 
> the fetch request until it hit this limit, then send back just the data for 
> the partitions that fit in the response. The implementation would need to 
> start from a random position in the list of topics included in the fetch 
> request to ensure that in a case of backlog we fairly balance between 
> partitions (to avoid first giving just the first partition until that is 
> exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch 
> request much less useful and we  should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in 
> KAFKA-598. The max_bytes setting now becomes the new limit that would need to 
> be compared to max_message size. This can be much larger--e.g. setting a 50MB 
> max_bytes setting would be okay, whereas now if you set 50MB you may need to 
> allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new 
> field and we should do a KIP for it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2063) Bound fetch response size (KIP-74)

2016-09-12 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485357#comment-15485357
 ] 

Alexey Ozeritskiy edited comment on KAFKA-2063 at 9/12/16 9:37 PM:
---

The approach of KAFKA-3979 is better for configurations with a very big 
max.message.bytes.
For example, we have the following config on a cluster of 77 hosts:
{code}
replica.fetch.max.bytes=13500
replica.socket.receive.buffer.bytes=13500
message.max.bytes=134217728
socket.request.max.bytes=13500
{code}

With KIP-74 the process always needs 77*13500 bytes of memory. 
With KAFKA-3979 it is only 77*100 bytes on average.

I think we should do something about that. 


was (Author: aozeritsky):
The approach of KAFKA-3979 is better for configurations with a very big 
max.message.size.
For example, we have the following config on a cluster of 77 hosts:
{code}
replica.fetch.max.bytes=13500
replica.socket.receive.buffer.bytes=13500
message.max.bytes=134217728
socket.request.max.bytes=13500
{code}

With KIP-74 the process always needs 77*13500 bytes of memory. 
With KAFKA-3979 it is only 77*100 bytes on average.

I think we should do something about that. 

> Bound fetch response size (KIP-74)
> --
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Alexey Ozeritskiy
> Fix For: 0.10.1.0
>
>
> Currently the only bound on the fetch response size is 
> max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose 
> max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you 
> also need to consume 1k partitions this means you may receive a 1GB response 
> in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, 
> and you only actually get the full fetch amount when you are behind and there 
> is a full chunk of data ready. This means an application that seems to work 
> fine will suddenly OOM when partitions shift or when the application falls 
> behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, 
> max_bytes which would control the maximum data bytes we would include in the 
> response.
> The implementation on the server side would grab data from each partition in 
> the fetch request until it hit this limit, then send back just the data for 
> the partitions that fit in the response. The implementation would need to 
> start from a random position in the list of topics included in the fetch 
> request to ensure that in a case of backlog we fairly balance between 
> partitions (to avoid first giving just the first partition until that is 
> exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch 
> request much less useful and we  should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in 
> KAFKA-598. The max_bytes setting now becomes the new limit that would need to 
> be compared to max_message size. This can be much larger--e.g. setting a 50MB 
> max_bytes setting would be okay, whereas now if you set 50MB you may need to 
> allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new 
> field and we should do a KIP for it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2063) Bound fetch response size (KIP-74)

2016-09-12 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485357#comment-15485357
 ] 

Alexey Ozeritskiy commented on KAFKA-2063:
--

The approach of KAFKA-3979 is better for configurations with a very big 
max.message.size.
For example, we have the following config on a cluster of 77 hosts:
{code}
replica.fetch.max.bytes=13500
replica.socket.receive.buffer.bytes=13500
message.max.bytes=134217728
socket.request.max.bytes=13500
{code}

With KIP-74 the process always needs 77*13500 bytes of memory. 
With KAFKA-3979 it is only 77*100 bytes on average.

I think we should do something about that. 

> Bound fetch response size (KIP-74)
> --
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Alexey Ozeritskiy
> Fix For: 0.10.1.0
>
>
> Currently the only bound on the fetch response size is 
> max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose 
> max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you 
> also need to consume 1k partitions this means you may receive a 1GB response 
> in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, 
> and you only actually get the full fetch amount when you are behind and there 
> is a full chunk of data ready. This means an application that seems to work 
> fine will suddenly OOM when partitions shift or when the application falls 
> behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, 
> max_bytes which would control the maximum data bytes we would include in the 
> response.
> The implementation on the server side would grab data from each partition in 
> the fetch request until it hit this limit, then send back just the data for 
> the partitions that fit in the response. The implementation would need to 
> start from a random position in the list of topics included in the fetch 
> request to ensure that in a case of backlog we fairly balance between 
> partitions (to avoid first giving just the first partition until that is 
> exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch 
> request much less useful and we  should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in 
> KAFKA-598. The max_bytes setting now becomes the new limit that would need to 
> be compared to max_message size. This can be much larger--e.g. setting a 50MB 
> max_bytes setting would be okay, whereas now if you set 50MB you may need to 
> allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new 
> field and we should do a KIP for it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-2063) Bound fetch response size (KIP-74)

2016-09-12 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy reassigned KAFKA-2063:


Assignee: Alexey Ozeritskiy  (was: Andrey Neporada)

> Bound fetch response size (KIP-74)
> --
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Alexey Ozeritskiy
> Fix For: 0.10.1.0
>
>
> Currently the only bound on the fetch response size is 
> max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose 
> max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you 
> also need to consume 1k partitions this means you may receive a 1GB response 
> in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, 
> and you only actually get the full fetch amount when you are behind and there 
> is a full chunk of data ready. This means an application that seems to work 
> fine will suddenly OOM when partitions shift or when the application falls 
> behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, 
> max_bytes which would control the maximum data bytes we would include in the 
> response.
> The implementation on the server side would grab data from each partition in 
> the fetch request until it hit this limit, then send back just the data for 
> the partitions that fit in the response. The implementation would need to 
> start from a random position in the list of topics included in the fetch 
> request to ensure that in a case of backlog we fairly balance between 
> partitions (to avoid first giving just the first partition until that is 
> exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch 
> request much less useful and we  should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in 
> KAFKA-598. The max_bytes setting now becomes the new limit that would need to 
> be compared to max_message size. This can be much larger--e.g. setting a 50MB 
> max_bytes setting would be okay, whereas now if you set 50MB you may need to 
> allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new 
> field and we should do a KIP for it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4071) Corrupted replication-offset-checkpoint leads to kafka server being dysfunctional

2016-08-22 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15431056#comment-15431056
 ] 

Alexey Ozeritskiy commented on KAFKA-4071:
--

Thanks [~zanezhang]. Could you attach the replication-offset-checkpoint file? It 
would be interesting to see its original binary content.

> Corrupted replication-offset-checkpoint leads to kafka server being dysfunctional
> 
>
> Key: KAFKA-4071
> URL: https://issues.apache.org/jira/browse/KAFKA-4071
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, offset manager
>Affects Versions: 0.9.0.1
> Environment: Red Hat Enterprise 6.7
>Reporter: Zane Zhang
>Priority: Critical
>
> For an unknown reason, [kafka data root]/replication-offset-checkpoint was 
> corrupted. First Kafka reported a NumberFormatException in kafka server.out. 
> Then it reported "error when handling request Name: FetchRequest; ... " 
> ERRORs repeatedly (ERROR details below). As a result, clients could not read 
> from or write to Kafka on several partitions until 
> replication-offset-checkpoint was manually deleted.
> Can the Kafka broker handle this error and survive it?
> And what was the reason this file was corrupted? Only one file was corrupted 
> and no noticeable disk failure was detected.
> ERROR [KafkaApi-7] error when handling request 
> java.lang.NumberFormatException: For input string: " N?-;   O"
>   at 
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:77)
>   at java.lang.Integer.parseInt(Integer.java:493)
>   at java.lang.Integer.parseInt(Integer.java:539)
>   at 
> scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
>   at scala.collection.immutable.StringOps.toInt(StringOps.scala:30)
>   at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:78)
>   at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:93)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:173)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:173)
>   at scala.collection.immutable.Set$Set2.foreach(Set.scala:111)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:173)
> ERROR [KafkaApi-7] error when handling request Name: FetchRequest; Version: 
> 1; CorrelationId: 0; ClientId: ReplicaFetcherThread-1-7; ReplicaId: 6; 
> MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [prodTopicDal09E,166] -> 
> PartitionFetchInfo(7123666,20971520),[prodTopicDal09E,118] -> 
> PartitionFetchInfo(7128188,20971520),[prodTopicDal09E,238] -> 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4039) Exit Strategy: using exceptions instead of inline invocation of exit/halt

2016-08-18 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-4039:
-
Attachment: deadlock-stack2

> Exit Strategy: using exceptions instead of inline invocation of exit/halt
> -
>
> Key: KAFKA-4039
> URL: https://issues.apache.org/jira/browse/KAFKA-4039
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Maysam Yabandeh
> Attachments: deadlock-stack2
>
>
> The current practice is to directly invoke halt/exit right after the line 
> that intends to terminate the execution. In the case of System.exit this 
> could cause deadlocks if the thread invoking System.exit is holding  a lock 
> that will be requested by the shutdown hook threads that will be started by 
> System.exit. An example is reported by [~aozeritsky] in KAFKA-3924. This 
> would also make testing more difficult as it would require mocking static 
> methods of System and Runtime classes, which is not natively supported in 
> Java.
> One alternative suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-3924?focusedCommentId=15420269=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15420269]
>  would be to throw some dedicated exceptions that will eventually invoke 
> exit/halt:
> {quote} it would be great to move away from executing `System.exit` inline in 
> favour of throwing an exception (two examples, but maybe we can find better 
> names: FatalExitException and FatalHaltException) that is caught by some 
> central code that then does the `System.exit` or `Runtime.getRuntime.halt`. 
> This helps in a couple of ways:
> (1) Avoids issues with locks being held as in this issue
> (2) It makes it possible to abstract the action, which is very useful in 
> tests. At the moment, we can't easily test for these conditions as they cause 
> the whole test harness to exit. Worse, these conditions are sometimes 
> triggered in the tests and it's unclear why.
> (3) We can have more consistent logging around these actions and possibly 
> extended logging for tests
> {quote}
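
For illustration, a hedged sketch of the pattern quoted above (hypothetical class names, not the eventual Kafka implementation): fatal paths throw a dedicated exception, and one central handler performs the exit, which tests can override:
{code}
final class FatalExitException(msg: String) extends RuntimeException(msg)

object ExitSketch {
  // The single place that actually terminates the process; tests can swap it out.
  @volatile var exitProcedure: Int => Unit = code => System.exit(code)

  def runProtected(body: => Unit): Unit =
    try body
    catch {
      case e: FatalExitException =>
        System.err.println(s"fatal: ${e.getMessage}")
        exitProcedure(1)
    }

  def main(args: Array[String]): Unit = {
    exitProcedure = code => println(s"would exit with $code") // test-style override
    runProtected { throw new FatalExitException("log truncation is not allowed") }
  }
}
{code}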



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1530) howto update continuously

2016-08-17 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15425454#comment-15425454
 ] 

Alexey Ozeritskiy commented on KAFKA-1530:
--

I think this ticket may be closed.
Setting unclean.leader.election.enable=false helps us.
Also we've developed a tool, kafka-restarter, that restarts Kafka node by node 
and checks the ISR status.
And we've developed a tool, fix-isr, that can fix the ISR after a cluster power 
failure.

> howto update continuously
> -
>
> Key: KAFKA-1530
> URL: https://issues.apache.org/jira/browse/KAFKA-1530
> Project: Kafka
>  Issue Type: Wish
>Reporter: Stanislav Gilmulin
>Assignee: Guozhang Wang
>Priority: Minor
>  Labels: operating_manual, performance
>
> Hi,
>  
> Could I ask you a question about the Kafka update procedure?
> Is there a way to update the software that doesn't require a service interruption 
> or lead to data loss?
> We can't stop message brokering during the update as we have a strict SLA.
>  
> Best regards
> Stanislav Gilmulin



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4039) Exit Strategy: using exceptions instead of inline invocation of exit/halt

2016-08-17 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424201#comment-15424201
 ] 

Alexey Ozeritskiy commented on KAFKA-4039:
--

Thanks [~maysamyabandeh]
I'll be happy to test your patch.

> Exit Strategy: using exceptions instead of inline invocation of exit/halt
> -
>
> Key: KAFKA-4039
> URL: https://issues.apache.org/jira/browse/KAFKA-4039
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Maysam Yabandeh
>
> The current practice is to directly invoke halt/exit right after the line 
> that intends to terminate the execution. In the case of System.exit this 
> could cause deadlocks if the thread invoking System.exit is holding  a lock 
> that will be requested by the shutdown hook threads that will be started by 
> System.exit. An example is reported by [~aozeritsky] in KAFKA-3924. This 
> would also make testing more difficult as it would require mocking static 
> methods of System and Runtime classes, which is not natively supported in 
> Java.
> One alternative suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-3924?focusedCommentId=15420269=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15420269]
>  would be to throw some dedicated exceptions that will eventually invoke 
> exit/halt:
> {quote} it would be great to move away from executing `System.exit` inline in 
> favour of throwing an exception (two examples, but maybe we can find better 
> names: FatalExitException and FatalHaltException) that is caught by some 
> central code that then does the `System.exit` or `Runtime.getRuntime.halt`. 
> This helps in a couple of ways:
> (1) Avoids issues with locks being held as in this issue
> (2) It makes it possible to abstract the action, which is very useful in 
> tests. At the moment, we can't easily test for these conditions as they cause 
> the whole test harness to exit. Worse, these conditions are sometimes 
> triggered in the tests and it's unclear why.
> (3) We can have more consistent logging around these actions and possibly 
> extended logging for tests
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4039) Exit Strategy: using exceptions instead of inline invocation of exit/halt

2016-08-16 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15423298#comment-15423298
 ] 

Alexey Ozeritskiy commented on KAFKA-4039:
--

Thanks [~maysamyabandeh].
I think we have to decide how to solve the problem or revert commit 
d1757c70a198014e85026c01a1a4ccab6a12da7d, because it brings a more serious 
problem. In fact, my "solution" from KAFKA-3924 does not solve the problem 
because the shutdown hook waits for the scheduler and vice versa. 

> Exit Strategy: using exceptions instead of inline invocation of exit/halt
> -
>
> Key: KAFKA-4039
> URL: https://issues.apache.org/jira/browse/KAFKA-4039
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Maysam Yabandeh
>
> The current practice is to directly invoke halt/exit right after the line 
> that intends to terminate the execution. In the case of System.exit this 
> could cause deadlocks if the thread invoking System.exit is holding  a lock 
> that will be requested by the shutdown hook threads that will be started by 
> System.exit. An example is reported by [~aozeritsky] in KAFKA-3924. This 
> would also make testing more difficult as it would require mocking static 
> methods of System and Runtime classes, which is not natively supported in 
> Java.
> One alternative suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-3924?focusedCommentId=15420269=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15420269]
>  would be to throw some dedicated exceptions that will eventually invoke 
> exit/halt:
> {quote} it would be great to move away from executing `System.exit` inline in 
> favour of throwing an exception (two examples, but maybe we can find better 
> names: FatalExitException and FatalHaltException) that is caught by some 
> central code that then does the `System.exit` or `Runtime.getRuntime.halt`. 
> This helps in a couple of ways:
> (1) Avoids issues with locks being held as in this issue
> (2) It makes it possible to abstract the action, which is very useful in 
> tests. At the moment, we can't easily test for these conditions as they cause 
> the whole test harness to exit. Worse, these conditions are sometimes 
> triggered in the tests and it's unclear why.
> (3) We can have more consistent logging around these actions and possibly 
> extended logging for tests
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3979) Optimize memory used by replication process by using adaptive fetch message size

2016-08-14 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy resolved KAFKA-3979.
--
Resolution: Won't Fix

There is a better solution: KAFKA-2063

> Optimize memory used by replication process by using adaptive fetch message 
> size
> 
>
> Key: KAFKA-3979
> URL: https://issues.apache.org/jira/browse/KAFKA-3979
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.10.0.0
>Reporter: Andrey Neporada
>
> The current replication process fetches messages in replica.fetch.max.bytes-sized 
> chunks.
> Since replica.fetch.max.bytes should be bigger than max.message.bytes for 
> replication to work, one can face high memory consumption for the replication 
> process, especially for installations with a big number of partitions.
> The proposed solution is to try to fetch messages in smaller chunks (say 
> replica.fetch.base.bytes).
> If we encounter a message bigger than the current fetch chunk, we increase the 
> chunk (e.g. twofold) and retry. After replicating this bigger message, we shrink 
> the fetch chunk size back until it reaches replica.fetch.base.bytes.
> replica.fetch.base.bytes should be chosen big enough not to affect throughput 
> and to be bigger than most messages.
> However, it can be much less than replica.fetch.max.bytes.
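
For illustration, a hedged sketch of the adaptive scheme described above (hypothetical values; replica.fetch.base.bytes is the proposed setting, not an existing one):
{code}
object AdaptiveFetchSketch {
  val baseBytes = 1 * 1024 * 1024   // stands in for replica.fetch.base.bytes
  val maxBytes  = 64 * 1024 * 1024  // stands in for replica.fetch.max.bytes

  def replicate(messageSizes: Seq[Int]): Unit = {
    var fetchSize = baseBytes
    messageSizes.foreach { size =>
      while (size > fetchSize && fetchSize < maxBytes)
        fetchSize = math.min(fetchSize * 2, maxBytes)  // grow until the big message fits
      println(s"fetched a $size-byte message with fetchSize=$fetchSize")
      if (fetchSize > baseBytes)
        fetchSize = math.max(fetchSize / 2, baseBytes) // shrink back towards the base afterwards
    }
  }

  def main(args: Array[String]): Unit =
    replicate(Seq(500000, 40 * 1024 * 1024, 800000))
}
{code}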



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3924) Data loss due to halting when LEO is larger than leader's LEO

2016-08-14 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15420251#comment-15420251
 ] 

Alexey Ozeritskiy commented on KAFKA-3924:
--

IMHO the simplest way to solve the problem is to execute System.exit 
asynchronously:
{code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index ef602e4..ed00a73 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -175,10 +175,13 @@ class ReplicaFetcherThread(name: String,
       if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
         ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
         // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
-        fatal("Exiting because log truncation is not allowed for partition %s,".format(topicAndPartition) +
+        val msg = "Exiting because log truncation is not allowed for partition %s,".format(topicAndPartition) +
           " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
-          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
-        System.exit(1)
+          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset)
+        fatal(msg)
+
+        replicaMgr.scheduler.schedule("exit", () => System.exit(1))
+        throw new Exception(msg)
       }
 
       warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2b97783..6e6539b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -105,7 +105,7 @@ class ReplicaManager(val config: KafkaConfig,
                      time: Time,
                      jTime: JTime,
                      val zkUtils: ZkUtils,
-                     scheduler: Scheduler,
+                     val scheduler: Scheduler,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean,
                      threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
{code}

> Data loss due to halting when LEO is larger than leader's LEO
> -
>
> Key: KAFKA-3924
> URL: https://issues.apache.org/jira/browse/KAFKA-3924
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Maysam Yabandeh
> Fix For: 0.10.0.1
>
> Attachments: deadlock-stack
>
>
> Currently the follower broker panics when its LEO is larger than its leader's 
> LEO,  and assuming that this is an impossible state to reach, halts the 
> process to prevent any further damage.
> {code}
> if (leaderEndOffset < replica.logEndOffset.messageOffset) {
>   // Prior to truncating the follower's log, ensure that doing so is not 
> disallowed by the configuration for unclean leader election.
>   // This situation could only happen if the unclean election 
> configuration for a topic changes while a replica is down. Otherwise,
>   // we should never encounter this situation since a non-ISR leader 
> cannot be elected if disallowed by the broker configuration.
>   if (!LogConfig.fromProps(brokerConfig.originals, 
> AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
> ConfigType.Topic, 
> topicAndPartition.topic)).uncleanLeaderElectionEnable) {
> // Log a fatal error and shutdown the broker to ensure that data loss 
> does not unexpectedly occur.
> fatal("...")
> Runtime.getRuntime.halt(1)
>   }
> {code}
> Firstly this assumption is invalid and there are legitimate cases (examples 
> below) in which this case could actually occur. Secondly, halt results in the 
> broker losing its un-flushed data, and if multiple brokers halt 
> simultaneously there is a chance that both the leader and the followers of a 
> partition are among the halted brokers, which would result in permanent 
> data loss.
> Given that this is a legit case, we suggest replacing it with a graceful 
> shutdown to avoid propagating data loss to the entire cluster.
> Details:
> One legit case in which this could actually occur is when a troubled broker 
> shrinks its partitions right before crashing (KAFKA-3410 and KAFKA-3861). In 
> this case the broker has lost some data but the controller still cannot 
> elect the others as the leader. If the crashed broker comes back up, the 
> controller 

[jira] [Updated] (KAFKA-3924) Data loss due to halting when LEO is larger than leader's LEO

2016-08-13 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-3924:
-
Attachment: deadlock-stack

> Data loss due to halting when LEO is larger than leader's LEO
> -
>
> Key: KAFKA-3924
> URL: https://issues.apache.org/jira/browse/KAFKA-3924
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Maysam Yabandeh
> Fix For: 0.10.0.1
>
> Attachments: deadlock-stack
>
>
> Currently the follower broker panics when its LEO is larger than its leader's 
> LEO,  and assuming that this is an impossible state to reach, halts the 
> process to prevent any further damage.
> {code}
> if (leaderEndOffset < replica.logEndOffset.messageOffset) {
>   // Prior to truncating the follower's log, ensure that doing so is not 
> disallowed by the configuration for unclean leader election.
>   // This situation could only happen if the unclean election 
> configuration for a topic changes while a replica is down. Otherwise,
>   // we should never encounter this situation since a non-ISR leader 
> cannot be elected if disallowed by the broker configuration.
>   if (!LogConfig.fromProps(brokerConfig.originals, 
> AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
> ConfigType.Topic, 
> topicAndPartition.topic)).uncleanLeaderElectionEnable) {
> // Log a fatal error and shutdown the broker to ensure that data loss 
> does not unexpectedly occur.
> fatal("...")
> Runtime.getRuntime.halt(1)
>   }
> {code}
> Firstly this assumption is invalid and there are legitimate cases (examples 
> below) in which this case could actually occur. Secondly, halt results in the 
> broker losing its un-flushed data, and if multiple brokers halt 
> simultaneously there is a chance that both the leader and the followers of a 
> partition are among the halted brokers, which would result in permanent 
> data loss.
> Given that this is a legit case, we suggest replacing it with a graceful 
> shutdown to avoid propagating data loss to the entire cluster.
> Details:
> One legit case in which this could actually occur is when a troubled broker 
> shrinks its partitions right before crashing (KAFKA-3410 and KAFKA-3861). In 
> this case the broker has lost some data but the controller still cannot 
> elect the others as the leader. If the crashed broker comes back up, the 
> controller elects it as the leader, and as a result all other brokers that are 
> now following it halt since they have LEOs larger than that of the shrunk topics 
> in the restarted broker. We actually had a case where bringing up a crashed 
> broker simultaneously took down the entire cluster, and as explained above 
> this could result in data loss.
> The other legit case is when multiple brokers shut down ungracefully at the 
> same time. In this case both the leader and the followers lose their 
> un-flushed data but one of them has a HW larger than the other. The controller 
> elects the one that comes back up sooner as the leader, and if its LEO is less 
> than its future follower's, the follower will halt (and probably lose more 
> data). A simultaneous ungraceful shutdown could happen due to a hardware issue 
> (e.g., rack power failure), operator errors, or a software issue (e.g., the 
> case above that is further explained in KAFKA-3410 and KAFKA-3861 and causes 
> simultaneous halts in multiple brokers).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3924) Data loss due to halting when LEO is larger than leader's LEO

2016-08-13 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15419873#comment-15419873
 ] 

Alexey Ozeritskiy commented on KAFKA-3924:
--

I've got the deadlock with that patch. Stack traces:
{code}
"ReplicaFetcherThread-3-2" #112 prio=5 os_prio=0 tid=0x7f0acc10 
nid=0xfd54f in Object.wait() [0x7f0b141d7000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1245)
- locked <0x0003d8269bc8> (a kafka.Kafka$$anon$1)
at java.lang.Thread.join(Thread.java:1319)
at 
java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
at 
java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
at java.lang.Shutdown.runHooks(Shutdown.java:123)
at java.lang.Shutdown.sequence(Shutdown.java:167)
at java.lang.Shutdown.exit(Shutdown.java:212)
- locked <0x0003d8106e88> (a java.lang.Class for java.lang.Shutdown)
at java.lang.Runtime.exit(Runtime.java:109)
at java.lang.System.exit(System.java:971)
at 
kafka.server.ReplicaFetcherThread.handleOffsetOutOfRange(ReplicaFetcherThread.scala:179)
{code}

{code}
"Thread-2" #29 prio=5 os_prio=0 tid=0x7f0a70008000 nid=0xfecbf in 
Object.wait() [0x7f0b166e5000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1245)
- locked <0x0003e5c46960> (a java.lang.Thread)
at java.lang.Thread.join(Thread.java:1319)
at 
kafka.server.KafkaRequestHandlerPool$$anonfun$shutdown$3.apply(KafkaRequestHandler.scala:92)
at 
kafka.server.KafkaRequestHandlerPool$$anonfun$shutdown$3.apply(KafkaRequestHandler.scala:91)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at 
kafka.server.KafkaRequestHandlerPool.shutdown(KafkaRequestHandler.scala:91)
at 
kafka.server.KafkaServer$$anonfun$shutdown$3.apply$mcV$sp(KafkaServer.scala:559)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.CoreUtils$.swallowWarn(CoreUtils.scala:51)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:51)
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:559)
at 
kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:49)
at kafka.Kafka$$anon$1.run(Kafka.scala:63)
{code}

System.exit executes the hook in Thread-2 and joins it (first trace). Thread-2 
joins ReplicaFetcherThread-3-2 (second trace). So they wait for each other 
forever.
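
For illustration, a hedged, minimal reproduction of this kind of mutual wait (not Kafka code); note that running it really does hang, which is the point:
{code}
object ExitDeadlockSketch {
  @volatile private var worker: Thread = _

  def main(args: Array[String]): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      println("shutdown hook: joining worker...")
      worker.join()        // waits for the worker, which is stuck inside System.exit
    }))
    worker = new Thread(() => {
      println("worker: calling System.exit(1)")
      System.exit(1)       // runs the shutdown hooks and joins them, so both sides wait
    })
    worker.start()
    worker.join()
  }
}
{code}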

> Data loss due to halting when LEO is larger than leader's LEO
> -
>
> Key: KAFKA-3924
> URL: https://issues.apache.org/jira/browse/KAFKA-3924
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Maysam Yabandeh
> Fix For: 0.10.0.1
>
>
> Currently the follower broker panics when its LEO is larger than its leader's 
> LEO,  and assuming that this is an impossible state to reach, halts the 
> process to prevent any further damage.
> {code}
> if (leaderEndOffset < replica.logEndOffset.messageOffset) {
>   // Prior to truncating the follower's log, ensure that doing so is not 
> disallowed by the configuration for unclean leader election.
>   // This situation could only happen if the unclean election 
> configuration for a topic changes while a replica is down. Otherwise,
>   // we should never encounter this situation since a non-ISR leader 
> cannot be elected if disallowed by the broker configuration.
>   if (!LogConfig.fromProps(brokerConfig.originals, 
> AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
> ConfigType.Topic, 
> topicAndPartition.topic)).uncleanLeaderElectionEnable) {
> // Log a fatal error and shutdown the broker to ensure that data loss 
> does not unexpectedly occur.
> fatal("...")
> Runtime.getRuntime.halt(1)
>   }
> {code}
> Firstly, this assumption is invalid and there are legitimate cases (examples 
> below) in which this state could actually occur. Secondly, halting results in the 
> broker losing its un-flushed data, and if multiple brokers halt 
> simultaneously there is a chance that both the leader and the followers of a 
> partition are among the halted brokers, which would result in permanent 
> data loss.
> Given that this is a legit case, we suggest replacing the halt with a graceful 
> shutdown to avoid propagating data loss to the 

[jira] [Updated] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging

2016-07-27 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-3997:
-
Issue Type: Improvement  (was: Bug)

> Halting because log truncation is not allowed and suspicious logging
> 
>
> Key: KAFKA-3997
> URL: https://issues.apache.org/jira/browse/KAFKA-3997
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>Reporter: Alexey Ozeritskiy
> Attachments: KAFKA-3997.patch
>
>
> When the follower wants to truncate a partition and it is not allowed, it prints the 
> following message:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
> log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
> leader 19's latest offset 50260815 is less
>  than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}
> It is difficult to understand which partition it is.
> I suggest logging the partition here instead of the topic. For example:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
> log truncation is not allowed for partition [rt3.fol--yabs-rt--bs-hit-log,0], 
> Current leader 19's latest offset 50260815 is less
>  than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}
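
The change itself is small; a self-contained sketch of the suggested message (hypothetical code, not ReplicaFetcherThread itself):
{code}
// Minimal sketch of the suggested FATAL message: format the whole
// topic-partition pair instead of just the topic name, so the partition id
// is not lost. Values below come from the log lines quoted in this issue.
object TruncationNotAllowedMessage {
  case class TopicAndPartition(topic: String, partition: Int) {
    override def toString = s"[$topic,$partition]"
  }

  def fatalMessage(tp: TopicAndPartition, leaderId: Int, leaderEndOffset: Long,
                   replicaId: Int, replicaEndOffset: Long): String =
    // before: tp.topic was used here, which drops the partition id
    s"Halting because log truncation is not allowed for partition $tp, " +
      s"Current leader $leaderId's latest offset $leaderEndOffset is less than " +
      s"replica $replicaId's latest offset $replicaEndOffset"

  def main(args: Array[String]): Unit =
    println(fatalMessage(TopicAndPartition("rt3.fol--yabs-rt--bs-hit-log", 0), 19, 50260815L, 2, 50260816L))
}
{code}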



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging

2016-07-27 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-3997:
-
Description: 
When follower wants to truncate partition and it is not allowed it prints the 
following message:
{noformat}
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
{noformat}

It is difficult to understand which partition is it.
I suggest to log here partition instead of topic. For example:
{noformat}
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for partition [rt3.fol--yabs-rt--bs-hit-log,0], 
Current leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
{noformat}



  was:
When follower wants to truncate partition and it is not allowed it print the 
following message:
{noformat}
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
{noformat}

It is difficult to understand which partition is it.
I suggest to log here partition instead of topic. For example:
{noformat}
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for partition [rt3.fol--yabs-rt--bs-hit-log,0], 
Current leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
{noformat}




> Halting because log truncation is not allowed and suspicious logging
> 
>
> Key: KAFKA-3997
> URL: https://issues.apache.org/jira/browse/KAFKA-3997
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>Reporter: Alexey Ozeritskiy
> Attachments: KAFKA-3997.patch
>
>
> When the follower wants to truncate a partition and it is not allowed, it prints the 
> following message:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
> log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
> leader 19's latest offset 50260815 is less
>  than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}
> It is difficult to understand which partition it is.
> I suggest logging the partition here instead of the topic. For example:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
> log truncation is not allowed for partition [rt3.fol--yabs-rt--bs-hit-log,0], 
> Current leader 19's latest offset 50260815 is less
>  than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging

2016-07-27 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-3997:
-
Description: 
When follower wants to truncate partition and it is not allowed it print the 
following message:
{noformat}
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
{noformat}

It is difficult to understand which partition is it.
I suggest to log here partition instead of topic. For example:
{noformat}
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for partition [rt3.fol--yabs-rt--bs-hit-log,0], 
Current leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
{noformat}



  was:
When follower wants to truncate partition and it is not allowed it print the 
following message:
{noformat}
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
{noformat}

It is difficult to understand which partition is it.
I suggest to log here partition instead of topic. For example:
{noformat}
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log,0], 
Current leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
{noformat}




> Halting because log truncation is not allowed and suspicious logging
> 
>
> Key: KAFKA-3997
> URL: https://issues.apache.org/jira/browse/KAFKA-3997
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>Reporter: Alexey Ozeritskiy
> Attachments: KAFKA-3997.patch
>
>
> When the follower wants to truncate a partition and it is not allowed, it prints the 
> following message:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
> log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
> leader 19's latest offset 50260815 is less
>  than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}
> It is difficult to understand which partition it is.
> I suggest logging the partition here instead of the topic. For example:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
> log truncation is not allowed for partition [rt3.fol--yabs-rt--bs-hit-log,0], 
> Current leader 19's latest offset 50260815 is less
>  than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging

2016-07-27 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-3997:
-
Attachment: KAFKA-3997.patch

> Halting because log truncation is not allowed and suspicious logging
> 
>
> Key: KAFKA-3997
> URL: https://issues.apache.org/jira/browse/KAFKA-3997
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>Reporter: Alexey Ozeritskiy
> Attachments: KAFKA-3997.patch
>
>
> When the follower wants to truncate a partition and it is not allowed, it prints the 
> following message:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
> log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
> leader 19's latest offset 50260815 is less
>  than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}
> It is difficult to understand which partition it is.
> I suggest logging the partition here instead of the topic. For example:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
> log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log,0], 
> Current leader 19's latest offset 50260815 is less
>  than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging

2016-07-27 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-3997:
-
Status: Patch Available  (was: Open)

> Halting because log truncation is not allowed and suspicious logging
> 
>
> Key: KAFKA-3997
> URL: https://issues.apache.org/jira/browse/KAFKA-3997
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0, 0.9.0.1, 0.9.0.0
>Reporter: Alexey Ozeritskiy
> Attachments: KAFKA-3997.patch
>
>
> When the follower wants to truncate a partition and it is not allowed, it prints the 
> following message:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
> log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
> leader 19's latest offset 50260815 is less
>  than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}
> It is difficult to understand which partition it is.
> I suggest logging the partition here instead of the topic. For example:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
> log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log,0], 
> Current leader 19's latest offset 50260815 is less
>  than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging

2016-07-27 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-3997:
-
Description: 
When follower wants to truncate partition and it is not allowed it print the 
following message:
{noformat}
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
{noformat}

It is difficult to understand which partition is it.
I suggest to log here partition instead of topic. For example:
{noformat}
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log,0], 
Current leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
{noformat}



  was:
When follower wants to truncate partition and it is not allowed it print the 
following message:
{noformat}
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
{noformat}

It is difficult to understand which partition is it.
I suggest to log here partition instead of topic. For example:
{noformat}
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log, 0], 
Current leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
{noformat}




> Halting because log truncation is not allowed and suspicious logging
> 
>
> Key: KAFKA-3997
> URL: https://issues.apache.org/jira/browse/KAFKA-3997
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>Reporter: Alexey Ozeritskiy
>
> When the follower wants to truncate a partition and it is not allowed, it prints the 
> following message:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
> log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
> leader 19's latest offset 50260815 is less
>  than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}
> It is difficult to understand which partition it is.
> I suggest logging the partition here instead of the topic. For example:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
> log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log,0], 
> Current leader 19's latest offset 50260815 is less
>  than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging

2016-07-27 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-3997:
-
Description: 
When follower wants to truncate partition and it is not allowed it print the 
following message:
{noformat}
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
{noformat}

It is difficult to understand which partition is it.
I suggest to log here partition instead of topic. For example:
{noformat}
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log, 0], 
Current leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
{noformat}



  was:
When follower wants to truncate partition and it is not allowed it print the 
following message:
{{
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
}}

It is difficult to understand which partition is it.
I suggest to log here partition instead of topic. For example:
{{
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log, 0], 
Current leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
}}




> Halting because log truncation is not allowed and suspicious logging
> 
>
> Key: KAFKA-3997
> URL: https://issues.apache.org/jira/browse/KAFKA-3997
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>Reporter: Alexey Ozeritskiy
>
> When the follower wants to truncate a partition and it is not allowed, it prints the 
> following message:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
> log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
> leader 19's latest offset 50260815 is less
>  than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}
> It is difficult to understand which partition it is.
> I suggest logging the partition here instead of the topic. For example:
> {noformat}
> [2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
> log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log, 0], 
> Current leader 19's latest offset 50260815 is less
>  than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3997) Halting because log truncation is not allowed and suspicious logging

2016-07-27 Thread Alexey Ozeritskiy (JIRA)
Alexey Ozeritskiy created KAFKA-3997:


 Summary: Halting because log truncation is not allowed and 
suspicious logging
 Key: KAFKA-3997
 URL: https://issues.apache.org/jira/browse/KAFKA-3997
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.0.0, 0.9.0.1, 0.9.0.0
Reporter: Alexey Ozeritskiy


When the follower wants to truncate a partition and it is not allowed, it prints the 
following message:
{{
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for topic rt3.fol--yabs-rt--bs-hit-log, Current 
leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
}}

It is difficult to understand which partition it is.
I suggest logging the partition here instead of the topic. For example:
{{
[2016-07-27 14:07:37,617] FATAL [ReplicaFetcherThread-0-19], Halting because 
log truncation is not allowed for topic [rt3.fol--yabs-rt--bs-hit-log, 0], 
Current leader 19's latest offset 50260815 is less
 than replica 2's latest offset 50260816 (kafka.server.ReplicaFetcherThread)
}}





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3693) Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up

2016-07-17 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15381452#comment-15381452
 ] 

Alexey Ozeritskiy commented on KAFKA-3693:
--

It seems this issue is related to KAFKA-2178

> Race condition between highwatermark-checkpoint thread and 
> handleLeaderAndIsrRequest at broker start-up
> ---
>
> Key: KAFKA-3693
> URL: https://issues.apache.org/jira/browse/KAFKA-3693
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between the highwatermark-checkpoint thread writing 
> the replication-offset-checkpoint file and the handleLeaderAndIsrRequest thread 
> reading from it causes the highwatermark for some partitions to be reset to 
> 0. In the good case, this results in the replica truncating its entire log to 0 
> and hence initiating a fetch of terabytes of data from the lead broker, which 
> sometimes leads to hours of downtime. In the bad cases we observed, the 
> reset offset can propagate to the recovery-point-offset-checkpoint file, making a 
> lead broker truncate the file. This has the potential to lead to 
> data loss if the truncation happens at both the follower and the leader brokers.
> This is the particular faulty scenario manifested in our tests:
> # The broker restarts and receives a LeaderAndIsr request from the controller
> # LeaderAndIsr message however does not contain all the partitions (probably 
> because other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates the 
> allPartitions with the partitions included in the LeaderAndIsr message {code}
>   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
> var partition = allPartitions.get((topic, partitionId))
> if (partition == null) {
>   allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, 
> partitionId, time, this))
> {code}
> # replication-offset-checkpoint jumps in taking a snapshot of (the partial) 
> allReplicas' high watermark into replication-offset-checkpoint file {code}  
> def checkpointHighWatermarks() {
> val replicas = 
> allPartitions.values.map(_.getReplica(config.brokerId)).collect{case 
> Some(replica) => replica}{code} hence rewriting the previous highwatermarks.
> # Later becomeLeaderOrFollower calls makeLeaders and makeFollowers which read 
> the (now partial) file through Partition::getOrCreateReplica {code}
>   val checkpoint = 
> replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
>   val offsetMap = checkpoint.read
>   if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
> info("No checkpointed highwatermark is found for partition 
> [%s,%d]".format(topic, partitionId))
> {code}
> We are not entirely sure whether the initial LeaderAndIsr message including a 
> subset of partitions is critical in making this race condition manifest or 
> not. But it is an important detail since it clarifies that a solution based 
> on not letting the highwatermark-checkpoint thread jump in the middle of 
> processing a LeaderAndIsr message would not suffice.
> The solution we are thinking of is to force-initialize allPartitions with the 
> partitions listed in the replication-offset-checkpoint (and perhaps the 
> recovery-point-offset-checkpoint file too) when a server starts.
> Thoughts?
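
For illustration, a simplified, self-contained sketch of the proposed direction (hypothetical types and file layout, not the actual ReplicaManager code): pre-populate the partition map from the replication-offset-checkpoint file at start-up, before any LeaderAndIsr processing can trigger a checkpoint.
{code}
import scala.collection.concurrent.TrieMap
import scala.io.Source

case class TopicAndPartition(topic: String, partition: Int)
final class PartitionState(var highWatermark: Long)

object CheckpointBootstrap {
  // Assumed checkpoint layout: a version line, a count line, then
  // "topic partition offset" entries, one per line.
  def readCheckpoint(path: String): Map[TopicAndPartition, Long] = {
    val lines = Source.fromFile(path).getLines().drop(2) // skip version and size lines
    lines.map { line =>
      val Array(topic, partition, offset) = line.split(" ")
      TopicAndPartition(topic, partition.toInt) -> offset.toLong
    }.toMap
  }

  // Called once at broker start-up, before LeaderAndIsr handling begins, so a
  // partial snapshot cannot overwrite offsets of partitions not yet seen.
  def bootstrap(allPartitions: TrieMap[TopicAndPartition, PartitionState], path: String): Unit = {
    readCheckpoint(path).foreach { case (tp, hw) =>
      allPartitions.putIfAbsent(tp, new PartitionState(hw))
    }
  }
}
{code}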



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1543) Changing replication factor

2015-08-22 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708037#comment-14708037
 ] 

Alexey Ozeritskiy commented on KAFKA-1543:
--

I think refactoring should be a separate patch, and maybe a separate JIRA issue.

 Changing replication factor
 ---

 Key: KAFKA-1543
 URL: https://issues.apache.org/jira/browse/KAFKA-1543
 Project: Kafka
  Issue Type: Improvement
Reporter: Alexey Ozeritskiy
Assignee: Alexander Pakulov
 Attachments: can-change-replication.patch


 It is difficult to change the replication factor by manually editing the JSON config.
 I propose adding an option to the kafka-reassign-partitions.sh command to 
 automatically create the JSON config.
 Example of usage
 {code}
 kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor 
 --topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate > 
 output
 {code}
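
For illustration, a small self-contained sketch (hypothetical code, not the attached can-change-replication.patch) of how such an option could expand each partition's replica list and emit the reassignment JSON:
{code}
object ExpandReplication {
  // Expand every partition's replica list to the desired replication factor,
  // drawing extra replicas from the given broker list, and emit the JSON
  // accepted by kafka-reassign-partitions.sh --execute.
  def expand(current: Map[(String, Int), Seq[Int]],
             brokers: Seq[Int],
             replicationFactor: Int): String = {
    val entries = current.map { case ((topic, partition), replicas) =>
      // keep existing replicas, then add brokers that are not yet assigned
      val extra = brokers.filterNot(replicas.contains).take(replicationFactor - replicas.size)
      val newReplicas = (replicas ++ extra).take(replicationFactor)
      s"""{"topic":"$topic","partition":$partition,"replicas":[${newReplicas.mkString(",")}]}"""
    }
    s"""{"version":1,"partitions":[${entries.mkString(",")}]}"""
  }

  def main(args: Array[String]): Unit = {
    val current = Map(("mytopic", 0) -> Seq(2), ("mytopic", 1) -> Seq(3))
    println(expand(current, brokers = Seq(1, 2, 3, 4), replicationFactor = 3))
  }
}
{code}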



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1543) Changing replication factor

2015-08-22 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708039#comment-14708039
 ] 

Alexey Ozeritskiy commented on KAFKA-1543:
--

Are you sure that your approach will work without a cluster restart? 
AddPartitionsListener only handles new partitions, not new replicas.

 Changing replication factor
 ---

 Key: KAFKA-1543
 URL: https://issues.apache.org/jira/browse/KAFKA-1543
 Project: Kafka
  Issue Type: Improvement
Reporter: Alexey Ozeritskiy
Assignee: Alexander Pakulov
 Attachments: can-change-replication.patch


 It is difficult to change the replication factor by manually editing the JSON config.
 I propose adding an option to the kafka-reassign-partitions.sh command to 
 automatically create the JSON config.
 Example of usage
 {code}
 kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor 
 --topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate > 
 output
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2178) Loss of highwatermarks on incorrect cluster shutdown/restart

2015-05-07 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2178:
-
Status: Patch Available  (was: Open)

 Loss of highwatermarks on incorrect cluster shutdown/restart
 

 Key: KAFKA-2178
 URL: https://issues.apache.org/jira/browse/KAFKA-2178
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
 Attachments: KAFKA-2178.patch


 ReplicaManager flushes highwatermarks only for the partitions it received 
 from the Controller.
 If the Controller sends an incomplete list of partitions then ReplicaManager will 
 write an incomplete list of highwatermarks.
 As a result one can lose a lot of data during an incorrect broker restart.
 We hit this situation in real life on our cluster.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2178) Loss of highwatermarks on incorrect cluster shutdown/restart

2015-05-07 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2178:
-
Attachment: KAFKA-2178.patch

 Loss of highwatermarks on incorrect cluster shutdown/restart
 

 Key: KAFKA-2178
 URL: https://issues.apache.org/jira/browse/KAFKA-2178
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
 Attachments: KAFKA-2178.patch


 ReplicaManager flushes highwatermarks only for the partitions it received 
 from the Controller.
 If the Controller sends an incomplete list of partitions then ReplicaManager will 
 write an incomplete list of highwatermarks.
 As a result one can lose a lot of data during an incorrect broker restart.
 We hit this situation in real life on our cluster.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2178) Loss of highwatermarks on incorrect cluster shutdown/restart

2015-05-07 Thread Alexey Ozeritskiy (JIRA)
Alexey Ozeritskiy created KAFKA-2178:


 Summary: Loss of highwatermarks on incorrect cluster 
shutdown/restart
 Key: KAFKA-2178
 URL: https://issues.apache.org/jira/browse/KAFKA-2178
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
 Attachments: KAFKA-2178.patch

ReplicaManager flushes highwatermarks only for the partitions it received 
from the Controller.
If the Controller sends an incomplete list of partitions then ReplicaManager will 
write an incomplete list of highwatermarks.
As a result one can lose a lot of data during an incorrect broker restart.

We hit this situation in real life on our cluster.
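
A minimal sketch of the idea behind the fix (hypothetical code, not the attached KAFKA-2178.patch): merge the checkpoint that is already on disk with the in-memory high watermarks, so partitions the controller has not mentioned yet keep their old entries.
{code}
object MergeHighWatermarks {
  type TP = (String, Int)

  // In-memory values win; partitions unknown to the broker so far keep the
  // offsets that were already checkpointed on disk.
  def merge(onDisk: Map[TP, Long], inMemory: Map[TP, Long]): Map[TP, Long] =
    onDisk ++ inMemory

  def main(args: Array[String]): Unit = {
    val onDisk   = Map(("topic", 0) -> 100L, ("topic", 1) -> 200L)
    val inMemory = Map(("topic", 0) -> 150L) // controller only sent partition 0 so far
    println(merge(onDisk, inMemory))          // partition 1 keeps its old high watermark
  }
}
{code}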



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2174) Wrong TopicMetadata deserialization

2015-05-06 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2174:
-
Attachment: KAFKA-2174.patch

 Wrong TopicMetadata deserialization
 ---

 Key: KAFKA-2174
 URL: https://issues.apache.org/jira/browse/KAFKA-2174
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
 Attachments: KAFKA-2174.patch


 TopicMetadata.readFrom assumes that the ByteBuffer always contains the full set 
 of partitions, but this is not true. On incomplete metadata we get a 
 java.lang.ArrayIndexOutOfBoundsException:
 {code}
 java.lang.ArrayIndexOutOfBoundsException: 47
 at 
 kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36)
 at 
 kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
 at 
 kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.Range.foreach(Range.scala:141)
 at 
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at 
 kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31)
 {code}
 We sometimes get these exceptions on a broker restart (kill -TERM, 
 controlled.shutdown.enable=false).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2174) Wrong TopicMetadata deserialization

2015-05-06 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2174:
-
Status: Patch Available  (was: Open)

 Wrong TopicMetadata deserialization
 ---

 Key: KAFKA-2174
 URL: https://issues.apache.org/jira/browse/KAFKA-2174
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
 Attachments: KAFKA-2174.patch


 TopicMetadata.readFrom assumes that the ByteBuffer always contains the full set 
 of partitions, but this is not true. On incomplete metadata we get a 
 java.lang.ArrayIndexOutOfBoundsException:
 {code}
 java.lang.ArrayIndexOutOfBoundsException: 47
 at 
 kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36)
 at 
 kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
 at 
 kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.Range.foreach(Range.scala:141)
 at 
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at 
 kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31)
 {code}
 We sometimes get these exceptions on a broker restart (kill -TERM, 
 controlled.shutdown.enable=false).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2174) Wrong TopicMetadata deserialization

2015-05-06 Thread Alexey Ozeritskiy (JIRA)
Alexey Ozeritskiy created KAFKA-2174:


 Summary: Wrong TopicMetadata deserialization
 Key: KAFKA-2174
 URL: https://issues.apache.org/jira/browse/KAFKA-2174
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy


TopicMetadata.readFrom assumes that the ByteBuffer always contains the full set of 
partitions, but this is not true. On incomplete metadata we get a 
java.lang.ArrayIndexOutOfBoundsException:
{code}
java.lang.ArrayIndexOutOfBoundsException: 47
at 
kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36)
at 
kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
at 
kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31)
{code}
We sometimes get these exceptions on a broker restart (kill -TERM, 
controlled.shutdown.enable=false).
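
A simplified, self-contained sketch of the failure mode and a defensive alternative (hypothetical types, not Kafka's wire-format code):
{code}
import java.nio.ByteBuffer

case class PartitionMetadata(partitionId: Int, leader: Int)

object TopicMetadataRead {
  // Fragile pattern: an array sized by the partition count, written at the
  // index given by the partition id from the buffer. With an incomplete
  // metadata set, an id can exceed the bounds and throw
  // ArrayIndexOutOfBoundsException, as in the trace above.
  def readFragile(buf: ByteBuffer): Array[PartitionMetadata] = {
    val n = buf.getInt()
    val out = new Array[PartitionMetadata](n)
    for (_ <- 0 until n) {
      val id = buf.getInt()
      out(id) = PartitionMetadata(id, buf.getInt()) // blows up if id >= n
    }
    out
  }

  // Defensive variant: key the result by partition id, so an unexpected id
  // cannot overflow the structure.
  def readDefensive(buf: ByteBuffer): Map[Int, PartitionMetadata] = {
    val n = buf.getInt()
    (0 until n).map { _ =>
      val id = buf.getInt()
      id -> PartitionMetadata(id, buf.getInt())
    }.toMap
  }
}
{code}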



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2164) ReplicaFetcherThread: suspicious log message on reset offset

2015-05-02 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2164:
-
Status: Patch Available  (was: Open)

 ReplicaFetcherThread: suspicious log message on reset offset
 

 Key: KAFKA-2164
 URL: https://issues.apache.org/jira/browse/KAFKA-2164
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
 Attachments: KAFKA-2164.patch


 If log.logEndOffset < leaderStartOffset the follower resets its offset and 
 prints the following:
 {code}
 [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
 partition [topic,11] reset its fetch offset from 49322124 to current leader 
 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
 [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
 54369274 for partition [topic,11] out of range; reset offset to 49322124 
 (kafka.server.ReplicaFetcherThread)
 {code}
 I think the right message should be:
 {code}
 [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
 partition [rt3.iva--yandex--access-log,11] reset its fetch offset from 
 54369274 to current leader 21's start offset 49322124 
 (kafka.server.ReplicaFetcherThread)
 [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
 54369274 for partition [rt3.iva--yandex--access-log,11] out of range; reset 
 offset to 49322124 (kafka.server.ReplicaFetcherThread)
 {code}
 This occurs because ReplicaFetcherThread resets the offset and only then prints the log 
 message.
 Possible solution:
 {code}
 diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
 b/core/
 index b31b432..181cbc1 100644
 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
 +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
 @@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
 * Roll out a new log at the follower with the start offset equal to 
 the
 */
val leaderStartOffset = 
 simpleConsumer.earliestOrLatestOffset(topicAndPar
 -  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, 
 leaderSt
warn(Replica %d for partition %s reset its fetch offset from %d to 
 curre
  .format(brokerConfig.brokerId, topicAndPartition, 
 replica.logEndOffset.
 +  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, 
 leaderSt
leaderStartOffset
  }
}
 {code}
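
The patch snippet above is truncated by the mail formatting; a self-contained sketch of the intended reordering (hypothetical code, not ReplicaFetcherThread itself):
{code}
object ResetOffsetLogging {
  final class Replica(var logEndOffset: Long)

  // Capture and log the old fetch offset before truncating, so the WARN shows
  // the real "from" value instead of the already-reset one.
  def handleOffsetOutOfRange(replica: Replica, leaderStartOffset: Long): Long = {
    val oldOffset = replica.logEndOffset
    println(s"WARN Replica reset its fetch offset from $oldOffset to current leader's start offset $leaderStartOffset")
    // ... only now truncate / reset (stands in for logManager.truncateFullyAndStartAt)
    replica.logEndOffset = leaderStartOffset
    leaderStartOffset
  }

  def main(args: Array[String]): Unit =
    handleOffsetOutOfRange(new Replica(54369274L), 49322124L)
}
{code}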



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2164) ReplicaFetcherThread: suspicious log message on reset offset

2015-05-02 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2164:
-
Attachment: KAFKA-2164.patch

 ReplicaFetcherThread: suspicious log message on reset offset
 

 Key: KAFKA-2164
 URL: https://issues.apache.org/jira/browse/KAFKA-2164
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
 Attachments: KAFKA-2164.patch


 If log.logEndOffset < leaderStartOffset the follower resets its offset and 
 prints the following:
 {code}
 [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
 partition [topic,11] reset its fetch offset from 49322124 to current leader 
 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
 [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
 54369274 for partition [topic,11] out of range; reset offset to 49322124 
 (kafka.server.ReplicaFetcherThread)
 {code}
 I think the right message should be:
 {code}
 [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
 partition [rt3.iva--yandex--access-log,11] reset its fetch offset from 
 54369274 to current leader 21's start offset 49322124 
 (kafka.server.ReplicaFetcherThread)
 [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
 54369274 for partition [rt3.iva--yandex--access-log,11] out of range; reset 
 offset to 49322124 (kafka.server.ReplicaFetcherThread)
 {code}
 This occurs because ReplicaFetcherThread resets the offset and only then prints the log 
 message.
 Possible solution:
 {code}
 diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
 b/core/
 index b31b432..181cbc1 100644
 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
 +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
 @@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
 * Roll out a new log at the follower with the start offset equal to 
 the
 */
val leaderStartOffset = 
 simpleConsumer.earliestOrLatestOffset(topicAndPar
 -  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, 
 leaderSt
warn(Replica %d for partition %s reset its fetch offset from %d to 
 curre
  .format(brokerConfig.brokerId, topicAndPartition, 
 replica.logEndOffset.
 +  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, 
 leaderSt
leaderStartOffset
  }
}
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2165) ReplicaFetcherThread: data loss on unknown exception

2015-05-02 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2165:
-
Status: Patch Available  (was: Open)

 ReplicaFetcherThread: data loss on unknown exception
 

 Key: KAFKA-2165
 URL: https://issues.apache.org/jira/browse/KAFKA-2165
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
 Attachments: KAFKA-2165.patch


 Sometimes in our cluster a replica falls out of the ISR. Then the broker 
 re-downloads the partition from the beginning. We got the following messages 
 in the logs:
 {code}
 # The leader:
 [2015-03-25 11:11:07,796] ERROR [Replica Manager on Broker 21]: Error when 
 processing fetch request for partition [topic,11] offset 54369274 from 
 follower with correlation id 2634499. Possible cause: Request for offset 
 54369274 but we only have log segments in the range 49322124 to 54369273. 
 (kafka.server.ReplicaManager)
 {code}
 {code}
 # The follower:
 [2015-03-25 11:11:08,816] WARN [ReplicaFetcherThread-0-21], Replica 31 for 
 partition [topic,11] reset its fetch offset from 49322124 to current leader 
 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
 [2015-03-25 11:11:08,816] ERROR [ReplicaFetcherThread-0-21], Current offset 
 54369274 for partition [topic,11] out of range; reset offset to 49322124 
 (kafka.server.ReplicaFetcherThread)
 {code}
 This occurs because we update fetchOffset 
 [here|https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L124]
  and then try to process the message. 
 If any exception except OffsetOutOfRangeCode occurs we get unsynchronized 
 fetchOffset and replica.logEndOffset.
 On the next fetch iteration we can get 
 fetchOffset > replica.logEndOffset == leaderEndOffset and OffsetOutOfRangeCode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2165) ReplicaFetcherThread: data loss on unknown exception

2015-05-02 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2165:
-
Attachment: KAFKA-2165.patch

 ReplicaFetcherThread: data loss on unknown exception
 

 Key: KAFKA-2165
 URL: https://issues.apache.org/jira/browse/KAFKA-2165
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
 Attachments: KAFKA-2165.patch


 Sometimes in our cluster a replica falls out of the ISR. Then the broker 
 re-downloads the partition from the beginning. We got the following messages 
 in the logs:
 {code}
 # The leader:
 [2015-03-25 11:11:07,796] ERROR [Replica Manager on Broker 21]: Error when 
 processing fetch request for partition [topic,11] offset 54369274 from 
 follower with correlation id 2634499. Possible cause: Request for offset 
 54369274 but we only have log segments in the range 49322124 to 54369273. 
 (kafka.server.ReplicaManager)
 {code}
 {code}
 # The follower:
 [2015-03-25 11:11:08,816] WARN [ReplicaFetcherThread-0-21], Replica 31 for 
 partition [topic,11] reset its fetch offset from 49322124 to current leader 
 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
 [2015-03-25 11:11:08,816] ERROR [ReplicaFetcherThread-0-21], Current offset 
 54369274 for partition [topic,11] out of range; reset offset to 49322124 
 (kafka.server.ReplicaFetcherThread)
 {code}
 This occurs because we update fetchOffset 
 [here|https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L124]
  and then try to process the message. 
 If any exception except OffsetOutOfRangeCode occurs we get unsynchronized 
 fetchOffset and replica.logEndOffset.
 On the next fetch iteration we can get 
 fetchOffset > replica.logEndOffset == leaderEndOffset and OffsetOutOfRangeCode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2165) ReplicaFetcherThread: data loss on unknown exception

2015-05-02 Thread Alexey Ozeritskiy (JIRA)
Alexey Ozeritskiy created KAFKA-2165:


 Summary: ReplicaFetcherThread: data loss on unknown exception
 Key: KAFKA-2165
 URL: https://issues.apache.org/jira/browse/KAFKA-2165
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy


Sometimes in our cluster a replica falls out of the ISR. Then the broker 
re-downloads the partition from the beginning. We got the following messages in 
the logs:
{code}
# The leader:
[2015-03-25 11:11:07,796] ERROR [Replica Manager on Broker 21]: Error when 
processing fetch request for partition [topic,11] offset 54369274 from follower 
with correlation id 2634499. Possible cause: Request for offset 54369274 but we 
only have log segments in the range 49322124 to 54369273. 
(kafka.server.ReplicaManager)
{code}

{code}
# The follower:
[2015-03-25 11:11:08,816] WARN [ReplicaFetcherThread-0-21], Replica 31 for 
partition [topic,11] reset its fetch offset from 49322124 to current leader 
21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,816] ERROR [ReplicaFetcherThread-0-21], Current offset 
54369274 for partition [topic,11] out of range; reset offset to 49322124 
(kafka.server.ReplicaFetcherThread)
{code}

This occurs because we update fetchOffset 
[here|https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L124]
 and then try to process the message. 
If any exception except OffsetOutOfRangeCode occurs we get unsynchronized 
fetchOffset and replica.logEndOffset.
On the next fetch iteration we can get 
fetchOffset > replica.logEndOffset == leaderEndOffset and OffsetOutOfRangeCode.
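
A minimal sketch of the safer ordering (hypothetical code, not AbstractFetcherThread itself): advance the in-memory fetch offset only after the fetched data has been processed successfully.
{code}
object FetchLoopSketch {
  final case class PartitionFetchState(var fetchOffset: Long)

  def processFetchedData(state: PartitionFetchState,
                         validBytes: Long,
                         append: () => Unit): Unit = {
    try {
      append()                        // may throw for reasons other than offset-out-of-range
      state.fetchOffset += validBytes // advance only after a successful append
    } catch {
      case e: Exception =>
        // leave fetchOffset untouched; the next iteration retries from the same offset
        println(s"error processing data, will retry from ${state.fetchOffset}: $e")
    }
  }

  def main(args: Array[String]): Unit = {
    val state = PartitionFetchState(54369274L)
    processFetchedData(state, validBytes = 4096L, append = () => throw new RuntimeException("disk error"))
    println(s"fetchOffset is still ${state.fetchOffset}") // unchanged, so no desync with logEndOffset
  }
}
{code}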





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2164) ReplicaFetcherThread: suspicious log message on reset offset

2015-05-02 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2164:
-
Description: 
If log.logEndOffset < leaderStartOffset the follower resets its offset and 
prints the following:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
partition [topic,11] reset its fetch offset from 49322124 to current leader 
21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
54369274 for partition [topic,11] out of range; reset offset to 49322124 
(kafka.server.ReplicaFetcherThread)
{code}
I think the right message should be:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
partition [topic,11] reset its fetch offset from 54369274 to current leader 
21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
54369274 for partition [topic,11] out of range; reset offset to 49322124 
(kafka.server.ReplicaFetcherThread)
{code}

This occurs because ReplicaFetcherThread resets the offset and then print log 
message.
Posible solution:
{code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/
index b31b432..181cbc1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
* Roll out a new log at the follower with the start offset equal to the
*/
   val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPar
-  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
   warn(Replica %d for partition %s reset its fetch offset from %d to curre
 .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.
+  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
   leaderStartOffset
 }
   }
{code}



  was:
If log.logEndOffset < leaderStartOffset the follower resets its offset and 
prints the following:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
partition [topic,11] reset its fetch offset from 49322124 to current leader 
21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
54369274 for partition [topic,11] out of range; reset offset to 49322124 
(kafka.server.ReplicaFetcherThread)
{code}
I think the right message should be:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
partition [rt3.iva--yandex--access-log,11] reset its fetch offset from 54369274 
to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
54369274 for partition [rt3.iva--yandex--access-log,11] out of range; reset 
offset to 49322124 (kafka.server.ReplicaFetcherThread)
{code}

This occurs because ReplicaFetcherThread resets the offset and then print log 
message.
Posible solution:
{code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/
index b31b432..181cbc1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
* Roll out a new log at the follower with the start offset equal to the
*/
   val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPar
-  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
   warn(Replica %d for partition %s reset its fetch offset from %d to curre
 .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.
+  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
   leaderStartOffset
 }
   }
{code}




 ReplicaFetcherThread: suspicious log message on reset offset
 

 Key: KAFKA-2164
 URL: https://issues.apache.org/jira/browse/KAFKA-2164
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
 Attachments: KAFKA-2164.patch


 If log.logEndOffset < leaderStartOffset the follower resets its offset and 
 prints the following:
 {code}
 [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
 partition [topic,11] reset its fetch offset from 49322124 to current leader 
 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
 [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
 54369274 for partition [topic,11] out of range; reset offset to 

[jira] [Created] (KAFKA-2133) Deadlock in DeleteTopicsThread

2015-04-19 Thread Alexey Ozeritskiy (JIRA)
Alexey Ozeritskiy created KAFKA-2133:


 Summary: Deadlock in DeleteTopicsThread
 Key: KAFKA-2133
 URL: https://issues.apache.org/jira/browse/KAFKA-2133
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
Priority: Critical


Controller hangs after deleting multiple topics.

jstack:
1. The delete-topics-thread acquired controllerLock and is waiting on the blocking queue:
{code}
delete-topics-thread-2 prio=10 tid=0x7f3a8d4e4000 nid=0x6924 waiting on 
condition [0x7f3507684000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  0x00047196e738 (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at 
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
at 
kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
- locked 0x00045eab3078 (a java.lang.Object)
at 
kafka.controller.KafkaController.sendRequest(KafkaController.scala:668)
at 
kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:299)
at 
kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:291)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at 
kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:291)
at 
kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:976)
at 
kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:303)
at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:424)
at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:396)
at 
scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
at 
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:396)
at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390)
at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:390)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
{code}

2. The Controller-2-to-broker send thread is waiting for controllerLock and cannot take messages 
from the blocking queue:
{code}
Controller-2-to-broker-3-send-thread prio=10 tid=0x7f3a8d4a3000 
nid=0x64d1 waiting on condition [0x7f3507786000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  0x000468babde8 (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
at kafka.utils.Utils$.inLock(Utils.scala:533)
at 
kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$deleteTopicStopReplicaCallback(TopicDeletionManager.scala:371)
at 
kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:338)
at 

[jira] [Commented] (KAFKA-2133) Deadlock in DeleteTopicsThread

2015-04-19 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14502028#comment-14502028
 ] 

Alexey Ozeritskiy commented on KAFKA-2133:
--

We've set it to 128. I think the default of Int.MaxValue is dangerous due to 
possible memory issues.
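
The shape of the deadlock in the stack traces quoted below boils down to a lock held across a blocking put on a bounded queue; a self-contained sketch (hypothetical code, not the controller) that hangs the same way:
{code}
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.locks.ReentrantLock

object ControllerQueueDeadlock {
  def main(args: Array[String]): Unit = {
    val controllerLock = new ReentrantLock()
    val queue = new ArrayBlockingQueue[String](1) // tiny capacity to force the blocking put

    // Stands in for the delete-topics thread: holds controllerLock while
    // enqueueing requests, and blocks on put() once the queue is full.
    val deleteTopicsThread = new Thread(new Runnable {
      def run(): Unit = {
        controllerLock.lock()
        try {
          queue.put("request-1")
          queue.put("request-2") // queue full -> blocks while still holding the lock
        } finally controllerLock.unlock()
      }
    })

    // Stands in for the controller-to-broker send thread: in the real controller
    // it needs controllerLock inside a callback before it can drain more of the queue.
    val sendThread = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(100)
        controllerLock.lock()
        try println("draining: " + queue.take())
        finally controllerLock.unlock()
      }
    })

    deleteTopicsThread.start(); sendThread.start()
    deleteTopicsThread.join(); sendThread.join() // never completes: mutual wait
  }
}
{code}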

 Deadlock in DeleteTopicsThread
 --

 Key: KAFKA-2133
 URL: https://issues.apache.org/jira/browse/KAFKA-2133
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
Priority: Critical

 Controller hangs after deleting multiple topics.
 jstack:
 1. The delete-topics-thread acquired controllerLock and is waiting on the blocking 
 queue:
 {code}
 delete-topics-thread-2 prio=10 tid=0x7f3a8d4e4000 nid=0x6924 waiting on 
 condition [0x7f3507684000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x00047196e738 (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
 at 
 java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
 at 
 kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
 - locked 0x00045eab3078 (a java.lang.Object)
 at 
 kafka.controller.KafkaController.sendRequest(KafkaController.scala:668)
 at 
 kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:299)
 at 
 kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:291)
 at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
 at 
 kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:291)
 at 
 kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:976)
 at 
 kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:303)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:424)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:396)
 at 
 scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
 at 
 scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:396)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390)
 at kafka.utils.Utils$.inLock(Utils.scala:535)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:390)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 {code}
 2. Controller-2-to-broker waiting for controllerLock and cannot take messages 
 from blocking queue:
 {code}
 Controller-2-to-broker-3-send-thread prio=10 tid=0x7f3a8d4a3000 
 nid=0x64d1 waiting on condition [0x7f3507786000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x000468babde8 (a 
 java.util.concurrent.locks.ReentrantLock$NonfairSync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
 at 
 java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
 at 
 java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
 at kafka.utils.Utils$.inLock(Utils.scala:533)
 at 
 

[jira] [Resolved] (KAFKA-2133) Deadlock in DeleteTopicsThread

2015-04-19 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy resolved KAFKA-2133.
--
Resolution: Fixed

 Deadlock in DeleteTopicsThread
 --

 Key: KAFKA-2133
 URL: https://issues.apache.org/jira/browse/KAFKA-2133
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
Priority: Critical

 Controller hangs after deleting multiple topics.
 jstack:
 1. delete-topics-thread acquired controllerLock and waiting for blocking 
 queue:
 {code}
 delete-topics-thread-2 prio=10 tid=0x7f3a8d4e4000 nid=0x6924 waiting on 
 condition [0x7f3507684000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x00047196e738 (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
 at 
 java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
 at 
 kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
 - locked 0x00045eab3078 (a java.lang.Object)
 at 
 kafka.controller.KafkaController.sendRequest(KafkaController.scala:668)
 at 
 kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:299)
 at 
 kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:291)
 at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
 at 
 kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:291)
 at 
 kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:976)
 at 
 kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:303)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:424)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:396)
 at 
 scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
 at 
 scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:396)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390)
 at kafka.utils.Utils$.inLock(Utils.scala:535)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:390)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 {code}
 2. Controller-2-to-broker waiting for controllerLock and cannot take messages 
 from blocking queue:
 {code}
 Controller-2-to-broker-3-send-thread prio=10 tid=0x7f3a8d4a3000 
 nid=0x64d1 waiting on condition [0x7f3507786000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x000468babde8 (a 
 java.util.concurrent.locks.ReentrantLock$NonfairSync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
 at 
 java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
 at 
 java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
 at kafka.utils.Utils$.inLock(Utils.scala:533)
 at 
 kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$deleteTopicStopReplicaCallback(TopicDeletionManager.scala:371)
  

[jira] [Commented] (KAFKA-1908) Split brain

2015-03-04 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14347063#comment-14347063
 ] 

Alexey Ozeritskiy commented on KAFKA-1908:
--

Hi all, the following is our scenario.

We use a custom consumer which runs on the broker hosts and always consumes leader 
partitions from localhost. The consumer reads data and pushes it to a third-party system.
We send a metadata request to localhost and don't use the zk data.
We use zk locks to guarantee that each partition is read by a single process. 
Sometimes we release the locks and the consumer can begin to consume data from the 
broken host and reset offsets.
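
For illustration only (our consumer is in-house and not shown here), the per-partition zk 
locking can be sketched with Apache Curator's InterProcessMutex; the connect string and 
lock path below are made up:
{code}
import java.util.concurrent.TimeUnit
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.locks.InterProcessMutex
import org.apache.curator.retry.ExponentialBackoffRetry

object LocalLeaderConsumerSketch {
  def main(args: Array[String]): Unit = {
    val zk = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3))
    zk.start()
    val lock = new InterProcessMutex(zk, "/consumer-locks/partition-0") // hypothetical lock path
    if (lock.acquire(30, TimeUnit.SECONDS)) {
      try {
        // only the lock holder consumes [partition,0] from the local broker
        // (metadata is requested from localhost, not read from zk)
      } finally lock.release()
    }
    zk.close()
  }
}
{code}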

 Split brain
 ---

 Key: KAFKA-1908
 URL: https://issues.apache.org/jira/browse/KAFKA-1908
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2.0
Reporter: Alexey Ozeritskiy

 In some cases, there may be two leaders for one partition.
 Steps to reproduce:
 # We have 3 brokers, 1 partition with 3 replicas:
 {code}
 TopicAndPartition: [partition,0]Leader: 1   Replicas: [2,1,3]   
 ISR: [1,2,3]
 {code} 
 # controller works on broker 3
 # let the kafka port be 9092. We execute on broker 1:
 {code}
 iptables -A INPUT -p tcp --dport 9092 -j REJECT
 {code}
 # Initiate replica election
 # As a result:
 Broker 1:
 {code}
 TopicAndPartition: [partition,0]Leader: 1   Replicas: [2,1,3]   
 ISR: [1,2,3]
 {code}
 Broker 2:
 {code}
 TopicAndPartition: [partition,0]Leader: 2   Replicas: [2,1,3]   
 ISR: [1,2,3]
 {code}
 # Flush the iptables rules on broker 1
 Now we can produce messages to {code}[partition,0]{code}. Replica-1 will not 
 receive new data. A consumer can read data from replica-1 or replica-2. When 
 it reads from replica-1 it resets the offsets and then can read duplicates 
 from replica-2.
 We saw this situation in our production cluster when it had network problems.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1908) Split brain

2015-01-29 Thread Alexey Ozeritskiy (JIRA)
Alexey Ozeritskiy created KAFKA-1908:


 Summary: Split brain
 Key: KAFKA-1908
 URL: https://issues.apache.org/jira/browse/KAFKA-1908
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Alexey Ozeritskiy


In some cases, there may be two leaders for one partition.
Steps to reproduce:
# We have 3 brokers, 1 partition with 3 replicas:
{code}
TopicAndPartition: [partition,0]Leader: 1   Replicas: [2,1,3]   
ISR: [1,2,3]
{code} 
# controller works on broker 3
# let the kafka port be 9092. We execute on broker 1:
{code}
iptables -A INPUT -p tcp --dport 9092 -j REJECT
{code}
# Initiate replica election
# As a result:
Broker 1:
{code}
TopicAndPartition: [partition,0]Leader: 1   Replicas: [2,1,3]   
ISR: [1,2,3]
{code}
Broker 2:
{code}
TopicAndPartition: [partition,0]Leader: 2   Replicas: [2,1,3]   
ISR: [1,2,3]
{code}
# Flush the iptables rules on broker 1

Now we can produce messages to {code}[partition,0]{code}. Replica-1 will not 
receive new data. A consumer can read data from replica-1 or replica-2. When it 
reads from replica-1 it resets the offsets and then can read duplicates from 
replica-2.

We saw this situation in our production cluster when it had network problems.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1804) Kafka network thread lacks top exception handler

2015-01-22 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14287713#comment-14287713
 ] 

Alexey Ozeritskiy commented on KAFKA-1804:
--

The last time we saw the bug was during a restart of the network switch on a 
cluster of 20 machines. kafka-network-threads died on more than half of the 
machines. As a result, the cluster became unavailable. We are trying to find 
the specific steps that reproduce the problem.


 Kafka network thread lacks top exception handler
 

 Key: KAFKA-1804
 URL: https://issues.apache.org/jira/browse/KAFKA-1804
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Oleg Golovin
Priority: Critical

 We have faced the problem that some kafka network threads may fail, so that 
 jstack attached to Kafka process showed fewer threads than we had defined in 
 our Kafka configuration. This leads to API requests processed by this thread 
 getting stuck unresponded.
 There were no error messages in the log regarding thread failure.
 We have examined Kafka code to find out there is no top try-catch block in 
 the network thread code, which could at least log possible errors.
 Could you add top-level try-catch block for the network thread, which should 
 recover network thread in case of exception?
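
 A minimal sketch of what such a top-level handler could look like (a generic guarded 
 worker loop, not the actual kafka.network.SocketServer code; slf4j is assumed for logging):
 {code}
 import org.slf4j.LoggerFactory

 // hypothetical guarded worker; the real network thread lives in kafka.network.SocketServer
 abstract class GuardedLoopThread(name: String) extends Thread(name) {
   private val logger = LoggerFactory.getLogger(getClass)
   @volatile private var running = true

   /** One pass of select/read/write work; may throw. */
   def iteration(): Unit

   override def run(): Unit = {
     while (running) {
       try iteration()
       catch {
         case e: Throwable =>
           // log and keep going, so a single bad request cannot silently kill the thread
           logger.error("Uncaught exception in " + name + ", continuing", e)
       }
     }
   }

   def shutdown(): Unit = running = false
 }
 {code}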



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1804) Kafka network thread lacks top exception handler

2015-01-21 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-1804:
-
Priority: Critical  (was: Major)

 Kafka network thread lacks top exception handler
 

 Key: KAFKA-1804
 URL: https://issues.apache.org/jira/browse/KAFKA-1804
 Project: Kafka
  Issue Type: Bug
Reporter: Oleg Golovin
Priority: Critical

 We have faced the problem that some kafka network threads may fail, so that 
 jstack attached to Kafka process showed fewer threads than we had defined in 
 our Kafka configuration. This leads to API requests processed by this thread 
 getting stuck unresponded.
 There were no error messages in the log regarding thread failure.
 We have examined Kafka code to find out there is no top try-catch block in 
 the network thread code, which could at least log possible errors.
 Could you add top-level try-catch block for the network thread, which should 
 recover network thread in case of exception?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1804) Kafka network thread lacks top exception handler

2015-01-15 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14279396#comment-14279396
 ] 

Alexey Ozeritskiy edited comment on KAFKA-1804 at 1/16/15 7:44 AM:
---

We've written the simple patch for kafka-network-thread:
{code:java}
  override def run(): Unit = {
try {
  iteration() // = the original run()
} catch {
  case e: Throwable => 
error("ERROR IN NETWORK THREAD: %s".format(e), e)
Runtime.getRuntime.halt(1)
}
  }
{code}
and got the following trace:
{code}
[2015-01-15 23:04:08,537] ERROR ERROR IN NETWORK THREAD: 
java.util.NoSuchElementException: None.get (kafka.network.Processor)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at kafka.network.ConnectionQuotas.dec(SocketServer.scala:544)
at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
at kafka.network.Processor.close(SocketServer.scala:394)
at kafka.network.Processor.processNewResponses(SocketServer.scala:426)
at kafka.network.Processor.iteration(SocketServer.scala:328)
at kafka.network.Processor.run(SocketServer.scala:381)
at java.lang.Thread.run(Thread.java:745)
{code}


was (Author: aozeritsky):
We've written the simple patch for kafka-network-thread:
{code:java}
  override def run(): Unit = {
try {
  original_run()
} catch {
  case e: Throwable => 
error("ERROR IN NETWORK THREAD: %s".format(e), e)
Runtime.getRuntime.halt(1)
}
  }
{code}
and got the following trace:
{code}
[2015-01-15 23:04:08,537] ERROR ERROR IN NETWORK THREAD: 
java.util.NoSuchElementException: None.get (kafka.network.Processor)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at kafka.network.ConnectionQuotas.dec(SocketServer.scala:544)
at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
at kafka.network.Processor.close(SocketServer.scala:394)
at kafka.network.Processor.processNewResponses(SocketServer.scala:426)
at kafka.network.Processor.iteration(SocketServer.scala:328)
at kafka.network.Processor.run(SocketServer.scala:381)
at java.lang.Thread.run(Thread.java:745)
{code}

 Kafka network thread lacks top exception handler
 

 Key: KAFKA-1804
 URL: https://issues.apache.org/jira/browse/KAFKA-1804
 Project: Kafka
  Issue Type: Bug
Reporter: Oleg Golovin

 We have faced the problem that some kafka network threads may fail, so that 
 jstack attached to Kafka process showed fewer threads than we had defined in 
 our Kafka configuration. This leads to API requests processed by this thread 
 getting stuck unresponded.
 There were no error messages in the log regarding thread failure.
 We have examined Kafka code to find out there is no top try-catch block in 
 the network thread code, which could at least log possible errors.
 Could you add top-level try-catch block for the network thread, which should 
 recover network thread in case of exception?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1644) Inherit FetchResponse from RequestOrResponse

2014-10-03 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14158114#comment-14158114
 ] 

Alexey Ozeritskiy commented on KAFKA-1644:
--

This patch simplifies writing clients. Right now, without it, we have to use the 
following ugly code:
{code:java}
if (id == ResponseKeys.FetchKey) {
  val response = FetchResponse.readFrom(contentBuffer)
  listener ! FetchAnswer(response)
} else {
  val response = ResponseKeys.deserializerForKey(id)(contentBuffer)
  listener ! Answer(response)
}
{code}
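
With FetchResponse inheriting from RequestOrResponse as proposed, the special case could 
collapse to a single path. A rough sketch, assuming deserializerForKey is also registered 
for ResponseKeys.FetchKey:
{code:java}
// sketch only: assumes deserializerForKey covers the fetch key once
// FetchResponse extends RequestOrResponse
val response: RequestOrResponse = ResponseKeys.deserializerForKey(id)(contentBuffer)
listener ! Answer(response)
{code}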

 Inherit FetchResponse from RequestOrResponse
 

 Key: KAFKA-1644
 URL: https://issues.apache.org/jira/browse/KAFKA-1644
 Project: Kafka
  Issue Type: Bug
Reporter: Anton Karamanov
Assignee: Anton Karamanov
 Attachments: 
 0001-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch


 Unlike all other Kafka API responses {{FetchResponse}} is not a subclass of 
 RequestOrResponse, which requires handling it as a special case while 
 processing responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1644) Inherit FetchResponse from RequestOrResponse

2014-10-03 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-1644:
-
Issue Type: Improvement  (was: Bug)

 Inherit FetchResponse from RequestOrResponse
 

 Key: KAFKA-1644
 URL: https://issues.apache.org/jira/browse/KAFKA-1644
 Project: Kafka
  Issue Type: Improvement
Reporter: Anton Karamanov
Assignee: Anton Karamanov
 Attachments: 
 0001-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch


 Unlike all other Kafka API responses {{FetchResponse}} is not a subclass of 
 RequestOrResponse, which requires handling it as a special case while 
 processing responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067872#comment-14067872
 ] 

Alexey Ozeritskiy commented on KAFKA-1414:
--

Anton, your patch is not working for me. I set log.io.parallelism=24 and see 
only one active disk; after some period of time (about 1 min) I see two active disks.

With my primitive patch (with parArray) kafka uses all 24 disks at once at 
startup.

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
 KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work, however I'm too new 
 to scala, so do not take it literally:
 {code}
   /**
* Recover and load all logs in the given data directories
*/
   private def loadLogs(dirs: Seq[File]) {
 val threads : Array[Thread] = new Array[Thread](dirs.size)
 var i: Int = 0
 val me = this
  for(dir <- dirs) {
   val thread = new Thread( new Runnable {
 def run()
 {
   val recoveryPoints = me.recoveryPointCheckpoints(dir).read
   /* load the logs */
   val subDirs = dir.listFiles()
   if(subDirs != null) {
 val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
 if(cleanShutDownFile.exists())
   info("Found clean shutdown file. Skipping recovery for all logs 
 in data directory '%s'".format(dir.getAbsolutePath))
 for(dir <- subDirs) {
   if(dir.isDirectory) {
 info("Loading log '" + dir.getName + "'")
 val topicPartition = Log.parseTopicPartitionName(dir.getName)
 val config = topicConfigs.getOrElse(topicPartition.topic, 
 defaultConfig)
 val log = new Log(dir,
   config,
   recoveryPoints.getOrElse(topicPartition, 0L),
   scheduler,
   time)
 val previous = addLogWithLock(topicPartition, log)
 if(previous != null)
   throw new IllegalArgumentException("Duplicate log 
 directories found: %s, %s!".format(log.dir.getAbsolutePath, 
 previous.dir.getAbsolutePath))
   }
 }
 cleanShutDownFile.delete()
   }
 }
   })
   thread.start()
   threads(i) = thread
   i = i + 1
 }
 for(thread - threads) {
   thread.join()
 }
   }
   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
 logCreationOrDeletionLock synchronized {
   this.logs.put(topicPartition, log)
 }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067917#comment-14067917
 ] 

Alexey Ozeritskiy commented on KAFKA-1414:
--

You must create an instance of Runnable for each directory (this.logDirs). You 
create an instance for each kafka topic-partition. Yor aproach will not work.

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
 KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work, however I'm too new 
 to scala, so do not take it literally:
 {code}
   /**
* Recover and load all logs in the given data directories
*/
   private def loadLogs(dirs: Seq[File]) {
 val threads : Array[Thread] = new Array[Thread](dirs.size)
 var i: Int = 0
 val me = this
  for(dir <- dirs) {
   val thread = new Thread( new Runnable {
 def run()
 {
   val recoveryPoints = me.recoveryPointCheckpoints(dir).read
   /* load the logs */
   val subDirs = dir.listFiles()
   if(subDirs != null) {
 val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
 if(cleanShutDownFile.exists())
   info("Found clean shutdown file. Skipping recovery for all logs 
 in data directory '%s'".format(dir.getAbsolutePath))
 for(dir <- subDirs) {
   if(dir.isDirectory) {
 info("Loading log '" + dir.getName + "'")
 val topicPartition = Log.parseTopicPartitionName(dir.getName)
 val config = topicConfigs.getOrElse(topicPartition.topic, 
 defaultConfig)
 val log = new Log(dir,
   config,
   recoveryPoints.getOrElse(topicPartition, 0L),
   scheduler,
   time)
 val previous = addLogWithLock(topicPartition, log)
 if(previous != null)
   throw new IllegalArgumentException("Duplicate log 
 directories found: %s, %s!".format(log.dir.getAbsolutePath, 
 previous.dir.getAbsolutePath))
   }
 }
 cleanShutDownFile.delete()
   }
 }
   })
   thread.start()
   threads(i) = thread
   i = i + 1
 }
 for(thread - threads) {
   thread.join()
 }
   }
   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
 logCreationOrDeletionLock synchronized {
   this.logs.put(topicPartition, log)
 }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Comment Edited] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067917#comment-14067917
 ] 

Alexey Ozeritskiy edited comment on KAFKA-1414 at 7/20/14 12:48 PM:


You must create an instance of Runnable for each directory (this.logDirs). You 
create an instance for each kafka topic-partition. Your aproach will not work.


was (Author: aozeritsky):
You must create an instance of Runnable for each directory (this.logDirs). You 
create an instance for each kafka topic-partition. Yor aproach will not work.

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
 KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work, however I'm too new 
 to scala, so do not take it literally:
 {code}
   /**
* Recover and load all logs in the given data directories
*/
   private def loadLogs(dirs: Seq[File]) {
 val threads : Array[Thread] = new Array[Thread](dirs.size)
 var i: Int = 0
 val me = this
  for(dir <- dirs) {
   val thread = new Thread( new Runnable {
 def run()
 {
   val recoveryPoints = me.recoveryPointCheckpoints(dir).read
   /* load the logs */
   val subDirs = dir.listFiles()
   if(subDirs != null) {
 val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
 if(cleanShutDownFile.exists())
   info("Found clean shutdown file. Skipping recovery for all logs 
 in data directory '%s'".format(dir.getAbsolutePath))
 for(dir <- subDirs) {
   if(dir.isDirectory) {
 info("Loading log '" + dir.getName + "'")
 val topicPartition = Log.parseTopicPartitionName(dir.getName)
 val config = topicConfigs.getOrElse(topicPartition.topic, 
 defaultConfig)
 val log = new Log(dir,
   config,
   recoveryPoints.getOrElse(topicPartition, 0L),
   scheduler,
   time)
 val previous = addLogWithLock(topicPartition, log)
 if(previous != null)
   throw new IllegalArgumentException("Duplicate log 
 directories found: %s, %s!".format(log.dir.getAbsolutePath, 
 previous.dir.getAbsolutePath))
   }
 }
 cleanShutDownFile.delete()
   }
 }
   })
   thread.start()
   threads(i) = thread
   i = i + 1
 }
 for(thread - threads) {
   thread.join()
 }
   }
   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
 logCreationOrDeletionLock synchronized {
   this.logs.put(topicPartition, log)
 }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Comment Edited] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-20 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14067917#comment-14067917
 ] 

Alexey Ozeritskiy edited comment on KAFKA-1414 at 7/20/14 12:54 PM:


You must create an instance of Runnable for each directory (this.logDirs). You 
create an instance for each topic-partition. Your aproach will not work.


was (Author: aozeritsky):
You must create an instance of Runnable for each directory (this.logDirs). You 
create an instance for each kafka topic-partition. Your aproach will not work.

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
 KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work, however I'm too new 
 to scala, so do not take it literally:
 {code}
   /**
* Recover and load all logs in the given data directories
*/
   private def loadLogs(dirs: Seq[File]) {
 val threads : Array[Thread] = new Array[Thread](dirs.size)
 var i: Int = 0
 val me = this
  for(dir <- dirs) {
   val thread = new Thread( new Runnable {
 def run()
 {
   val recoveryPoints = me.recoveryPointCheckpoints(dir).read
   /* load the logs */
   val subDirs = dir.listFiles()
   if(subDirs != null) {
 val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
 if(cleanShutDownFile.exists())
   info("Found clean shutdown file. Skipping recovery for all logs 
 in data directory '%s'".format(dir.getAbsolutePath))
 for(dir <- subDirs) {
   if(dir.isDirectory) {
 info("Loading log '" + dir.getName + "'")
 val topicPartition = Log.parseTopicPartitionName(dir.getName)
 val config = topicConfigs.getOrElse(topicPartition.topic, 
 defaultConfig)
 val log = new Log(dir,
   config,
   recoveryPoints.getOrElse(topicPartition, 0L),
   scheduler,
   time)
 val previous = addLogWithLock(topicPartition, log)
 if(previous != null)
   throw new IllegalArgumentException("Duplicate log 
 directories found: %s, %s!".format(log.dir.getAbsolutePath, 
 previous.dir.getAbsolutePath))
   }
 }
 cleanShutDownFile.delete()
   }
 }
   })
   thread.start()
   threads(i) = thread
   i = i + 1
 }
 for(thread - threads) {
   thread.join()
 }
   }
   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
 logCreationOrDeletionLock synchronized {
   this.logs.put(topicPartition, log)
 }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1551) Configuration example errors

2014-07-19 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-1551:
-

Description: 
A Production Server Config 
(http://kafka.apache.org/documentation.html#prodconfig) contains error:
{code}
# ZK configuration
zk.connection.timeout.ms=6000
zk.sync.time.ms=2000
{code}
Should be
{code}
# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
{code}


  was:
A Production Server Config contains error:
{code}
# ZK configuration
zk.connection.timeout.ms=6000
zk.sync.time.ms=2000
{code}
Should be
{code}
# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
{code}



 Configuration example errors
 

 Key: KAFKA-1551
 URL: https://issues.apache.org/jira/browse/KAFKA-1551
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Alexey Ozeritskiy

 A Production Server Config 
 (http://kafka.apache.org/documentation.html#prodconfig) contains error:
 {code}
 # ZK configuration
 zk.connection.timeout.ms=6000
 zk.sync.time.ms=2000
 {code}
 Should be
 {code}
 # ZK configuration
 zookeeper.connection.timeout.ms=6000
 zookeeper.sync.time.ms=2000
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (KAFKA-1543) Changing replication factor

2014-07-17 Thread Alexey Ozeritskiy (JIRA)
Alexey Ozeritskiy created KAFKA-1543:


 Summary: Changing replication factor
 Key: KAFKA-1543
 URL: https://issues.apache.org/jira/browse/KAFKA-1543
 Project: Kafka
  Issue Type: Improvement
Reporter: Alexey Ozeritskiy
 Attachments: can-change-replication.patch

It is difficult to change the replication factor by manually editing the json config.
I propose to add a key to the kafka-reassign-partitions.sh command to automatically 
create the json config.

Example of usage
%%
kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor 
--topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate > output
%%
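
For illustration only, the json config this key would generate (and that --execute would 
then consume) follows the standard reassignment format; topic name and broker ids below 
are made up:
%%
{"version": 1,
 "partitions": [
   {"topic": "mytopic", "partition": 0, "replicas": [1, 2, 3]},
   {"topic": "mytopic", "partition": 1, "replicas": [2, 3, 4]}
 ]}
%%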



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1543) Changing replication factor

2014-07-17 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-1543:
-

Attachment: can-change-replication.patch

 Changing replication factor
 ---

 Key: KAFKA-1543
 URL: https://issues.apache.org/jira/browse/KAFKA-1543
 Project: Kafka
  Issue Type: Improvement
Reporter: Alexey Ozeritskiy
 Attachments: can-change-replication.patch


 It is difficult to change replication factor by manual editing json config.
 I propose to add a key to kafka-reassign-partitions.sh command to 
 automatically create json config.
 Example of usage
 {code}
 kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor 
 --topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate > 
 output
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1543) Changing replication factor

2014-07-17 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-1543:
-

Description: 
It is difficult to change replication factor by manual editing json config.
I propose to add a key to kafka-reassign-partitions.sh command to automatically 
create json config.

Example of usage
{code}
kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor 
--topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate > output
{code}

  was:
It is difficult to change replication factor by manual editing json config.
I propose to add a key to kafka-reassign-partitions.sh command to automatically 
create json config.

Example of usage
%%
kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor 
--topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate > output
%%


 Changing replication factor
 ---

 Key: KAFKA-1543
 URL: https://issues.apache.org/jira/browse/KAFKA-1543
 Project: Kafka
  Issue Type: Improvement
Reporter: Alexey Ozeritskiy
 Attachments: can-change-replication.patch


 It is difficult to change replication factor by manual editing json config.
 I propose to add a key to kafka-reassign-partitions.sh command to 
 automatically create json config.
 Example of usage
 {code}
 kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor 
 --topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate > 
 output
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1543) Changing replication factor

2014-07-17 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-1543:
-

Status: Patch Available  (was: Open)

 Changing replication factor
 ---

 Key: KAFKA-1543
 URL: https://issues.apache.org/jira/browse/KAFKA-1543
 Project: Kafka
  Issue Type: Improvement
Reporter: Alexey Ozeritskiy
 Attachments: can-change-replication.patch


 It is difficult to change replication factor by manual editing json config.
 I propose to add a key to kafka-reassign-partitions.sh command to 
 automatically create json config.
 Example of usage
 {code}
 kafka-reassign-partitions.sh --zookeeper zk --replicas new-replication-factor 
 --topics-to-move-json-file topics-file --broker-list 1,2,3,4 --generate > 
 output
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-15 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14061957#comment-14061957
 ] 

Alexey Ozeritskiy commented on KAFKA-1414:
--

Anton, you forgot to use the new config option (log.recovery.threads).

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work, however I'm too new 
 to scala, so do not take it literally:
 {code}
   /**
* Recover and load all logs in the given data directories
*/
   private def loadLogs(dirs: Seq[File]) {
 val threads : Array[Thread] = new Array[Thread](dirs.size)
 var i: Int = 0
 val me = this
  for(dir <- dirs) {
   val thread = new Thread( new Runnable {
 def run()
 {
   val recoveryPoints = me.recoveryPointCheckpoints(dir).read
   /* load the logs */
   val subDirs = dir.listFiles()
   if(subDirs != null) {
 val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
 if(cleanShutDownFile.exists())
   info("Found clean shutdown file. Skipping recovery for all logs 
 in data directory '%s'".format(dir.getAbsolutePath))
 for(dir <- subDirs) {
   if(dir.isDirectory) {
 info("Loading log '" + dir.getName + "'")
 val topicPartition = Log.parseTopicPartitionName(dir.getName)
 val config = topicConfigs.getOrElse(topicPartition.topic, 
 defaultConfig)
 val log = new Log(dir,
   config,
   recoveryPoints.getOrElse(topicPartition, 0L),
   scheduler,
   time)
 val previous = addLogWithLock(topicPartition, log)
 if(previous != null)
   throw new IllegalArgumentException("Duplicate log 
 directories found: %s, %s!".format(log.dir.getAbsolutePath, 
 previous.dir.getAbsolutePath))
   }
 }
 cleanShutDownFile.delete()
   }
 }
   })
   thread.start()
   threads(i) = thread
   i = i + 1
 }
 for(thread - threads) {
   thread.join()
 }
   }
   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
 logCreationOrDeletionLock synchronized {
   this.logs.put(topicPartition, log)
 }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-09 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-1414:
-

Attachment: parallel-dir-loading-trunk-threadpool.patch

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work, however I'm too new 
 to scala, so do not take it literally:
 {code}
   /**
* Recover and load all logs in the given data directories
*/
   private def loadLogs(dirs: Seq[File]) {
 val threads : Array[Thread] = new Array[Thread](dirs.size)
 var i: Int = 0
 val me = this
  for(dir <- dirs) {
   val thread = new Thread( new Runnable {
 def run()
 {
   val recoveryPoints = me.recoveryPointCheckpoints(dir).read
   /* load the logs */
   val subDirs = dir.listFiles()
   if(subDirs != null) {
 val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
 if(cleanShutDownFile.exists())
   info("Found clean shutdown file. Skipping recovery for all logs 
 in data directory '%s'".format(dir.getAbsolutePath))
 for(dir <- subDirs) {
   if(dir.isDirectory) {
 info("Loading log '" + dir.getName + "'")
 val topicPartition = Log.parseTopicPartitionName(dir.getName)
 val config = topicConfigs.getOrElse(topicPartition.topic, 
 defaultConfig)
 val log = new Log(dir,
   config,
   recoveryPoints.getOrElse(topicPartition, 0L),
   scheduler,
   time)
 val previous = addLogWithLock(topicPartition, log)
 if(previous != null)
   throw new IllegalArgumentException("Duplicate log 
 directories found: %s, %s!".format(log.dir.getAbsolutePath, 
 previous.dir.getAbsolutePath))
   }
 }
 cleanShutDownFile.delete()
   }
 }
   })
   thread.start()
   threads(i) = thread
   i = i + 1
 }
 for(thread - threads) {
   thread.join()
 }
   }
   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
 logCreationOrDeletionLock synchronized {
   this.logs.put(topicPartition, log)
 }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-09 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14056358#comment-14056358
 ] 

Alexey Ozeritskiy commented on KAFKA-1414:
--

1. Ok
2. If any thread fails we get ExecutionException at
{code}
jobs.foreach(_.get())
{code}
We can get the original exception by calling e.getCause() and rethrowing it. Is 
this ok?
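
For reference, a minimal sketch of that approach (hypothetical helper, not the committed 
patch): submit one Callable per data directory to a fixed pool, wait on every future, and 
rethrow the original cause of the first failure.
{code}
import java.io.File
import java.util.concurrent.{Callable, ExecutionException, Executors}

object ParallelLoadSketch {
  // hypothetical stand-in for recovering one data directory
  def loadDir(dir: File): Unit = println("loading " + dir.getAbsolutePath)

  def loadLogs(dirs: Seq[File], ioThreads: Int): Unit = {
    val pool = Executors.newFixedThreadPool(ioThreads)
    try {
      val jobs = dirs.map { dir =>
        pool.submit(new Callable[Unit] { def call(): Unit = loadDir(dir) })
      }
      try {
        jobs.foreach(_.get()) // a failed job surfaces here as ExecutionException
      } catch {
        case e: ExecutionException => throw e.getCause // rethrow the original exception
      }
    } finally {
      pool.shutdown()
    }
  }
}
{code}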


 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work, however I'm too new 
 to scala, so do not take it literally:
 {code}
   /**
* Recover and load all logs in the given data directories
*/
   private def loadLogs(dirs: Seq[File]) {
 val threads : Array[Thread] = new Array[Thread](dirs.size)
 var i: Int = 0
 val me = this
  for(dir <- dirs) {
   val thread = new Thread( new Runnable {
 def run()
 {
   val recoveryPoints = me.recoveryPointCheckpoints(dir).read
   /* load the logs */
   val subDirs = dir.listFiles()
   if(subDirs != null) {
 val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
 if(cleanShutDownFile.exists())
   info("Found clean shutdown file. Skipping recovery for all logs 
 in data directory '%s'".format(dir.getAbsolutePath))
 for(dir <- subDirs) {
   if(dir.isDirectory) {
 info("Loading log '" + dir.getName + "'")
 val topicPartition = Log.parseTopicPartitionName(dir.getName)
 val config = topicConfigs.getOrElse(topicPartition.topic, 
 defaultConfig)
 val log = new Log(dir,
   config,
   recoveryPoints.getOrElse(topicPartition, 0L),
   scheduler,
   time)
 val previous = addLogWithLock(topicPartition, log)
 if(previous != null)
   throw new IllegalArgumentException("Duplicate log 
 directories found: %s, %s!".format(log.dir.getAbsolutePath, 
 previous.dir.getAbsolutePath))
   }
 }
 cleanShutDownFile.delete()
   }
 }
   })
   thread.start()
   threads(i) = thread
   i = i + 1
 }
 for(thread - threads) {
   thread.join()
 }
   }
   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
 logCreationOrDeletionLock synchronized {
   this.logs.put(topicPartition, log)
 }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-06 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-1414:
-

Attachment: parallel-dir-loading-trunk.patch
parallel-dir-loading-0.8.patch

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work, however I'm too new 
 to scala, so do not take it literally:
 {code}
   /**
* Recover and load all logs in the given data directories
*/
   private def loadLogs(dirs: Seq[File]) {
 val threads : Array[Thread] = new Array[Thread](dirs.size)
 var i: Int = 0
 val me = this
  for(dir <- dirs) {
   val thread = new Thread( new Runnable {
 def run()
 {
   val recoveryPoints = me.recoveryPointCheckpoints(dir).read
   /* load the logs */
   val subDirs = dir.listFiles()
   if(subDirs != null) {
 val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
 if(cleanShutDownFile.exists())
   info("Found clean shutdown file. Skipping recovery for all logs 
 in data directory '%s'".format(dir.getAbsolutePath))
 for(dir <- subDirs) {
   if(dir.isDirectory) {
 info("Loading log '" + dir.getName + "'")
 val topicPartition = Log.parseTopicPartitionName(dir.getName)
 val config = topicConfigs.getOrElse(topicPartition.topic, 
 defaultConfig)
 val log = new Log(dir,
   config,
   recoveryPoints.getOrElse(topicPartition, 0L),
   scheduler,
   time)
 val previous = addLogWithLock(topicPartition, log)
 if(previous != null)
   throw new IllegalArgumentException("Duplicate log 
 directories found: %s, %s!".format(log.dir.getAbsolutePath, 
 previous.dir.getAbsolutePath))
   }
 }
 cleanShutDownFile.delete()
   }
 }
   })
   thread.start()
   threads(i) = thread
   i = i + 1
 }
 for(thread - threads) {
   thread.join()
 }
   }
   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
 logCreationOrDeletionLock synchronized {
   this.logs.put(topicPartition, log)
 }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-06 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-1414:
-

Affects Version/s: (was: 0.8.1)
   0.9.0
   0.8.2
   0.8.1.1
   Status: Patch Available  (was: Open)

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1, 0.8.2, 0.9.0
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work, however I'm too new 
 to scala, so do not take it literally:
 {code}
   /**
* Recover and load all logs in the given data directories
*/
   private def loadLogs(dirs: Seq[File]) {
 val threads : Array[Thread] = new Array[Thread](dirs.size)
 var i: Int = 0
 val me = this
  for(dir <- dirs) {
   val thread = new Thread( new Runnable {
 def run()
 {
   val recoveryPoints = me.recoveryPointCheckpoints(dir).read
   /* load the logs */
   val subDirs = dir.listFiles()
   if(subDirs != null) {
 val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
 if(cleanShutDownFile.exists())
   info("Found clean shutdown file. Skipping recovery for all logs 
 in data directory '%s'".format(dir.getAbsolutePath))
 for(dir <- subDirs) {
   if(dir.isDirectory) {
 info("Loading log '" + dir.getName + "'")
 val topicPartition = Log.parseTopicPartitionName(dir.getName)
 val config = topicConfigs.getOrElse(topicPartition.topic, 
 defaultConfig)
 val log = new Log(dir,
   config,
   recoveryPoints.getOrElse(topicPartition, 0L),
   scheduler,
   time)
 val previous = addLogWithLock(topicPartition, log)
 if(previous != null)
   throw new IllegalArgumentException("Duplicate log 
 directories found: %s, %s!".format(log.dir.getAbsolutePath, 
 previous.dir.getAbsolutePath))
   }
 }
 cleanShutDownFile.delete()
   }
 }
   })
   thread.start()
   threads(i) = thread
   i = i + 1
 }
 for(thread - threads) {
   thread.join()
 }
   }
   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
 logCreationOrDeletionLock synchronized {
   this.logs.put(topicPartition, log)
 }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-04-25 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13980971#comment-13980971
 ] 

Alexey Ozeritskiy commented on KAFKA-1414:
--

What to do with exceptions? Maybe we should use ParArray here?

{code}
  private def loadLogs(dirs: Seq[File]) {
dirs.toParArray.foreach(dir => loadDir(dir))
  }

  private def loadDir(dir: File) {
val recoveryPoints = this.recoveryPointCheckpoints(dir).read
/* load the logs */
val subDirs = dir.listFiles()
if(subDirs != null) {
  val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
  if(cleanShutDownFile.exists())
info("Found clean shutdown file. Skipping recovery for all logs in data 
directory '%s'".format(dir.getAbsolutePath))
  for(dir <- subDirs) {
if(dir.isDirectory) {
  info("Loading log '" + dir.getName + "'")
  val topicPartition = Log.parseTopicPartitionName(dir.getName)
  val config = topicConfigs.getOrElse(topicPartition.topic, 
defaultConfig)
  val log = new Log(dir,
config,
recoveryPoints.getOrElse(topicPartition, 0L),
scheduler,
time)
  val previous = addLogWithLock(topicPartition, log)
  if(previous != null)
throw new IllegalArgumentException("Duplicate log directories 
found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
}
  }
  cleanShutDownFile.delete()
}
  }

  private def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log 
= {
logCreationOrDeletionLock synchronized {
  this.logs.put(topicPartition, log)
}
  }
{code}

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps

 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work, however I'm too new 
 to scala, so do not take it literally:
 {code}
   /**
* Recover and load all logs in the given data directories
*/
   private def loadLogs(dirs: Seq[File]) {
 val threads : Array[Thread] = new Array[Thread](dirs.size)
 var i: Int = 0
 val me = this
  for(dir <- dirs) {
   val thread = new Thread( new Runnable {
 def run()
 {
   val recoveryPoints = me.recoveryPointCheckpoints(dir).read
   /* load the logs */
   val subDirs = dir.listFiles()
   if(subDirs != null) {
 val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
 if(cleanShutDownFile.exists())
   info("Found clean shutdown file. Skipping recovery for all logs 
 in data directory '%s'".format(dir.getAbsolutePath))
 for(dir <- subDirs) {
   if(dir.isDirectory) {
 info("Loading log '" + dir.getName + "'")
 val topicPartition = Log.parseTopicPartitionName(dir.getName)
 val config = topicConfigs.getOrElse(topicPartition.topic, 
 defaultConfig)
 val log = new Log(dir,
   config,
   recoveryPoints.getOrElse(topicPartition, 0L),
   scheduler,
   time)
 val previous = addLogWithLock(topicPartition, log)
 if(previous != null)
   throw new IllegalArgumentException("Duplicate log 
 directories found: %s, %s!".format(log.dir.getAbsolutePath, 
 previous.dir.getAbsolutePath))
   }
 }
 cleanShutDownFile.delete()
   }
 }
   })
   thread.start()
   threads(i) = thread
   i = i + 1
 }
 for(thread - threads) {
   thread.join()
 }
   }
   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
 logCreationOrDeletionLock synchronized {
   this.logs.put(topicPartition, log)
 }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1322) slow startup after unclean shutdown

2014-03-24 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-1322:
-

Resolution: Won't Fix
Status: Resolved  (was: Patch Available)

ok

 slow startup after unclean shutdown
 ---

 Key: KAFKA-1322
 URL: https://issues.apache.org/jira/browse/KAFKA-1322
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1
Reporter: Alexey Ozeritskiy
Priority: Critical
 Attachments: kafka.patch


 Kafka 0.8.1 checks all segments on unclean shutdown. 
 0.8.0 checks only the latest segment.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1322) slow startup after unclean shutdown

2014-03-23 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13944376#comment-13944376
 ] 

Alexey Ozeritskiy commented on KAFKA-1322:
--

Can't reproduce it now. I had unclean shutdowns after the 0.8 to 0.8.1 upgrade and 
after running out of disk space.

 slow startup after unclean shutdown
 ---

 Key: KAFKA-1322
 URL: https://issues.apache.org/jira/browse/KAFKA-1322
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1
Reporter: Alexey Ozeritskiy
Priority: Critical
 Attachments: kafka.patch


 Kafka 0.8.1 checks all segments on unclean shutdown. 
 0.8.0 checks only the latest segment.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (KAFKA-1322) slow start on unclean shutdown

2014-03-22 Thread Alexey Ozeritskiy (JIRA)
Alexey Ozeritskiy created KAFKA-1322:


 Summary: slow start on unclean shutdown
 Key: KAFKA-1322
 URL: https://issues.apache.org/jira/browse/KAFKA-1322
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1
Reporter: Alexey Ozeritskiy
Priority: Critical


Kafka 0.8.1 checks all segments on unclean shutdown. 
0.8.0 checks only the latest segment.
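
To make the cost concrete, here is a toy illustration (hypothetical Segment type and check function, not Kafka's actual classes): checking a segment means re-reading it and validating its messages, so with the 0.8.1 behaviour the startup time grows with the total number of segments on disk, while with the 0.8.0 behaviour it only depends on the last segment of each log.
{code}
import java.io.File

object RecoveryCostSketch {
  // Hypothetical stand-in for an on-disk log segment.
  case class Segment(file: File, baseOffset: Long)

  // Pretend "check" = re-read the segment and validate every message in it.
  def check(s: Segment) {
    println("re-reading and validating " + s.file.getName)
  }

  // 0.8.1-style behaviour as described above: every segment gets checked,
  // so startup cost grows with the total number of segments.
  def recoverAllSegments(segments: Seq[Segment]) {
    segments.foreach(check)
  }

  // 0.8.0-style behaviour: only the newest segment gets checked.
  def recoverLatestSegmentOnly(segments: Seq[Segment]) {
    segments.lastOption.foreach(check)
  }
}
{code}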



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1322) slow start on unclean shutdown

2014-03-22 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-1322:
-

Status: Patch Available  (was: Open)

 slow start on unclean shutdown
 --

 Key: KAFKA-1322
 URL: https://issues.apache.org/jira/browse/KAFKA-1322
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1
Reporter: Alexey Ozeritskiy
Priority: Critical

 Kafka 0.8.1 checks all segments on unclean shutdown. 
 0.8.0 checks only the latest segment.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1322) slow start on unclean shutdown

2014-03-22 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-1322:
-

Attachment: kafka.patch

 slow start on unclean shutdown
 --

 Key: KAFKA-1322
 URL: https://issues.apache.org/jira/browse/KAFKA-1322
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1
Reporter: Alexey Ozeritskiy
Priority: Critical
 Attachments: kafka.patch


 Kafka 0.8.1 checks all segments on unclean shutdown. 
 0.8.0 checks only the latest segment.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1322) slow startup after unclean shutdown

2014-03-22 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-1322:
-

Summary: slow startup after unclean shutdown  (was: slow start on unclean 
shutdown)

 slow startup after unclean shutdown
 ---

 Key: KAFKA-1322
 URL: https://issues.apache.org/jira/browse/KAFKA-1322
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1
Reporter: Alexey Ozeritskiy
Priority: Critical
 Attachments: kafka.patch


 Kafka 0.8.1 checks all segments on unclean shutdown. 
 0.8.0 checks only the latest segment.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock

2013-06-25 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13692816#comment-13692816
 ] 

Alexey Ozeritskiy commented on KAFKA-937:
-

This patch works, thanks.

 ConsumerFetcherThread can deadlock
 --

 Key: KAFKA-937
 URL: https://issues.apache.org/jira/browse/KAFKA-937
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Jun Rao
 Fix For: 0.8

 Attachments: kafka-937_ConsumerOffsetChecker.patch, 
 kafka-937_delta.patch, kafka-937.patch


 We have the following access pattern that can introduce a deadlock.
 AbstractFetcherThread.processPartitionsWithError() ->
 ConsumerFetcherThread.processPartitionsWithError() ->
 ConsumerFetcherManager.addPartitionsWithError() wait for lock ->
 LeaderFinderThread holding lock while calling
 AbstractFetcherManager.shutdownIdleFetcherThreads() ->
 AbstractFetcherManager calling fetcher.shutdown, which needs to wait until
 AbstractFetcherThread.processPartitionsWithError() completes.
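
For readers less familiar with the pattern, here is a minimal self-contained reduction of the same shape of deadlock (made-up names, not Kafka's actual classes): one thread holds a lock while joining another thread, and the joined thread needs that same lock to finish.
{code}
object DeadlockSketch {
  def main(args: Array[String]) {
    val lock = new Object

    // Plays the role of the fetcher thread: it needs `lock` to finish its
    // error handling (like addPartitionsWithError() waiting for the manager's lock).
    val fetcher = new Thread(new Runnable {
      def run() {
        lock.synchronized {
          println("fetcher finished error handling")
        }
      }
    })

    // Plays the role of the thread holding the lock (LeaderFinderThread) while
    // waiting for the fetcher to shut down (shutdownIdleFetcherThreads -> join).
    lock.synchronized {
      fetcher.start()
      fetcher.join() // never returns: the fetcher can never enter lock.synchronized
    }
  }
}
{code}
Running this hangs forever, which is exactly the situation described above: fetcher.shutdown cannot return because processPartitionsWithError() is stuck waiting for the lock that the shutting-down side already holds.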

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock

2013-06-24 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13692418#comment-13692418
 ] 

Alexey Ozeritskiy commented on KAFKA-937:
-

https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blobdiff;f=core/src/main/scala/kafka/consumer/SimpleConsumer.scala;h=1c283280873eef597018f2f0a5ddfec942356c18;hp=bdeee9174a32a02209d769c18a0337ade0356e99;hb=5bd33c1517bb2e7734166dc3e787ac90a4ef8f86;hpb=640026467cf705fbcf6fd6bcada058b18a95bff5

 ConsumerFetcherThread can deadlock
 --

 Key: KAFKA-937
 URL: https://issues.apache.org/jira/browse/KAFKA-937
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Jun Rao
 Fix For: 0.8

 Attachments: kafka-937_delta.patch, kafka-937.patch


 We have the following access pattern that can introduce a deadlock.
 AbstractFetcherThread.processPartitionsWithError() ->
 ConsumerFetcherThread.processPartitionsWithError() ->
 ConsumerFetcherManager.addPartitionsWithError() wait for lock ->
 LeaderFinderThread holding lock while calling
 AbstractFetcherManager.shutdownIdleFetcherThreads() ->
 AbstractFetcherManager calling fetcher.shutdown, which needs to wait until
 AbstractFetcherThread.processPartitionsWithError() completes.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Comment Edited] (KAFKA-937) ConsumerFetcherThread can deadlock

2013-06-24 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13692418#comment-13692418
 ] 

Alexey Ozeritskiy edited comment on KAFKA-937 at 6/24/13 9:44 PM:
--

This patch touches SimpleConsumer.

proof:

https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blobdiff;f=core/src/main/scala/kafka/consumer/SimpleConsumer.scala;h=1c283280873eef597018f2f0a5ddfec942356c18;hp=bdeee9174a32a02209d769c18a0337ade0356e99;hb=5bd33c1517bb2e7734166dc3e787ac90a4ef8f86;hpb=640026467cf705fbcf6fd6bcada058b18a95bff5

  was (Author: aozeritsky):

https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blobdiff;f=core/src/main/scala/kafka/consumer/SimpleConsumer.scala;h=1c283280873eef597018f2f0a5ddfec942356c18;hp=bdeee9174a32a02209d769c18a0337ade0356e99;hb=5bd33c1517bb2e7734166dc3e787ac90a4ef8f86;hpb=640026467cf705fbcf6fd6bcada058b18a95bff5
  
 ConsumerFetcherThread can deadlock
 --

 Key: KAFKA-937
 URL: https://issues.apache.org/jira/browse/KAFKA-937
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Jun Rao
 Fix For: 0.8

 Attachments: kafka-937_delta.patch, kafka-937.patch


 We have the following access pattern that can introduce a deadlock.
 AbstractFetcherThread.processPartitionsWithError() ->
 ConsumerFetcherThread.processPartitionsWithError() ->
 ConsumerFetcherManager.addPartitionsWithError() wait for lock ->
 LeaderFinderThread holding lock while calling
 AbstractFetcherManager.shutdownIdleFetcherThreads() ->
 AbstractFetcherManager calling fetcher.shutdown, which needs to wait until
 AbstractFetcherThread.processPartitionsWithError() completes.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira