[jira] [Comment Edited] (KAFKA-4271) The console consumer fails on Windows with new consumer is used

2016-11-28 Thread Yu Yan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704074#comment-15704074
 ] 

Yu Yan edited comment on KAFKA-4271 at 11/29/16 7:53 AM:
-

I have met this problem too. My jvm is 32 bit, and 64jvm may work well.
But I have sovle it by reduce the init jvm in the kafka-server-start.bat. 
Change the 1G to 512M, then the consumer can work.
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
)
Hope it can help someone.
In my later test, the topic number is related the jvm, each topic will get the 
assigned JVM




was (Author: yuyan):
I have met this problem too. My jvm is 32 bit, and 64jvm may work well.
But I have sovle it by reduce the init jvm in the kafka-server-start.bat. 
Change the 1G to 512M, then the consumer can work.
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
)
Hope it can help someone.


> The console consumer fails on Windows with new consumer is used 
> 
>
> Key: KAFKA-4271
> URL: https://issues.apache.org/jira/browse/KAFKA-4271
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Vahid Hashemian
>
> When I try to consume message using the new consumer (Quickstart Step 5) I 
> get an exception on the broker side. The old consumer works fine.
> {code}
> java.io.IOException: Map failed
> at sun.nio.ch.FileChannelImpl.map(Unknown Source)
> at kafka.log.AbstractIndex.(AbstractIndex.scala:61)
> at kafka.log.OffsetIndex.(OffsetIndex.scala:51)
> at kafka.log.LogSegment.(LogSegment.scala:67)
> at kafka.log.Log.loadSegments(Log.scala:255)
> at kafka.log.Log.(Log.scala:108)
> at kafka.log.LogManager.createLog(LogManager.scala:362)
> at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
> at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
> at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
> at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
> at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
> at kafka.cluster.Partition.makeLeader(Partition.scala:168)
> at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:740)
> at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:739)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:739)
> at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:685)
> at 
> kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:82)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.OutOfMemoryError: Map failed
> at sun.nio.ch.FileChannelImpl.map0(Native Method)
> ... 29 more
> {code}
> This issue seems to break the broker and I have to clear out the logs so I 
> can bring the broker back up again.
> Update: This issue seems to occur on 32-bit Windows only. I tried this on a 
> Windows 32-bit VM. On a 64-bit machine I did not get the error.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4458) Add per partition metrics for in-sync and assigned replica count

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704295#comment-15704295
 ] 

ASF GitHub Bot commented on KAFKA-4458:
---

GitHub user xvrl opened a pull request:

https://github.com/apache/kafka/pull/2186

KAFKA-4458 add per partition in-sync and assigned replica count



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xvrl/kafka per-partition-replica-metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2186.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2186


commit 2ca23a73103296f3d6ebecfa639e30915c6cec91
Author: Xavier Léauté 
Date:   2016-11-29T01:10:21Z

add per partition in-sync and assigned replica count




> Add per partition metrics for in-sync and assigned replica count
> 
>
> Key: KAFKA-4458
> URL: https://issues.apache.org/jira/browse/KAFKA-4458
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Reporter: Xavier Léauté
>
> See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-96+-+Add+per+partition+metrics+for+in-sync+and+assigned+replica+count
>  for details on proposed changes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2186: KAFKA-4458 add per partition in-sync and assigned ...

2016-11-28 Thread xvrl
GitHub user xvrl opened a pull request:

https://github.com/apache/kafka/pull/2186

KAFKA-4458 add per partition in-sync and assigned replica count



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xvrl/kafka per-partition-replica-metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2186.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2186


commit 2ca23a73103296f3d6ebecfa639e30915c6cec91
Author: Xavier Léauté 
Date:   2016-11-29T01:10:21Z

add per partition in-sync and assigned replica count




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2016-11-28 Thread Abhi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704164#comment-15704164
 ] 

Abhi edited comment on KAFKA-1194 at 11/29/16 4:38 AM:
---

Let me give a try... Little equipped at my end right now.. But I will 
definitely test the build that you have shared on onedrive and let you know the 
results... 

Abhi


was (Author: abhit011):
Let give a try... Little equipped at my end.. But I will definitely test the 
build that you have shared on onedrive and let you know the results... 

Abhi

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1
> Environment: window
>Reporter: Tao Qin
>Assignee: Jay Kreps
>  Labels: features, patch
> Fix For: 0.10.2.0
>
> Attachments: KAFKA-1194.patch, kafka-1194-v1.patch, 
> kafka-1194-v2.patch
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because kafka tries to rename the log file when it 
> is still opened.  So we should close the file first before rename.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I find a forceUnmap function in kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2016-11-28 Thread Abhi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704164#comment-15704164
 ] 

Abhi commented on KAFKA-1194:
-

Let give a try... Little equipped at my end.. But I will definitely test the 
build that you have shared on onedrive and let you know the results... 

Abhi

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1
> Environment: window
>Reporter: Tao Qin
>Assignee: Jay Kreps
>  Labels: features, patch
> Fix For: 0.10.2.0
>
> Attachments: KAFKA-1194.patch, kafka-1194-v1.patch, 
> kafka-1194-v2.patch
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because kafka tries to rename the log file when it 
> is still opened.  So we should close the file first before rename.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I find a forceUnmap function in kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup

2016-11-28 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3959 started by Onur Karaman.
---
> __consumer_offsets wrong number of replicas at startup
> --
>
> Key: KAFKA-3959
> URL: https://issues.apache.org/jira/browse/KAFKA-3959
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager, replication
>Affects Versions: 0.9.0.1, 0.10.0.0
> Environment: Brokers of 3 kafka nodes running Red Hat Enterprise 
> Linux Server release 7.2 (Maipo)
>Reporter: Alban Hurtaud
>Assignee: Onur Karaman
>
> When creating a stack of 3 kafka brokers, the consumer is starting faster 
> than kafka nodes and when trying to read a topic, only one kafka node is 
> available.
> So the __consumer_offsets is created with a replication factor set to 1 
> (instead of configured 3) :
> offsets.topic.replication.factor=3
> default.replication.factor=3
> min.insync.replicas=2
> Then, other kafka nodes go up and we have exceptions because the replicas # 
> for __consumer_offsets is 1 and min insync is 2. So exceptions are thrown.
> What I missed is : Why the __consumer_offsets is created with replication to 
> 1 (when 1 broker is running) whereas in server.properties it is set to 3 ?
> To reproduce : 
> - Prepare 3 kafka nodes with the 3 lines above added to servers.properties.
> - Run one kafka,
> - Run one consumer (the __consumer_offsets is created with replicas =1)
> - Run 2 more kafka nodes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4453) add request prioritization

2016-11-28 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-4453:

Status: Patch Available  (was: In Progress)

> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority. So a 
> backlog of requests ahead of the controller request will delay the processing 
> of controller requests. This causes requests infront of the controller 
> request to get processed based on stale state.
> Side effects may include giving clients stale metadata\[1\], rejecting 
> ProduceRequests and FetchRequests, and data loss (for some unofficial\[2\] 
> definition of data loss in terms of messages beyond the high watermark)\[3\].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests get processed before regular 
> queued up requests, so requests can get processed with up-to-date state.
> \[1\] Say a client's MetadataRequest is sitting infront of a controller's 
> UpdateMetadataRequest on a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes and that these leadership changes are being broadcasted 
> from the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a roundtrip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
> topic metadata again.
> \[2\] The official definition of data loss in kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in sync 
> replicas for that partition have applied it to their log.
> \[3\] Say a number of ProduceRequests are sitting infront of a controller's 
> LeaderAndIsrRequest on a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted out 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests are getting processed on the former partition leader. As 
> part of becoming a follower for a partition, the broker truncates the log to 
> the high-watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, respond to the user with a success, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users have a false sense that their produce attempt succeeded 
> while in reality their messages got erased. While technically part of what 
> they signed up for with acks=1, it can still come as a surprise.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup

2016-11-28 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-3959:

Status: Patch Available  (was: In Progress)

> __consumer_offsets wrong number of replicas at startup
> --
>
> Key: KAFKA-3959
> URL: https://issues.apache.org/jira/browse/KAFKA-3959
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager, replication
>Affects Versions: 0.10.0.0, 0.9.0.1
> Environment: Brokers of 3 kafka nodes running Red Hat Enterprise 
> Linux Server release 7.2 (Maipo)
>Reporter: Alban Hurtaud
>Assignee: Onur Karaman
>
> When creating a stack of 3 kafka brokers, the consumer is starting faster 
> than kafka nodes and when trying to read a topic, only one kafka node is 
> available.
> So the __consumer_offsets is created with a replication factor set to 1 
> (instead of configured 3) :
> offsets.topic.replication.factor=3
> default.replication.factor=3
> min.insync.replicas=2
> Then, other kafka nodes go up and we have exceptions because the replicas # 
> for __consumer_offsets is 1 and min insync is 2. So exceptions are thrown.
> What I missed is : Why the __consumer_offsets is created with replication to 
> 1 (when 1 broker is running) whereas in server.properties it is set to 3 ?
> To reproduce : 
> - Prepare 3 kafka nodes with the 3 lines above added to servers.properties.
> - Run one kafka,
> - Run one consumer (the __consumer_offsets is created with replicas =1)
> - Run 2 more kafka nodes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-4453) add request prioritization

2016-11-28 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4453 started by Onur Karaman.
---
> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority. So a 
> backlog of requests ahead of the controller request will delay the processing 
> of controller requests. This causes requests infront of the controller 
> request to get processed based on stale state.
> Side effects may include giving clients stale metadata\[1\], rejecting 
> ProduceRequests and FetchRequests, and data loss (for some unofficial\[2\] 
> definition of data loss in terms of messages beyond the high watermark)\[3\].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests get processed before regular 
> queued up requests, so requests can get processed with up-to-date state.
> \[1\] Say a client's MetadataRequest is sitting infront of a controller's 
> UpdateMetadataRequest on a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes and that these leadership changes are being broadcasted 
> from the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a roundtrip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
> topic metadata again.
> \[2\] The official definition of data loss in kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in sync 
> replicas for that partition have applied it to their log.
> \[3\] Say a number of ProduceRequests are sitting infront of a controller's 
> LeaderAndIsrRequest on a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted out 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests are getting processed on the former partition leader. As 
> part of becoming a follower for a partition, the broker truncates the log to 
> the high-watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, respond to the user with a success, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users have a false sense that their produce attempt succeeded 
> while in reality their messages got erased. While technically part of what 
> they signed up for with acks=1, it can still come as a surprise.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4271) The console consumer fails on Windows with new consumer is used

2016-11-28 Thread Yu Yan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704074#comment-15704074
 ] 

Yu Yan commented on KAFKA-4271:
---

I have met this problem too. My jvm is 32 bit, and 64jvm may work well.
But I have sovle it by reduce the init jvm in the kafka-server-start.bat. 
Change the 1G to 512M, then the consumer can work.
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
)
Hope it can help someone.


> The console consumer fails on Windows with new consumer is used 
> 
>
> Key: KAFKA-4271
> URL: https://issues.apache.org/jira/browse/KAFKA-4271
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Vahid Hashemian
>
> When I try to consume message using the new consumer (Quickstart Step 5) I 
> get an exception on the broker side. The old consumer works fine.
> {code}
> java.io.IOException: Map failed
> at sun.nio.ch.FileChannelImpl.map(Unknown Source)
> at kafka.log.AbstractIndex.(AbstractIndex.scala:61)
> at kafka.log.OffsetIndex.(OffsetIndex.scala:51)
> at kafka.log.LogSegment.(LogSegment.scala:67)
> at kafka.log.Log.loadSegments(Log.scala:255)
> at kafka.log.Log.(Log.scala:108)
> at kafka.log.LogManager.createLog(LogManager.scala:362)
> at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
> at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
> at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
> at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
> at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
> at kafka.cluster.Partition.makeLeader(Partition.scala:168)
> at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:740)
> at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:739)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:739)
> at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:685)
> at 
> kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:82)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.OutOfMemoryError: Map failed
> at sun.nio.ch.FileChannelImpl.map0(Native Method)
> ... 29 more
> {code}
> This issue seems to break the broker and I have to clear out the logs so I 
> can bring the broker back up again.
> Update: This issue seems to occur on 32-bit Windows only. I tried this on a 
> Windows 32-bit VM. On a 64-bit machine I did not get the error.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka-site pull request #33: Update contact.html

2016-11-28 Thread dossett
GitHub user dossett opened a pull request:

https://github.com/apache/kafka-site/pull/33

Update contact.html

Update link to search-hadoop.com

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dossett/kafka-site patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka-site/pull/33.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #33


commit f71448925602540e86c3a1eb44c73d1caa9cbf89
Author: Aaron Niskode-Dossett 
Date:   2016-11-29T01:47:08Z

Update contact.html




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4444) Aggregate requests sent from controller to broker during controlled shutdown

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703769#comment-15703769
 ] 

ASF GitHub Bot commented on KAFKA-:
---

Github user lindong28 closed the pull request at:

https://github.com/apache/kafka/pull/2141


> Aggregate requests sent from controller to broker during controlled shutdown
> 
>
> Key: KAFKA-
> URL: https://issues.apache.org/jira/browse/KAFKA-
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2141: KAFKA-4444; Aggregate requests sent from controlle...

2016-11-28 Thread lindong28
Github user lindong28 closed the pull request at:

https://github.com/apache/kafka/pull/2141


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.

2016-11-28 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703758#comment-15703758
 ] 

Mayuresh Gharat commented on KAFKA-4454:


[~ijuma] [~jjkoshy] [~ashishsinghdev] [~parth.brahmbhatt] would you mind taking 
a look at this? 
I will be happy to submit a PR for this.

> Authorizer should also include the Principal generated by the 
> PrincipalBuilder.
> ---
>
> Key: KAFKA-4454
> URL: https://issues.apache.org/jira/browse/KAFKA-4454
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
> Fix For: 0.10.2.0
>
>
> Currently kafka allows users to plugin a custom PrincipalBuilder and a custom 
> Authorizer.
> The Authorizer.authorize() object takes in a Session object that wraps 
> KafkaPrincipal and InetAddress.
> The KafkaPrincipal currently has a PrincipalType and Principal name, which is 
> the name of Principal generated by the PrincipalBuilder. 
> This Principal, generated by the pluggedin PrincipalBuilder might have other 
> fields that might be required by the pluggedin Authorizer but currently we 
> loose this information since we only extract the name of Principal while 
> creating KaflkaPrincipal in SocketServer.  
> It would be great if KafkaPrincipal has an additional field 
> "channelPrincipal" which is used to store the Principal generated by the 
> plugged in PrincipalBuilder.
> The pluggedin Authorizer can then use this "channelPrincipal" to do 
> authorization.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2185: MINOR: added logging to debug test

2016-11-28 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/2185

MINOR: added logging to debug test



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka hotfixResetToolTest

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2185.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2185


commit 8f4f6ca748d4b915cc69f2e48f829c3bdd326adc
Author: Matthias J. Sax 
Date:   2016-11-29T00:43:27Z

MINOR: added logging to debug test




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-3715) Higher granularity streams metrics

2016-11-28 Thread aarti gupta (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aarti gupta updated KAFKA-3715:
---
Assignee: Eno Thereska  (was: aarti gupta)

> Higher granularity streams metrics 
> ---
>
> Key: KAFKA-3715
> URL: https://issues.apache.org/jira/browse/KAFKA-3715
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Eno Thereska
>Priority: Minor
>  Labels: api
> Fix For: 0.10.2.0
>
>
> Originally proposed by [~guozhang] in 
> https://github.com/apache/kafka/pull/1362#issuecomment-218326690
> We can consider adding metrics for process / punctuate / commit rate at the 
> granularity of each processor node in addition to the global rate mentioned 
> above. This is very helpful in debugging.
> We can consider adding rate / total cumulated metrics for context.forward 
> indicating how many records were forwarded downstream from this processor 
> node as well. This is helpful in debugging.
> We can consider adding metrics for each stream partition's timestamp. This is 
> helpful in debugging.
> Besides the latency metrics, we can also add throughput latency in terms of 
> source records consumed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3715) Higher granularity streams metrics

2016-11-28 Thread aarti gupta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703705#comment-15703705
 ] 

aarti gupta commented on KAFKA-3715:


[~enothereska] go for it!

> Higher granularity streams metrics 
> ---
>
> Key: KAFKA-3715
> URL: https://issues.apache.org/jira/browse/KAFKA-3715
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: aarti gupta
>Priority: Minor
>  Labels: api
> Fix For: 0.10.2.0
>
>
> Originally proposed by [~guozhang] in 
> https://github.com/apache/kafka/pull/1362#issuecomment-218326690
> We can consider adding metrics for process / punctuate / commit rate at the 
> granularity of each processor node in addition to the global rate mentioned 
> above. This is very helpful in debugging.
> We can consider adding rate / total cumulated metrics for context.forward 
> indicating how many records were forwarded downstream from this processor 
> node as well. This is helpful in debugging.
> We can consider adding metrics for each stream partition's timestamp. This is 
> helpful in debugging.
> Besides the latency metrics, we can also add throughput latency in terms of 
> source records consumed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[DISCUSS] KIP-96 - Add per partition metrics for in-sync and assigned replica count

2016-11-28 Thread Xavier Léauté
Hi,

I created KIP-96 to propose per partition in-sync / assigned replica
metrics. Should be straightforward, but submitting it for proposal since we
require it for metrics changes.

Here's the link to the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-96+-+Add+per+partition+metrics+for+in-sync+and+assigned+replica+count

Thank you,
Xavier


[jira] [Commented] (KAFKA-4039) Exit Strategy: using exceptions instead of inline invocation of exit/halt

2016-11-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703697#comment-15703697
 ] 

Guozhang Wang commented on KAFKA-4039:
--

Ping [~hachikuji] [~ijuma] again.

> Exit Strategy: using exceptions instead of inline invocation of exit/halt
> -
>
> Key: KAFKA-4039
> URL: https://issues.apache.org/jira/browse/KAFKA-4039
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Maysam Yabandeh
>Priority: Critical
> Fix For: 0.10.1.1
>
> Attachments: deadlock-stack2
>
>
> The current practice is to directly invoke halt/exit right after the line 
> that intends to terminate the execution. In the case of System.exit this 
> could cause deadlocks if the thread invoking System.exit is holding  a lock 
> that will be requested by the shutdown hook threads that will be started by 
> System.exit. An example is reported by [~aozeritsky] in KAFKA-3924. This 
> would also makes testing more difficult as it would require mocking static 
> methods of System and Runtime classes, which is not natively supported in 
> Java.
> One alternative suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-3924?focusedCommentId=15420269=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15420269]
>  would be to throw some dedicated exceptions that will eventually invoke 
> exit/halt:
> {quote} it would be great to move away from executing `System.exit` inline in 
> favour of throwing an exception (two examples, but maybe we can find better 
> names: FatalExitException and FatalHaltException) that is caught by some 
> central code that then does the `System.exit` or `Runtime.getRuntime.halt`. 
> This helps in a couple of ways:
> (1) Avoids issues with locks being held as in this issue
> (2) It makes it possible to abstract the action, which is very useful in 
> tests. At the moment, we can't easily test for these conditions as they cause 
> the whole test harness to exit. Worse, these conditions are sometimes 
> triggered in the tests and it's unclear why.
> (3) We can have more consistent logging around these actions and possibly 
> extended logging for tests
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4039) Exit Strategy: using exceptions instead of inline invocation of exit/halt

2016-11-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703696#comment-15703696
 ] 

Guozhang Wang commented on KAFKA-4039:
--

Ping [~hachikuji] [~ijuma] again.

> Exit Strategy: using exceptions instead of inline invocation of exit/halt
> -
>
> Key: KAFKA-4039
> URL: https://issues.apache.org/jira/browse/KAFKA-4039
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Maysam Yabandeh
>Priority: Critical
> Fix For: 0.10.1.1
>
> Attachments: deadlock-stack2
>
>
> The current practice is to directly invoke halt/exit right after the line 
> that intends to terminate the execution. In the case of System.exit this 
> could cause deadlocks if the thread invoking System.exit is holding  a lock 
> that will be requested by the shutdown hook threads that will be started by 
> System.exit. An example is reported by [~aozeritsky] in KAFKA-3924. This 
> would also makes testing more difficult as it would require mocking static 
> methods of System and Runtime classes, which is not natively supported in 
> Java.
> One alternative suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-3924?focusedCommentId=15420269=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15420269]
>  would be to throw some dedicated exceptions that will eventually invoke 
> exit/halt:
> {quote} it would be great to move away from executing `System.exit` inline in 
> favour of throwing an exception (two examples, but maybe we can find better 
> names: FatalExitException and FatalHaltException) that is caught by some 
> central code that then does the `System.exit` or `Runtime.getRuntime.halt`. 
> This helps in a couple of ways:
> (1) Avoids issues with locks being held as in this issue
> (2) It makes it possible to abstract the action, which is very useful in 
> tests. At the moment, we can't easily test for these conditions as they cause 
> the whole test harness to exit. Worse, these conditions are sometimes 
> triggered in the tests and it's unclear why.
> (3) We can have more consistent logging around these actions and possibly 
> extended logging for tests
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit

2016-11-28 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4399:
-
Assignee: Alexey Ozeritskiy

> Deadlock between cleanupGroupMetadata and offset commit
> ---
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Alexey Ozeritskiy
>Assignee: Alexey Ozeritskiy
>Priority: Blocker
> Fix For: 0.10.1.1
>
> Attachments: deadlock-stack
>
>
> We have upgraded our clusters to 0.10.1.0 and got deadlock issue.
> We thought it smth like https://issues.apache.org/jira/browse/KAFKA-3994, but 
> patch did not help us and our stacks is different. I think it is other issue.
> Stack traces attached



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (KAFKA-4447) Controller resigned but it also acts as a controller for a long time

2016-11-28 Thread Json Tu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Json Tu updated KAFKA-4447:
---
Comment: was deleted

(was: [~wangg...@gmail.com] thank you for such a detailed analysis .
as you mentioned here,  Before that happens I think a simple check as Jason 
mentioned before may not be sufficient, since it could happen that when the 
listener thread does the check it is still not resigned, but while it is 
executing the resignation happens.
at this place,my opinion is a little different from yours,though when the 
resignation is happens but not complete,the other listener may be fired,but it 
doesn‘t matter,because zk's callback process is single-threaded,so the simple 
check after that will be take effect.
as you say,even if these phenomenon happens,I very much agree with it 
doesn‘t do any harm to the cluster because of the obsoleted epoch number. the 
effect of this check can be used to decrease the interfere with the old 
controller‘s log,from the point of this, may be it will have certain meaning.
just be told,controller will be re-writed,could you reveal the release time 
of this change. thanks.

)

> Controller resigned but it also acts as a controller for a long time 
> -
>
> Key: KAFKA-4447
> URL: https://issues.apache.org/jira/browse/KAFKA-4447
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Linux Os
>Reporter: Json Tu
> Attachments: log.tar.gz
>
>
> We have a cluster with 10 nodes,and we execute following operation as below.
> 1.we execute some topic partition reassign from one node to other 9 nodes in 
> the cluster, and which triggered controller.
> 2.controller invoke PartitionsReassignedListener's handleDataChange and read 
> all partition reassign rules from the zk path, and executed all 
> onPartitionReassignment for all partition that match conditions.
> 3.but the controller is expired from zk, after what some nodes of 9 nodes 
> also expired from zk.
> 5.then controller invoke onControllerResignation to resigned as the 
> controller.
> we found after the controller is resigned, it acts as controller for about 3 
> minutes, which can be found in my attachment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4447) Controller resigned but it also acts as a controller for a long time

2016-11-28 Thread Json Tu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703688#comment-15703688
 ] 

Json Tu commented on KAFKA-4447:


[~wangg...@gmail.com] thank you for such a detailed analysis .
as you mentioned here,  Before that happens I think a simple check as Jason 
mentioned before may not be sufficient, since it could happen that when the 
listener thread does the check it is still not resigned, but while it is 
executing the resignation happens.
at this place,my opinion is a little different from yours,though when the 
resignation is happens but not complete,the other listener may be fired,but it 
doesn‘t matter,because zk's callback process is single-threaded,so the simple 
check after that will be take effect.
as you say,even if these phenomenon happens,I very much agree with it 
doesn‘t do any harm to the cluster because of the obsoleted epoch number. the 
effect of this check can be used to decrease the interfere with the old 
controller‘s log,from the point of this, may be it will have certain meaning.
just be told,controller will be re-writed,could you reveal the release time 
of this change. thanks.



> Controller resigned but it also acts as a controller for a long time 
> -
>
> Key: KAFKA-4447
> URL: https://issues.apache.org/jira/browse/KAFKA-4447
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Linux Os
>Reporter: Json Tu
> Attachments: log.tar.gz
>
>
> We have a cluster with 10 nodes,and we execute following operation as below.
> 1.we execute some topic partition reassign from one node to other 9 nodes in 
> the cluster, and which triggered controller.
> 2.controller invoke PartitionsReassignedListener's handleDataChange and read 
> all partition reassign rules from the zk path, and executed all 
> onPartitionReassignment for all partition that match conditions.
> 3.but the controller is expired from zk, after what some nodes of 9 nodes 
> also expired from zk.
> 5.then controller invoke onControllerResignation to resigned as the 
> controller.
> we found after the controller is resigned, it acts as controller for about 3 
> minutes, which can be found in my attachment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4447) Controller resigned but it also acts as a controller for a long time

2016-11-28 Thread Json Tu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703687#comment-15703687
 ] 

Json Tu commented on KAFKA-4447:


[~wangg...@gmail.com] thank you for such a detailed analysis .
as you mentioned here,  Before that happens I think a simple check as Jason 
mentioned before may not be sufficient, since it could happen that when the 
listener thread does the check it is still not resigned, but while it is 
executing the resignation happens.
at this place,my opinion is a little different from yours,though when the 
resignation is happens but not complete,the other listener may be fired,but it 
doesn‘t matter,because zk's callback process is single-threaded,so the simple 
check after that will be take effect.
as you say,even if these phenomenon happens,I very much agree with it 
doesn‘t do any harm to the cluster because of the obsoleted epoch number. the 
effect of this check can be used to decrease the interfere with the old 
controller‘s log,from the point of this, may be it will have certain meaning.
just be told,controller will be re-writed,could you reveal the release time 
of this change. thanks.



> Controller resigned but it also acts as a controller for a long time 
> -
>
> Key: KAFKA-4447
> URL: https://issues.apache.org/jira/browse/KAFKA-4447
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Linux Os
>Reporter: Json Tu
> Attachments: log.tar.gz
>
>
> We have a cluster with 10 nodes,and we execute following operation as below.
> 1.we execute some topic partition reassign from one node to other 9 nodes in 
> the cluster, and which triggered controller.
> 2.controller invoke PartitionsReassignedListener's handleDataChange and read 
> all partition reassign rules from the zk path, and executed all 
> onPartitionReassignment for all partition that match conditions.
> 3.but the controller is expired from zk, after what some nodes of 9 nodes 
> also expired from zk.
> 5.then controller invoke onControllerResignation to resigned as the 
> controller.
> we found after the controller is resigned, it acts as controller for about 3 
> minutes, which can be found in my attachment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4436) Provide builder pattern for StreamsConfig

2016-11-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-4436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703682#comment-15703682
 ] 

Xavier Léauté commented on KAFKA-4436:
--

[~gwenshap] One of the difference I see between configuring Kafka clients vs. 
configuring Kafka streams is the following: the client configuration mainly 
defines how my application interacts with Kafka brokers, whereas the streams 
configuration also defines the inherent behavior of the application I am 
writing. As such, the kafka streams configuration is less likely to reside in a 
configuration file outside of my application – as might be the case with the 
kafka clients – and more likely to be specified as part of the source code 
itself.

That being said, I would also welcome builder patterns for Kafka client 
configuration, since that might provide a more user-friendly way of interacting 
programmatically with the clients. It may be worthwhile to keep the existing 
client configuration mechanism to allow easy configuration via property files, 
and then allow the builder pattern to work on top of it to override things 
programmatically.

> Provide builder pattern for StreamsConfig
> -
>
> Key: KAFKA-4436
> URL: https://issues.apache.org/jira/browse/KAFKA-4436
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Currently, {{StreamsConfig}} parameters must be set "manually" as key value 
> pairs. This has multiple disadvantages from a user point of view:
>  - mandatory arguments could be missing
>  - data types might be wrong
>  - producer/consumer config parameters could conflict as they might have the 
> same name (user needs to know to prefix them to avoid conflict)
> Those problems have different impact: either a runtime exception is thrown if 
> the problem is detected (e.g. missing parameter or wrong type) or the 
> application is just not configured correctly (producer/consumer has wrong 
> config).
> A builder pattern would avoid those problems by forcing the user in the first 
> place to specify thing correctly (otherwise, it won't compile). For example 
> something like this:
> {noformat}
> StreamsConfig config = StreamsConfig.builder()
> .setApplicationId(String appId)
> .addBootstrapServer(String host, int port)
> .addBootstrapServer(String host, int port)
> .addZookeeper(String host, int port)
> .addZookeeper(String host, int port)
> .setStateDirectory(File path)
> .setConsumerConfig(
> ConsumerConfig.builder()
> .setAutoOffsetReset(...)
> .build()
> )
> .build();
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4442) Controller should grab lock when it is being initialized to avoid race condition

2016-11-28 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-4442:

Description: 
Currently controller will register broker change listener before sending send 
LeaderAndIsrRequests to live replicas. The call path looks like this:

- onControllerFailover()
  - partitionStateMachine.startup()
- triggerOnlinePartitionStateChange()
  - handleStateChange(partition, OnlinePartition)
- electLeaderForPartition(partition)
  - determines live replicas for this partition (step a)
  - add partition to controllerContext.partitionLeadershipInfo. (step b)
  - send LeaderAndIsrRequest to those live replics for this partition

However, if a broker registers itself in zookeeper in between step (a) and step 
(b), the onBrokerStartup() will not send LeaderAndIsrRequest to this broker for 
this partition because the partition is not found in 
controllerContext.partitionLeadershipInfo. Yet onControllerFailover() will not 
send LeaderAndIsrRequest to this broker for this partition either because the 
broker is not considered live in step (a).

The root cause is that onBrokerStartup() should only be executed after 
controller has finished onControllerFailover() and initialized its state. 
Therefore controller should grab the lock controllerContext.controllerLock 
during onControllerFailover().




  was:
Currently controller will register broker change listener before sending send 
LeaderAndIsrRequests to live replicas. The call path looks like this:

- onControllerFailover()
  - partitionStateMachine.startup()
- triggerOnlinePartitionStateChange()
  - handleStateChange(partition, OnlinePartition)
- electLeaderForPartition(partition)
  - determines live replicas for this partition (step a)
  - add partition to controllerContext.partitionLeadershipInfo. (step b)
  - send LeaderAndIsrRequest to those live replics for this partition

However, if a broker registers itself in zookeeper in between step (a) and step 
(b), the onBrokerStartup() will not send LeaderAndIsrRequest to this broker for 
this partition because the partition is not found in 
controllerContext.partitionLeadershipInfo. Yet onControllerFailover() will not 
send LeaderAndIsrRequest to this broker for this partition either before the 
broker is not considered live in step (a).

The root cause is that onBrokerStartup() should only be executed after 
controller has finished onControllerFailover() and initialized its state. 
Therefore controller should grab the lock controllerContext.controllerLock 
during onControllerFailover().





> Controller should grab lock when it is being initialized to avoid race 
> condition
> 
>
> Key: KAFKA-4442
> URL: https://issues.apache.org/jira/browse/KAFKA-4442
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>
> Currently controller will register broker change listener before sending send 
> LeaderAndIsrRequests to live replicas. The call path looks like this:
> - onControllerFailover()
>   - partitionStateMachine.startup()
> - triggerOnlinePartitionStateChange()
>   - handleStateChange(partition, OnlinePartition)
> - electLeaderForPartition(partition)
>   - determines live replicas for this partition (step a)
>   - add partition to controllerContext.partitionLeadershipInfo. (step 
> b)
>   - send LeaderAndIsrRequest to those live replics for this partition
> However, if a broker registers itself in zookeeper in between step (a) and 
> step (b), the onBrokerStartup() will not send LeaderAndIsrRequest to this 
> broker for this partition because the partition is not found in 
> controllerContext.partitionLeadershipInfo. Yet onControllerFailover() will 
> not send LeaderAndIsrRequest to this broker for this partition either because 
> the broker is not considered live in step (a).
> The root cause is that onBrokerStartup() should only be executed after 
> controller has finished onControllerFailover() and initialized its state. 
> Therefore controller should grab the lock controllerContext.controllerLock 
> during onControllerFailover().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4458) Add per partition metrics for in-sync and assigned replica count

2016-11-28 Thread JIRA
Xavier Léauté created KAFKA-4458:


 Summary: Add per partition metrics for in-sync and assigned 
replica count
 Key: KAFKA-4458
 URL: https://issues.apache.org/jira/browse/KAFKA-4458
 Project: Kafka
  Issue Type: Improvement
  Components: metrics
Reporter: Xavier Léauté


See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-96+-+Add+per+partition+metrics+for+in-sync+and+assigned+replica+count
 for details on proposed changes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4436) Provide builder pattern for StreamsConfig

2016-11-28 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703620#comment-15703620
 ] 

Matthias J. Sax commented on KAFKA-4436:


bq. One of the goals in making KafkaStreams part of Apache Kafka was to make 
sure there are consistent APIs and tools that make things easier to users who 
are used to "normal clients" to adopt more advanced features.

Completely agreed.

That is why I like the idea to change it for other configs like 
Producer/Consumer, too. If more people think, that is the right way to go, we 
can just extend the scope of this JIRA. One main difference with regard to 
Streams might be the people who are using it. Streams targets an even broader 
application developer audience and thus ease to use might even be more 
important. Furthermore, in Streams, there are a few configuration that are 
fixed and cannot be changed by the user (for example setting auto commit). 
Using builder pattern would help to avoid that users change it (there might be 
more such settings in the future) -- a feature like this is of course not 
required for consumer/producer clients where all setting are valid.

> Provide builder pattern for StreamsConfig
> -
>
> Key: KAFKA-4436
> URL: https://issues.apache.org/jira/browse/KAFKA-4436
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Currently, {{StreamsConfig}} parameters must be set "manually" as key value 
> pairs. This has multiple disadvantages from a user point of view:
>  - mandatory arguments could be missing
>  - data types might be wrong
>  - producer/consumer config parameters could conflict as they might have the 
> same name (user needs to know to prefix them to avoid conflict)
> Those problems have different impact: either a runtime exception is thrown if 
> the problem is detected (e.g. missing parameter or wrong type) or the 
> application is just not configured correctly (producer/consumer has wrong 
> config).
> A builder pattern would avoid those problems by forcing the user in the first 
> place to specify thing correctly (otherwise, it won't compile). For example 
> something like this:
> {noformat}
> StreamsConfig config = StreamsConfig.builder()
> .setApplicationId(String appId)
> .addBootstrapServer(String host, int port)
> .addBootstrapServer(String host, int port)
> .addZookeeper(String host, int port)
> .addZookeeper(String host, int port)
> .setStateDirectory(File path)
> .setConsumerConfig(
> ConsumerConfig.builder()
> .setAutoOffsetReset(...)
> .build()
> )
> .build();
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-4439) Add a builder to NetworkClient

2016-11-28 Thread Colin P. McCabe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe reassigned KAFKA-4439:
--

Assignee: Colin P. McCabe

> Add a builder to NetworkClient
> --
>
> Key: KAFKA-4439
> URL: https://issues.apache.org/jira/browse/KAFKA-4439
> Project: Kafka
>  Issue Type: Improvement
>  Components: network
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Minor
>
> NetworkClient's constructors have too many parameters.  This makes it hard to 
> follow what each value is being initialized to.  Instead, let's use the 
> builder pattern to clearly identify what each parameter is and provide 
> sensible defaults for each.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-4457) Add a command to list the broker version information

2016-11-28 Thread Colin P. McCabe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe reassigned KAFKA-4457:
--

Assignee: Colin P. McCabe

> Add a command to list the broker version information
> 
>
> Key: KAFKA-4457
> URL: https://issues.apache.org/jira/browse/KAFKA-4457
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>
> Add a command to list the broker version information.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Kafka Clients Survey

2016-11-28 Thread Gwen Shapira
Hey Kafka Community,

I'm trying to take a pulse on the current state of the Kafka clients ecosystem.
Which languages are most popular in our community? What does the
community value in clients?

You can help me out by filling in the survey:
https://goo.gl/forms/cZg1CJyf1PuqivTg2

I will lock the survey and publish results by Jan 15.

Gwen


[jira] [Commented] (KAFKA-4457) Add a command to list the broker version information

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703450#comment-15703450
 ] 

ASF GitHub Bot commented on KAFKA-4457:
---

GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2184

KAFKA-4457. Add BrokerVersionCommand



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-4457

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2184


commit f36cfd19cf9705f71b98c27f9054ff94e70e75e3
Author: Colin P. Mccabe 
Date:   2016-11-28T23:02:25Z

KAFKA-4457. Add BrokerVersionCommand




> Add a command to list the broker version information
> 
>
> Key: KAFKA-4457
> URL: https://issues.apache.org/jira/browse/KAFKA-4457
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: Colin P. McCabe
>
> Add a command to list the broker version information.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2184: KAFKA-4457. Add BrokerVersionCommand

2016-11-28 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2184

KAFKA-4457. Add BrokerVersionCommand



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-4457

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2184


commit f36cfd19cf9705f71b98c27f9054ff94e70e75e3
Author: Colin P. Mccabe 
Date:   2016-11-28T23:02:25Z

KAFKA-4457. Add BrokerVersionCommand




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit

2016-11-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703435#comment-15703435
 ] 

Guozhang Wang commented on KAFKA-4399:
--

Ping [~hachikuji] [~ijuma] again.

> Deadlock between cleanupGroupMetadata and offset commit
> ---
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Alexey Ozeritskiy
>Priority: Blocker
> Fix For: 0.10.1.1
>
> Attachments: deadlock-stack
>
>
> We have upgraded our clusters to 0.10.1.0 and got deadlock issue.
> We thought it smth like https://issues.apache.org/jira/browse/KAFKA-3994, but 
> patch did not help us and our stacks is different. I think it is other issue.
> Stack traces attached



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4457) Add a command to list the broker version information

2016-11-28 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-4457:
--

 Summary: Add a command to list the broker version information
 Key: KAFKA-4457
 URL: https://issues.apache.org/jira/browse/KAFKA-4457
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 0.10.1.1
Reporter: Colin P. McCabe


Add a command to list the broker version information.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2183: Removed obsolete parameter form example config in ...

2016-11-28 Thread soenkeliebau
GitHub user soenkeliebau opened a pull request:

https://github.com/apache/kafka/pull/2183

Removed obsolete parameter form example config in docs. 

Parameter controller.message.queue.size was removed in 0.9 (KAFKA-2122) but 
is still listed in an example broker configuration in the documentation.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/soenkeliebau/kafka 
controller.message.queue.size

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2183.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2183


commit 72b066d7b24413b8617378edae16faa98907c06d
Author: Sönke Liebau 
Date:   2016-11-28T22:52:47Z

Removed obsolete parameter form example config in docs. Parameter 
controller.message.queue.size was removed in 0.9 (KAFKA-2122)




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-2122) Remove controller.message.queue.size Config

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703408#comment-15703408
 ] 

ASF GitHub Bot commented on KAFKA-2122:
---

GitHub user soenkeliebau opened a pull request:

https://github.com/apache/kafka/pull/2183

Removed obsolete parameter form example config in docs. 

Parameter controller.message.queue.size was removed in 0.9 (KAFKA-2122) but 
is still listed in an example broker configuration in the documentation.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/soenkeliebau/kafka 
controller.message.queue.size

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2183.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2183


commit 72b066d7b24413b8617378edae16faa98907c06d
Author: Sönke Liebau 
Date:   2016-11-28T22:52:47Z

Removed obsolete parameter form example config in docs. Parameter 
controller.message.queue.size was removed in 0.9 (KAFKA-2122)




> Remove controller.message.queue.size Config
> ---
>
> Key: KAFKA-2122
> URL: https://issues.apache.org/jira/browse/KAFKA-2122
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Sriharsha Chintalapani
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2122.patch, KAFKA-2122_2015-04-19_12:44:41.patch
>
>
> A deadlock can happen during a delete topic if controller.message.queue.size 
> is overridden to a custom value. Details are here: 
> https://issues.apache.org/jira/browse/KAFKA-2046?focusedCommentId=14380776=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14380776
> Given that KAFKA-1993 is enabling delete topic by default, it would be unsafe 
> to simultaneously allow a configurable controller.message.queue.size



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4450) Add missing 0.10.1.x upgrade tests and ensure ongoing compatibility checks

2016-11-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703349#comment-15703349
 ] 

Guozhang Wang commented on KAFKA-4450:
--

[~ewencp] [~ijuma] What's the status of this JIRA ticket?

> Add missing 0.10.1.x upgrade tests and ensure ongoing compatibility checks
> --
>
> Key: KAFKA-4450
> URL: https://issues.apache.org/jira/browse/KAFKA-4450
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Affects Versions: 0.10.1.0
>Reporter: Ewen Cheslack-Postava
>Priority: Critical
> Fix For: 0.10.1.1, 0.10.2.0
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We have upgrade system tests, but we neglected to update them for the most 
> recent released versions (we only have LATEST_0_10_0 but not something from 
> 0_10_1).
> We should probably not only add these versions, but also a) make sure some 
> TRUNK version is always included since upgrade to trunk would always be 
> possible to avoid issues for anyone deploying off trunk (we want every commit 
> to trunk to be solid & compatible) and b) make sure there aren't gaps between 
> versions annotated on the test vs versions that are officially released 
> (which may not be easy statically with the decorators, but might be possible 
> by checking the kafkatest version against previous versions and checking for 
> gaps?).
> Perhaps we need to be able to get the most recent release/snapshot version 
> from the python code so we can always validate previous versions? Even if 
> that's possible, is there going to be a reliable way to get all the previous 
> released versions so we can make sure we have all upgrade tests in place?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4450) Add missing 0.10.1.x upgrade tests and ensure ongoing compatibility checks

2016-11-28 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4450:
-
Fix Version/s: (was: 0.11.0.0)

> Add missing 0.10.1.x upgrade tests and ensure ongoing compatibility checks
> --
>
> Key: KAFKA-4450
> URL: https://issues.apache.org/jira/browse/KAFKA-4450
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Affects Versions: 0.10.1.0
>Reporter: Ewen Cheslack-Postava
>Priority: Critical
> Fix For: 0.10.1.1, 0.10.2.0
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We have upgrade system tests, but we neglected to update them for the most 
> recent released versions (we only have LATEST_0_10_0 but not something from 
> 0_10_1).
> We should probably not only add these versions, but also a) make sure some 
> TRUNK version is always included since upgrade to trunk would always be 
> possible to avoid issues for anyone deploying off trunk (we want every commit 
> to trunk to be solid & compatible) and b) make sure there aren't gaps between 
> versions annotated on the test vs versions that are officially released 
> (which may not be easy statically with the decorators, but might be possible 
> by checking the kafkatest version against previous versions and checking for 
> gaps?).
> Perhaps we need to be able to get the most recent release/snapshot version 
> from the python code so we can always validate previous versions? Even if 
> that's possible, is there going to be a reliable way to get all the previous 
> released versions so we can make sure we have all upgrade tests in place?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4436) Provide builder pattern for StreamsConfig

2016-11-28 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703313#comment-15703313
 ] 

Gwen Shapira commented on KAFKA-4436:
-

One of the goals in making KafkaStreams part of Apache Kafka was to make sure 
there are consistent APIs and tools that make things easier to users who are 
used to "normal clients" to adopt more advanced features.

If we are adding this to StreamsConfig, maybe it is worthwhile doing for 
ProducerConfig and ConsumerConfig? Maybe even KafkaConfig? Even if it isn't 
useful outside Streams, perhaps just document why it is important for streams 
but nowhere else.

> Provide builder pattern for StreamsConfig
> -
>
> Key: KAFKA-4436
> URL: https://issues.apache.org/jira/browse/KAFKA-4436
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Currently, {{StreamsConfig}} parameters must be set "manually" as key value 
> pairs. This has multiple disadvantages from a user point of view:
>  - mandatory arguments could be missing
>  - data types might be wrong
>  - producer/consumer config parameters could conflict as they might have the 
> same name (user needs to know to prefix them to avoid conflict)
> Those problems have different impact: either a runtime exception is thrown if 
> the problem is detected (e.g. missing parameter or wrong type) or the 
> application is just not configured correctly (producer/consumer has wrong 
> config).
> A builder pattern would avoid those problems by forcing the user in the first 
> place to specify thing correctly (otherwise, it won't compile). For example 
> something like this:
> {noformat}
> StreamsConfig config = StreamsConfig.builder()
> .setApplicationId(String appId)
> .addBootstrapServer(String host, int port)
> .addBootstrapServer(String host, int port)
> .addZookeeper(String host, int port)
> .addZookeeper(String host, int port)
> .setStateDirectory(File path)
> .setConsumerConfig(
> ConsumerConfig.builder()
> .setAutoOffsetReset(...)
> .build()
> )
> .build();
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available

2016-11-28 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4455:
--
Description: 
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running on the same machines but they had separate 
directories for state stores. In the attached logs you can see 
{{StreamThread-1895}}, I'm not running 1895 StreamThreads, I have implemented 
some Kafka Streams restart policy in my {{UncaughtExceptionHandler}} which on 
some transient exceptions restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology, by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}. This causes the thread names 
to have bigger numbers.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
I have been running this fork in production for 3 days and the error doesn't 
come-up.

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938
Additinal conversation can be found here: [stream shut down due to no locks for 
state 
store|https://groups.google.com/forum/#!topic/confluent-platform/i5cwYhpUtx4]

  was:
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running on the same machines but they had separate 
directories for state stores. In the attached logs you can see 
{{StreamThread-1895}}, I'm not running 1895 StreamThreads, I have implemented 
some Kafka Streams restart policy in my {{UncaughtExceptionHandler}} which on 
some transient exceptions restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology, by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
I have been running this fork in production for 3 days and the error doesn't 
come-up.

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938
Additinal conversation can be found here: [stream shut down due to no locks for 
state 
store|https://groups.google.com/forum/#!topic/confluent-platform/i5cwYhpUtx4]


> Commit during rebalance does not close RocksDB which later causes: 
> org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
> 
>
> Key: KAFKA-4455
> URL: https://issues.apache.org/jira/browse/KAFKA-4455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Kafka Streams were running on 

[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available

2016-11-28 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4455:
--
Description: 
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running on the same machines but they had separate 
directories for state stores. In the attached logs you can see 
{{StreamThread-1895}}, I'm not running 1895 StreamThreads, I have implemented 
some Kafka Streams restart policy in my {{UncaughtExceptionHandler}} which on 
some transient exceptions restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology, by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
I have been running this fork in production for 3 days and the error doesn't 
come-up.

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938
Additinal conversation can be found here: [stream shut down due to no locks for 
state 
store|https://groups.google.com/forum/#!topic/confluent-platform/i5cwYhpUtx4]

  was:
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running but they had separate directories for state stores. 
In the attached logs you can see {{StreamThread-1895}}, I'm not running 1895 
StreamThreads, I have implemented some Kafka Streams restart policy in my 
{{UncaughtExceptionHandler}} which on some transient exceptions restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology, by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
I have been running this fork in production for 3 days and the error doesn't 
come-up.

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938
Additinal conversation can be found here: [stream shut down due to no locks for 
state 
store|https://groups.google.com/forum/#!topic/confluent-platform/i5cwYhpUtx4]


> Commit during rebalance does not close RocksDB which later causes: 
> org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
> 
>
> Key: KAFKA-4455
> URL: https://issues.apache.org/jira/browse/KAFKA-4455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Kafka Streams were running on CentOS - I have observed 
> this - after some time the locks were released 

[jira] [Work started] (KAFKA-4437) Incremental Batch Processing for Kafka Streams

2016-11-28 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4437 started by Matthias J. Sax.
--
> Incremental Batch Processing for Kafka Streams
> --
>
> Key: KAFKA-4437
> URL: https://issues.apache.org/jira/browse/KAFKA-4437
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> We want to add an “auto stop” feature that terminate a stream application 
> when it has processed all the data that was newly available at the time the 
> application started (to at current end-of-log, i.e., current high watermark). 
> This allows to chop the (infinite) log into finite chunks where each run for 
> the application processes one chunk. This feature allows for incremental 
> batch-like processing; think "start-process-stop-restart-process-stop-..."
> For details see KIP-95: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Data (re)processing with Kafka (new wiki page)

2016-11-28 Thread Matthias J. Sax
Thanks for your input!

For (2), there is currently KafkaStream.toString(). You could
theoretically parse the String to extract all required information -- of
course, this is not a nice solution.

-Matthias

On 11/25/16 2:11 AM, saiprasad mishra wrote:
> This page is really helpful.Thanks for putting this
> 
> Some nice to have features can be (not sure for this wiki page)
> 
> 1) Pause and resume without having to start and stop.
> It should drain all the inflight calculations before doing the actual pause
> and a notifier will be helpful that it is actually paused.
> This can be much light wait if possible rather than stopping all tasks and
> stores and starting them again which might take a lot of time
> 
> 
> 2) Metadata api response if it can have all the topology graphs and sub
> graphs with all the nodes and edges for each sub graphs with corresponding
> state store names then it will be easier to build some UI on top of it and
> also the pipe between the sub graphs which are kafka topics need to be
> called out and also the time semantics can be laid out on top of it. This
> is something like a logical view on top of the physical view which the
> current api has.
> 
> 
> Regards
> Sai
> 
> On Fri, Nov 25, 2016 at 12:53 AM, Michael Noll  wrote:
> 
>> Thanks a lot, Matthias!
>>
>> I have already begun to provide feedback.
>>
>> -Michael
>>
>>
>>
>> On Wed, Nov 23, 2016 at 11:41 PM, Matthias J. Sax 
>> wrote:
>>
>>> Hi,
>>>
>>> we added a new wiki page that is supposed to collect data (re)processing
>>> scenario with Kafka:
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/
>> Kafka+Streams+Data+(Re)
>>> Processing+Scenarios
>>>
>>> We added already a couple of scenarios we think might be common and want
>>> to invite all of you to add more. This helps to get a better overview of
>>> requirements to enable new use cases.
>>>
>>> We are looking forward to your feedback!
>>>
>>>
>>> -Matthias
>>>
>>>
>>
> 



signature.asc
Description: OpenPGP digital signature


Re: [DISCUSS] 0.10.1.1 Plan

2016-11-28 Thread Bernard Leach
Getting out 0.10.1.1 quickly makes a lot of sense so perhaps the answer to the 
quality concern is to not publish the 2.12 tgz and instead simply publish the 
artifacts to maven central allowing the scala community to become a part of 
that quality process.  That way when the 0.10.2.0 release swings around there 
will already be a level of maturity with 2.12.

On 29 Nov 2016, at 06:17, Guozhang Wang  wrote:
> 
> @Radai
> 
> I think Ismael's point is that people who coded their applications against
> 0.10.1.1 should be able to swipe in the 0.10.1.0 jar without breaking the
> code (in practice maybe it is more likely the reverse scenario, that people
> coded against 0.10.1.0 and trying to swipe in a newer 0.10.1.1).
> 
> @Bernard Leach
> 
> I agree that having Scala 2.12 support as early as possible than next Feb
> can help the Scala community who's using Kafka. However, the downside is
> that it may increase our release cycles for this bug-fix release as we need
> to add more validation process to make sure that builds with Scala12 works
> perfectly, while we are trying to make 0.10.1.1 out asap.
> 
> 
> Guozhang
> 
> 
> On Sun, Nov 27, 2016 at 5:16 PM, Bernard Leach 
> wrote:
> 
>> I guess bugs are in the eye of the beholder; I’d really like to see
>> KAFKA-4438 addressed as a patch release - the bug being "I can’t currently
>> build any scala 2.12 projects with Kafka as a dependency".  There’s a PR
>> ready to go for the branch that has already been accepted to trunk and this
>> Ismael has addressed the risk issue by making the 2.12 not part of the
>> default build.
>> 
>> There are a number of downstream projects awaiting kafka scala 2.12
>> binaries so they can publish their own so getting this addressed before
>> February would really help push along the availability of 2.12 compatible
>> libraries.
>> 
>> cheers,
>> bern.
>> 
>>> On 27 Nov 2016, at 10:08, radai  wrote:
>>> 
>>> "compatibility guarantees that are expected by people who subclass these
>>> classes"
>>> 
>>> sorry if this is not the best thread for this discussion, but I just
>> wanted
>>> to pop in and say that since any subclassing of these will obviously not
>> be
>>> done within the kafka codebase - what guarantees are needed?
>>> 
>>> On Fri, Nov 25, 2016 at 11:13 PM, Michael Pearce 
>>> wrote:
>>> 
 As agreed that this should be purely a bug fix release for stability.
 
 
 I'd like to flag then we shouldn't be adding / merging in any Jira's
>> that
 are not bugs.
 
 e.g. KAFKA-4438
 
 
 
 From: isma...@gmail.com  on behalf of Ismael Juma <
 ism...@juma.me.uk>
 Sent: Friday, November 25, 2016 11:43 AM
 To: dev@kafka.apache.org
 Subject: Re: [DISCUSS] 0.10.1.1 Plan
 
 Good, seems like we are in agreement about sticking to bug fixes for
 0.10.1.1.
 
 Regarding the removal of final, I understand that it doesn't break
 backwards binary compatibility (it does break forwards compatibility and
 hence why it's more suited for feature releases; these are the same
>> rules
 followed by the JDK and Scala).
 
 It's probably best to discuss this in another thread, but to clarify:
>> the
 point I was making is that once you make a class non final, maintaining
 compatibility becomes more complex. You have to take into account that
 methods could have been overridden, for example. As such, some thought
 should be given to the scenarios under which classes can be extended,
>> how
 we will ensure that we can evolve the class without breaking users,
>> whether
 we want to make some methods final, whether we should add some
 documentation guiding users regarding good/bad examples of inheritance
 versus composition for these classes and so on. This is particularly
 important for core client classes exposed by the consumer and producer
>> and
 it is the approach taken by projects that have a good track record with
 regards to maintaining compatibility for long periods of time. I didn't
>> see
 much discussion or reasoning along the lines above for the
 ProducerRecord/ConsumerRecord change and hence my comment (even if it
>> may
 actually me a good change once all is considered).
 
 Ismael
 
 On Thu, Nov 24, 2016 at 7:56 PM, Michael Pearce 
 wrote:
 
> +1 it would be nice, and as is less restrive would not cause any issue.
> 
> Saying that agree this is a fix build not a feature build.
> 
> Sent using OWA for iPhone
> 
> From: Rajini Sivaram 
> Sent: Thursday, November 24, 2016 12:17:13 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] 0.10.1.1 Plan
> 
> Hi Ismael,
> 
> OK, 

[jira] [Comment Edited] (KAFKA-3798) Kafka Consumer 0.10.0.0 killed after rebalancing exception

2016-11-28 Thread Evan Nelson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703193#comment-15703193
 ] 

Evan Nelson edited comment on KAFKA-3798 at 11/28/16 9:34 PM:
--

We are experiencing the same issue with 0.8.2.2:

org.I0Itec.zkclient.exception.ZkNoNodeException: 
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode 
for /consumers/\*\*\*/ids/\*\*\*
at 
org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47) 
~[zkclient-0.3.jar:0.3]
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685) 
~[zkclient-0.3.jar:0.3]
etc...

(identifiers replaced with ***)

This happens on two different topics, one with 20 partitions and one with 40. 
We have 22 consumers for each. The event always seems to be precipitated by a 
zookeeper connection timeout, which may have been triggered by a long GC pause 
(~5.5 seconds). Once the rebalance loop starts it _never_ recovers, no matter 
how many retries we allot.


was (Author: ean5533):
We are experiencing the same issue with 0.8.2.2:

org.I0Itec.zkclient.exception.ZkNoNodeException: 
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode 
for /consumers/**\*/ids/\***
at 
org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47) 
~[zkclient-0.3.jar:0.3]
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685) 
~[zkclient-0.3.jar:0.3]
etc...

(identifiers replaced with ***)

This happens on two different topics, one with 20 partitions and one with 40. 
We have 22 consumers for each. The event always seems to be precipitated by a 
zookeeper connection timeout, which may have been triggered by a long GC pause 
(~5.5 seconds). Once the rebalance loop starts it _never_ recovers, no matter 
how many retries we allot.

> Kafka Consumer 0.10.0.0 killed after rebalancing exception
> --
>
> Key: KAFKA-3798
> URL: https://issues.apache.org/jira/browse/KAFKA-3798
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.10.0.0
> Environment: Production
>Reporter: Sahitya Agrawal
>Assignee: Neha Narkhede
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Hi , 
> I have a topic with 100 partitions and 25 consumers. Consumers were working 
> fine up to some time. 
> After some time I see kafka rebalancing exception in the logs. CPU usage is 
> also 100 % at that time. Consumer process got killed after that. 
> Kafka version : 0.10.0.0
> Some Error print from the logs are following:
> kafka.common.ConsumerRebalanceFailedException: prod_ip- can't rebalance 
> after 10 retries
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:670)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$2.run(ZookeeperConsumerConnector.scala:589)
> exception during rebalance
> org.I0Itec.zkclient.exception.ZkNoNodeException: 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /consumers/prod/ids/prod_ip-***
> at 
> org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> at 
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1000)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1099)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1094)
> at kafka.utils.ZkUtils.readData(ZkUtils.scala:542)
> at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:674)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:646)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:637)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:637)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:637)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:636)
> at 
> 

[jira] [Comment Edited] (KAFKA-3798) Kafka Consumer 0.10.0.0 killed after rebalancing exception

2016-11-28 Thread Evan Nelson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703193#comment-15703193
 ] 

Evan Nelson edited comment on KAFKA-3798 at 11/28/16 9:34 PM:
--

We are experiencing the same issue with 0.8.2.2:

org.I0Itec.zkclient.exception.ZkNoNodeException: 
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode 
for /consumers/**\*/ids/\***
at 
org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47) 
~[zkclient-0.3.jar:0.3]
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685) 
~[zkclient-0.3.jar:0.3]
etc...

(identifiers replaced with ***)

This happens on two different topics, one with 20 partitions and one with 40. 
We have 22 consumers for each. The event always seems to be precipitated by a 
zookeeper connection timeout, which may have been triggered by a long GC pause 
(~5.5 seconds). Once the rebalance loop starts it _never_ recovers, no matter 
how many retries we allot.


was (Author: ean5533):
We are experiencing the same issue with 0.8.2.2:

org.I0Itec.zkclient.exception.ZkNoNodeException: 
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode 
for /consumers/***/ids/***
at 
org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47) 
~[zkclient-0.3.jar:0.3]
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685) 
~[zkclient-0.3.jar:0.3]
etc...

(identifiers replaced with ***)

This happens on two different topics, one with 20 partitions and one with 40. 
We have 22 consumers for each. The event always seems to be precipitated by a 
zookeeper connection timeout, which may have been triggered by a long GC pause 
(~5.5 seconds). Once the rebalance loop starts it _never_ recovers, no matter 
how many retries we allot.

> Kafka Consumer 0.10.0.0 killed after rebalancing exception
> --
>
> Key: KAFKA-3798
> URL: https://issues.apache.org/jira/browse/KAFKA-3798
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.10.0.0
> Environment: Production
>Reporter: Sahitya Agrawal
>Assignee: Neha Narkhede
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Hi , 
> I have a topic with 100 partitions and 25 consumers. Consumers were working 
> fine up to some time. 
> After some time I see kafka rebalancing exception in the logs. CPU usage is 
> also 100 % at that time. Consumer process got killed after that. 
> Kafka version : 0.10.0.0
> Some Error print from the logs are following:
> kafka.common.ConsumerRebalanceFailedException: prod_ip- can't rebalance 
> after 10 retries
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:670)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$2.run(ZookeeperConsumerConnector.scala:589)
> exception during rebalance
> org.I0Itec.zkclient.exception.ZkNoNodeException: 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /consumers/prod/ids/prod_ip-***
> at 
> org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> at 
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1000)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1099)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1094)
> at kafka.utils.ZkUtils.readData(ZkUtils.scala:542)
> at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:674)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:646)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:637)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:637)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:637)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:636)
> at 
> 

[jira] [Commented] (KAFKA-3798) Kafka Consumer 0.10.0.0 killed after rebalancing exception

2016-11-28 Thread Evan Nelson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703193#comment-15703193
 ] 

Evan Nelson commented on KAFKA-3798:


We are experiencing the same issue with 0.8.2.2:

org.I0Itec.zkclient.exception.ZkNoNodeException: 
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode 
for /consumers/***/ids/***
at 
org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47) 
~[zkclient-0.3.jar:0.3]
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685) 
~[zkclient-0.3.jar:0.3]
etc...

(identifiers replaced with ***)

This happens on two different topics, one with 20 partitions and one with 40. 
We have 22 consumers for each. The event always seems to be precipitated by a 
zookeeper connection timeout, which may have been triggered by a long GC pause 
(~5.5 seconds). Once the rebalance loop starts it _never_ recovers, no matter 
how many retries we allot.

> Kafka Consumer 0.10.0.0 killed after rebalancing exception
> --
>
> Key: KAFKA-3798
> URL: https://issues.apache.org/jira/browse/KAFKA-3798
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.10.0.0
> Environment: Production
>Reporter: Sahitya Agrawal
>Assignee: Neha Narkhede
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Hi , 
> I have a topic with 100 partitions and 25 consumers. Consumers were working 
> fine up to some time. 
> After some time I see kafka rebalancing exception in the logs. CPU usage is 
> also 100 % at that time. Consumer process got killed after that. 
> Kafka version : 0.10.0.0
> Some Error print from the logs are following:
> kafka.common.ConsumerRebalanceFailedException: prod_ip- can't rebalance 
> after 10 retries
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:670)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$2.run(ZookeeperConsumerConnector.scala:589)
> exception during rebalance
> org.I0Itec.zkclient.exception.ZkNoNodeException: 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /consumers/prod/ids/prod_ip-***
> at 
> org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> at 
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1000)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1099)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1094)
> at kafka.utils.ZkUtils.readData(ZkUtils.scala:542)
> at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:674)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:646)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:637)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:637)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:637)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:636)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:522)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Caused by: org.apache.zookeeper.KeeperException$NoNodeException: 
> KeeperErrorCode = NoNode for /consumers/prod/ids/prod_ip-**
> at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
> at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
> at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1155)
> at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1184)
> at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:124)
> at org.I0Itec.zkclient.ZkClient$12.call(ZkClient.java:1103)
> at org.I0Itec.zkclient.ZkClient$12.call(ZkClient.java:1099)
> at 

[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available

2016-11-28 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4455:
--
Description: 
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running but they had separate directories for state stores. 
In the attached logs you can see {{StreamThread-1895}}, I'm not running 1895 
StreamThreads, I have implemented some Kafka Streams restart policy in my 
{{UncaughtExceptionHandler}} which on some transient exceptions restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology, by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
I have been running this fork in production for 3 days and the error doesn't 
come-up.

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938
Additinal conversation can be found here: [stream shut down due to no locks for 
state 
store|https://groups.google.com/forum/#!topic/confluent-platform/i5cwYhpUtx4]

  was:
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running but they had separate directories for state stores. 
In the attached logs you can see {{StreamThread-1895}}, I'm not running 1895 
StreamThreads, I have implemented some Kafka Streams restart policy in my 
{{UncaughtExceptionHandler}} which on some transient exceptions restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology, by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
I have been running this fork in production for 3 days and the error doesn't 
come-up.

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938


> Commit during rebalance does not close RocksDB which later causes: 
> org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
> 
>
> Key: KAFKA-4455
> URL: https://issues.apache.org/jira/browse/KAFKA-4455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Kafka Streams were running on CentOS - I have observed 
> this - after some time the locks were released even if the jvm/process wasn't 
> restarted, so I guess CentOS has some lock cleaning policy.
>Reporter: Davor Poldrugo
>  Labels: stacktrace
> Attachments: 

[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available

2016-11-28 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4455:
--
Description: 
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running but they had separate directories for state stores. 
In the attached logs you can see {{StreamThread-1895}}, I'm not running 1895 
StreamThreads, I have implemented some Kafka Streams restart policy in my 
{{UncaughtExceptionHandler}} which on some transient exceptions restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology, by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
I have been running this fork in production for 3 days and the error doesn't 
come-up.

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938

  was:
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running but they had separate directories for state stores. 
In the attached logs you can see {{StreamThread-1895}}, I'm not running 1895 
StreamThreads, I have implemented some Kafka Streams restart policy in my 
{{UncaughtExceptionHandler}} which on some transient exceptions restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology, by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938


> Commit during rebalance does not close RocksDB which later causes: 
> org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
> 
>
> Key: KAFKA-4455
> URL: https://issues.apache.org/jira/browse/KAFKA-4455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Kafka Streams were running on CentOS - I have observed 
> this - after some time the locks were released even if the jvm/process wasn't 
> restarted, so I guess CentOS has some lock cleaning policy.
>Reporter: Davor Poldrugo
>  Labels: stacktrace
> Attachments: RocksDBException_IO-error_stacktrace.txt
>
>
> h2. Problem description
> From time to time a rebalance in Kafka Streams causes the commit to throw 
> CommitFailedException. When this exception is thrown, the tasks and 
> processors are not closed. If 

[jira] [Created] (KAFKA-4456) Offsets of deleted topics are not removed from consumer groups

2016-11-28 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-4456:
--

 Summary: Offsets of deleted topics are not removed from consumer 
groups
 Key: KAFKA-4456
 URL: https://issues.apache.org/jira/browse/KAFKA-4456
 Project: Kafka
  Issue Type: Bug
Reporter: Vahid Hashemian
Assignee: Vahid Hashemian


For Java consumer based consumer groups that have been consuming from a topic, 
if the topic is removed, the offset information of that topic is not removed 
from the consumer group (i.e. the latest committed offset remains intact).

[KAFKA-4095|https://issues.apache.org/jira/browse/KAFKA-4095] addresses the 
same issue for ZK based consumers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available

2016-11-28 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4455:
--
Description: 
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running but they had separate directories for state stores. 
In the attached logs you can see {{StreamThread-1895}}, I'm not running 1895 
StreamThreads, I have implemented some Kafka Streams restart policy in my 
{{UncaughtExceptionHandler}} which on some transient exceptions restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology, by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938

  was:
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running but they had separate directories for state stores.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938


> Commit during rebalance does not close RocksDB which later causes: 
> org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
> 
>
> Key: KAFKA-4455
> URL: https://issues.apache.org/jira/browse/KAFKA-4455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Kafka Streams were running on CentOS - I have observed 
> this - after some time the locks were released even if the jvm/process wasn't 
> restarted, so I guess CentOS has some lock cleaning policy.
>Reporter: Davor Poldrugo
>  Labels: stacktrace
> Attachments: RocksDBException_IO-error_stacktrace.txt
>
>
> h2. Problem description
> From time to time a rebalance in Kafka Streams causes the commit to throw 
> CommitFailedException. When this exception is thrown, the tasks and 
> processors are not closed. If some processor contains a state store 
> (RocksDB), the RocksDB is not closed, which leads to not relasead LOCK's on 
> OS level, and when the Kafka Streams app is trying to open tasks and their 
> respective processors and state stores the {{org.rocksdb.RocksDBException: IO 
> error: lock .../LOCK: No locks available}} is thrown. If the the jvm/process 
> is restarted the locks are released.
> h2. Additional info
> I have been running 3 Kafka Streams instances on separate machines with 
> 

[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available

2016-11-28 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4455:
--
Attachment: RocksDBException_IO-error_stacktrace.txt

> Commit during rebalance does not close RocksDB which later causes: 
> org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
> 
>
> Key: KAFKA-4455
> URL: https://issues.apache.org/jira/browse/KAFKA-4455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Kafka Streams were running on CentOS - I have observed 
> this - after some time the locks were released even if the jvm/process wasn't 
> restarted, so I guess CentOS has some lock cleaning policy.
>Reporter: Davor Poldrugo
>  Labels: stacktrace
> Attachments: RocksDBException_IO-error_stacktrace.txt
>
>
> h2. Problem description
> From time to time a rebalance in Kafka Streams causes the commit to throw 
> CommitFailedException. When this exception is thrown, the tasks and 
> processors are not closed. If some processor contains a state store 
> (RocksDB), the RocksDB is not closed, which leads to not relasead LOCK's on 
> OS level, and when the Kafka Streams app is trying to open tasks and their 
> respective processors and state stores the {{org.rocksdb.RocksDBException: IO 
> error: lock .../LOCK: No locks available}} is thrown. If the the jvm/process 
> is restarted the locks are released.
> h2. Additional info
> I have been running 3 Kafka Streams instances on separate machines with 
> {{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
> Streams apps were running but they had separate directories for state stores.
> h2. Stacktrace
> [^RocksDBException_IO-error_stacktrace.txt] 
> h2. Suggested solution
> To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
> lead to release of resources - in this case - filesystem LOCK files.
> h2. Possible solution code
> Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
> Commit: [BUGFIX: When commit fails during rebalance - release 
> resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
> h2. Note
> This could be related this issues: KAFKA-3708 and KAFKA-3938



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available

2016-11-28 Thread Davor Poldrugo (JIRA)
Davor Poldrugo created KAFKA-4455:
-

 Summary: Commit during rebalance does not close RocksDB which 
later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks 
available
 Key: KAFKA-4455
 URL: https://issues.apache.org/jira/browse/KAFKA-4455
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.0
 Environment: Kafka Streams were running on CentOS - I have observed 
this - after some time the locks were released even if the jvm/process wasn't 
restarted, so I guess CentOS has some lock cleaning policy.
Reporter: Davor Poldrugo


h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running but they had separate directories for state stores.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2016-11-28 Thread Soumyajit Sahu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702991#comment-15702991
 ] 

Soumyajit Sahu commented on KAFKA-1194:
---

[~abhit011], [~bradvido] Could you please give the following a try (note, 
branch = SiphonRelease): https://github.com/Microsoft/Kafka/tree/SiphonRelease
I have been running Kafka on Windows for a while now from this repo/branch.

I have temporarily shared a (scala version 2.11) build here: 
https://1drv.ms/u/s!AvtbGhqVbZGYkTFIN23sIPE6rG5a.
You will of course need to edit the server.properties and set JAVA_HOME, and 
then run the start.bat.

[~haraldk] has been helping with the validation, but another eye would help.

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1
> Environment: window
>Reporter: Tao Qin
>Assignee: Jay Kreps
>  Labels: features, patch
> Fix For: 0.10.2.0
>
> Attachments: KAFKA-1194.patch, kafka-1194-v1.patch, 
> kafka-1194-v2.patch
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because kafka tries to rename the log file when it 
> is still opened.  So we should close the file first before rename.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I find a forceUnmap function in kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.

2016-11-28 Thread Mayuresh Gharat (JIRA)
Mayuresh Gharat created KAFKA-4454:
--

 Summary: Authorizer should also include the Principal generated by 
the PrincipalBuilder.
 Key: KAFKA-4454
 URL: https://issues.apache.org/jira/browse/KAFKA-4454
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.0.1
Reporter: Mayuresh Gharat
Assignee: Mayuresh Gharat
 Fix For: 0.10.2.0


Currently kafka allows users to plugin a custom PrincipalBuilder and a custom 
Authorizer.
The Authorizer.authorize() object takes in a Session object that wraps 
KafkaPrincipal and InetAddress.
The KafkaPrincipal currently has a PrincipalType and Principal name, which is 
the name of Principal generated by the PrincipalBuilder. 
This Principal, generated by the pluggedin PrincipalBuilder might have other 
fields that might be required by the pluggedin Authorizer but currently we 
loose this information since we only extract the name of Principal while 
creating KaflkaPrincipal in SocketServer.  

It would be great if KafkaPrincipal has an additional field "channelPrincipal" 
which is used to store the Principal generated by the plugged in 
PrincipalBuilder.

The pluggedin Authorizer can then use this "channelPrincipal" to do 
authorization.
 




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2182: ConfigDef experimentation - support List and Ma...

2016-11-28 Thread shikhar
GitHub user shikhar opened a pull request:

https://github.com/apache/kafka/pull/2182

ConfigDef experimentation - support List and Map



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shikhar/kafka configdef-experimentation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2182


commit 39a10054606c2ae9d26d7b0625c7c59210129b09
Author: Shikhar Bhushan 
Date:   2016-11-28T19:27:21Z

ConfigDef experimentation - support List and Map




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: any plans to switch to java 8?

2016-11-28 Thread Ismael Juma
By supporting two Java versions, I mean supporting the two most recent
ones. So, we'd only drop support for Java 7 after Java 9 is released, but
no sooner (independently of how old or unsupported a particular version
is). An alternative approach is to drop support a defined amount of time
after a particular version is EOL'd.

With respect to the question about the cost of supporting multiple Java
versions: it is OK to compile with the oldest version, but we definitely
need to run the unit, integration, system and performance tests with all
supported versions. The Java team strives to maintain compatibility, but
regressions and behaviour differences are not uncommon across major
releases (and sometimes update releases). Projects like Lucene are very
good at hitting JIT bugs and they actually test against JDK EA snapshot
builds in the hope of triggering them before a stable release.

Ismael

On Mon, Nov 28, 2016 at 6:39 PM, radai  wrote:

> i dont completely understand the meaning behind supporting 2 java versions.
> given java's pretty good about backwards compatibility if you build against
> the oldest JDK you support (say 8) it should run on anything newer (say 9).
> what am i missing?
>
> On Mon, Nov 28, 2016 at 4:06 AM, Ismael Juma  wrote:
>
> > I think there are 3 main points that can be taken from that discussion
> with
> > regards to the timing:
> >
> > 1. We should do the switch no earlier than Kafka's next major version
> bump
> > (i.e. 0.11.0.0 at this point)
> > 2. Some would prefer to support two Java versions, so we'd have to wait
> > until Kafka's next major version bump _after_ Java 9 is released. Java 9
> is
> > currently scheduled to be released in July 2017. I like the guideline of
> > supporting two Java versions at a time, but multiple delays to Java 8
> and 9
> > combined with huge improvements in Java 8 could provide the basis for an
> > exception.
> > 3. Some would prefer the clients jar to support Java 7 for longer as
> there
> > are cases where it is hard to upgrade all clients to use Java 8 (maybe
> they
> > run in an older App Server that only supports Java 7, for example).
> >
> > It seems like 1 is a hard requirement while 2 and 3 are less so. Given
> > that, I was planning to restart the conversation when we have a plan to
> > bump Kafka's major version (a message format change would quality
> > typically).
> >
> > Ismael
> >
> > On Thu, Nov 10, 2016 at 7:03 PM, Joel Koshy  wrote:
> >
> > > http://markmail.org/message/gnrn5ccql7a2pmc5
> > > We can bump that up to revisit the discussion. That thread didn't have
> > any
> > > closure, but has a lot of background information.
> > >
> > > On Thu, Nov 10, 2016 at 10:37 AM, Sean McCauliff <
> > sean.mccaul...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Wait for JDK 9 which is supposed to be 4-5 months from now?
> > > >
> > > > Sean
> > > >
> > > > On Thu, Nov 10, 2016 at 10:23 AM, radai 
> > > > wrote:
> > > > > with java 7 being EOL'ed for more than a year and a half now (apr
> > 2015,
> > > > see
> > > > > http://www.oracle.com/technetwork/java/eol-135779.html) i was
> > > wondering
> > > > if
> > > > > there's an official plan/timetable for transitioning the kafka
> > codebase
> > > > > over to java 8?
> > > >
> > >
> >
>


Re: [DISCUSS] 0.10.1.1 Plan

2016-11-28 Thread Guozhang Wang
@Radai

I think Ismael's point is that people who coded their applications against
0.10.1.1 should be able to swipe in the 0.10.1.0 jar without breaking the
code (in practice maybe it is more likely the reverse scenario, that people
coded against 0.10.1.0 and trying to swipe in a newer 0.10.1.1).

@Bernard Leach

I agree that having Scala 2.12 support as early as possible than next Feb
can help the Scala community who's using Kafka. However, the downside is
that it may increase our release cycles for this bug-fix release as we need
to add more validation process to make sure that builds with Scala12 works
perfectly, while we are trying to make 0.10.1.1 out asap.


Guozhang


On Sun, Nov 27, 2016 at 5:16 PM, Bernard Leach 
wrote:

> I guess bugs are in the eye of the beholder; I’d really like to see
> KAFKA-4438 addressed as a patch release - the bug being "I can’t currently
> build any scala 2.12 projects with Kafka as a dependency".  There’s a PR
> ready to go for the branch that has already been accepted to trunk and this
> Ismael has addressed the risk issue by making the 2.12 not part of the
> default build.
>
> There are a number of downstream projects awaiting kafka scala 2.12
> binaries so they can publish their own so getting this addressed before
> February would really help push along the availability of 2.12 compatible
> libraries.
>
> cheers,
> bern.
>
> > On 27 Nov 2016, at 10:08, radai  wrote:
> >
> > "compatibility guarantees that are expected by people who subclass these
> > classes"
> >
> > sorry if this is not the best thread for this discussion, but I just
> wanted
> > to pop in and say that since any subclassing of these will obviously not
> be
> > done within the kafka codebase - what guarantees are needed?
> >
> > On Fri, Nov 25, 2016 at 11:13 PM, Michael Pearce 
> > wrote:
> >
> >> As agreed that this should be purely a bug fix release for stability.
> >>
> >>
> >> I'd like to flag then we shouldn't be adding / merging in any Jira's
> that
> >> are not bugs.
> >>
> >> e.g. KAFKA-4438
> >>
> >>
> >> 
> >> From: isma...@gmail.com  on behalf of Ismael Juma <
> >> ism...@juma.me.uk>
> >> Sent: Friday, November 25, 2016 11:43 AM
> >> To: dev@kafka.apache.org
> >> Subject: Re: [DISCUSS] 0.10.1.1 Plan
> >>
> >> Good, seems like we are in agreement about sticking to bug fixes for
> >> 0.10.1.1.
> >>
> >> Regarding the removal of final, I understand that it doesn't break
> >> backwards binary compatibility (it does break forwards compatibility and
> >> hence why it's more suited for feature releases; these are the same
> rules
> >> followed by the JDK and Scala).
> >>
> >> It's probably best to discuss this in another thread, but to clarify:
> the
> >> point I was making is that once you make a class non final, maintaining
> >> compatibility becomes more complex. You have to take into account that
> >> methods could have been overridden, for example. As such, some thought
> >> should be given to the scenarios under which classes can be extended,
> how
> >> we will ensure that we can evolve the class without breaking users,
> whether
> >> we want to make some methods final, whether we should add some
> >> documentation guiding users regarding good/bad examples of inheritance
> >> versus composition for these classes and so on. This is particularly
> >> important for core client classes exposed by the consumer and producer
> and
> >> it is the approach taken by projects that have a good track record with
> >> regards to maintaining compatibility for long periods of time. I didn't
> see
> >> much discussion or reasoning along the lines above for the
> >> ProducerRecord/ConsumerRecord change and hence my comment (even if it
> may
> >> actually me a good change once all is considered).
> >>
> >> Ismael
> >>
> >> On Thu, Nov 24, 2016 at 7:56 PM, Michael Pearce 
> >> wrote:
> >>
> >>> +1 it would be nice, and as is less restrive would not cause any issue.
> >>>
> >>> Saying that agree this is a fix build not a feature build.
> >>>
> >>> Sent using OWA for iPhone
> >>> 
> >>> From: Rajini Sivaram 
> >>> Sent: Thursday, November 24, 2016 12:17:13 PM
> >>> To: dev@kafka.apache.org
> >>> Subject: Re: [DISCUSS] 0.10.1.1 Plan
> >>>
> >>> Hi Ismael,
> >>>
> >>> OK, I do agree with you. At the moment, our code wraps these three
> >> classes
> >>> since they can't be extended. I recently noticed that two of the three
> >> are
> >>> now non-final in trunk. If all three were made non-final, we would like
> >> to
> >>> extend them,
> >>>
> >>> According to the Java specification:
> >>>
> >>> *Changing a class that is declared final to no longer be
> >>> declared final does not break compatibility with pre-existing
> binaries.*
> >>>
> >>>
> >>> So it shouldn't break anything. Perhaps 

[jira] [Commented] (KAFKA-4453) add request prioritization

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702790#comment-15702790
 ] 

ASF GitHub Bot commented on KAFKA-4453:
---

GitHub user onurkaraman opened a pull request:

https://github.com/apache/kafka/pull/2181

KAFKA-4453: add request prioritization

Today all requests (client requests, broker requests, controller requests) 
to a broker are put into the same queue. They all have the same priority. So a 
backlog of requests ahead of the controller request will delay the processing 
of controller requests. This causes requests infront of the controller request 
to get processed based on stale state.

Side effects may include giving clients stale metadata, rejecting 
ProduceRequests and FetchRequests, and data loss (for some unofficial 
definition of data loss in terms of messages beyond the high watermark).

We'd like to minimize the number of requests processed based on stale 
state. With request prioritization, controller requests get processed before 
regular queued up requests, so requests can get processed with up-to-date state.

Request prioritization can happen at the network layer with the 
RequestChannel. The RequestChannel can categorize the request as regular or 
prioritized based on the request id. If the incoming request id matches that of 
UpdateMetadataRequest, LeaderAndIsrRequest, and StopReplicaRequest, the request 
can get prioritized.

One solution is to simply add a prioritized request queue to supplement the 
existing request queue in the RequestChannel and add request 
prioritization-aware logic to both the sendRequest and receiveRequest 
operations of RequestChannel. sendRequest puts the request into the respective 
queue based on whether the request is prioritized or not. receiveRequest can 
optimistically check the prioritized request queue and otherwise fallback to 
the regular request queue. One subtlety here is whether to do a timed poll on 
just the regular request queue or on both the prioritized request queue and 
regular request queue sequentially. Only applying the timed poll to the regular 
request queue punishes a prioritized request that arrives before a regular 
request but moments after the prioritized request check. Applying the timed 
poll to both queues sequentially results in a guaranteed latency increase on a 
regular request.

An alternative is to replace RequestChannel’s existing request queue with a 
prioritization-aware blocking queue. This approach avoids the earlier stated 
subtlety by allowing the timed poll to apply to either prioritized or regular 
requests in low-throughput scenarios while still allowing queued prioritized 
requests to go ahead of queued regular requests.

This patch goes with the latter approach to avoid punishing late arriving 
prioritized requests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onurkaraman/kafka KAFKA-4453

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2181.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2181


commit e7bff71dce307c3ad5e89f8627c63d546878e39e
Author: Onur Karaman 
Date:   2016-11-28T18:54:35Z

KAFKA-4453: add request prioritization

Today all requests (client requests, broker requests, controller requests) 
to a broker are put into the same queue. They all have the same priority. So a 
backlog of requests ahead of the controller request will delay the processing 
of controller requests. This causes requests infront of the controller request 
to get processed based on stale state.

Side effects may include giving clients stale metadata, rejecting 
ProduceRequests and FetchRequests, and data loss (for some unofficial 
definition of data loss in terms of messages beyond the high watermark).

We'd like to minimize the number of requests processed based on stale 
state. With request prioritization, controller requests get processed before 
regular queued up requests, so requests can get processed with up-to-date state.

Request prioritization can happen at the network layer with the 
RequestChannel. The RequestChannel can categorize the request as regular or 
prioritized based on the request id. If the incoming request id matches that of 
UpdateMetadataRequest, LeaderAndIsrRequest, and StopReplicaRequest, the request 
can get prioritized.

One solution is to simply add a prioritized request queue to supplement the 
existing request queue in the RequestChannel and add request 
prioritization-aware logic to both the sendRequest and receiveRequest 
operations of RequestChannel. sendRequest puts the request into the respective 
queue based on 

[GitHub] kafka pull request #2181: KAFKA-4453: add request prioritization

2016-11-28 Thread onurkaraman
GitHub user onurkaraman opened a pull request:

https://github.com/apache/kafka/pull/2181

KAFKA-4453: add request prioritization

Today all requests (client requests, broker requests, controller requests) 
to a broker are put into the same queue. They all have the same priority. So a 
backlog of requests ahead of the controller request will delay the processing 
of controller requests. This causes requests infront of the controller request 
to get processed based on stale state.

Side effects may include giving clients stale metadata, rejecting 
ProduceRequests and FetchRequests, and data loss (for some unofficial 
definition of data loss in terms of messages beyond the high watermark).

We'd like to minimize the number of requests processed based on stale 
state. With request prioritization, controller requests get processed before 
regular queued up requests, so requests can get processed with up-to-date state.

Request prioritization can happen at the network layer with the 
RequestChannel. The RequestChannel can categorize the request as regular or 
prioritized based on the request id. If the incoming request id matches that of 
UpdateMetadataRequest, LeaderAndIsrRequest, and StopReplicaRequest, the request 
can get prioritized.

One solution is to simply add a prioritized request queue to supplement the 
existing request queue in the RequestChannel and add request 
prioritization-aware logic to both the sendRequest and receiveRequest 
operations of RequestChannel. sendRequest puts the request into the respective 
queue based on whether the request is prioritized or not. receiveRequest can 
optimistically check the prioritized request queue and otherwise fallback to 
the regular request queue. One subtlety here is whether to do a timed poll on 
just the regular request queue or on both the prioritized request queue and 
regular request queue sequentially. Only applying the timed poll to the regular 
request queue punishes a prioritized request that arrives before a regular 
request but moments after the prioritized request check. Applying the timed 
poll to both queues sequentially results in a guaranteed latency increase on a 
regular request.

An alternative is to replace RequestChannel’s existing request queue with 
a prioritization-aware blocking queue. This approach avoids the earlier stated 
subtlety by allowing the timed poll to apply to either prioritized or regular 
requests in low-throughput scenarios while still allowing queued prioritized 
requests to go ahead of queued regular requests.

This patch goes with the latter approach to avoid punishing late arriving 
prioritized requests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onurkaraman/kafka KAFKA-4453

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2181.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2181


commit e7bff71dce307c3ad5e89f8627c63d546878e39e
Author: Onur Karaman 
Date:   2016-11-28T18:54:35Z

KAFKA-4453: add request prioritization

Today all requests (client requests, broker requests, controller requests) 
to a broker are put into the same queue. They all have the same priority. So a 
backlog of requests ahead of the controller request will delay the processing 
of controller requests. This causes requests infront of the controller request 
to get processed based on stale state.

Side effects may include giving clients stale metadata, rejecting 
ProduceRequests and FetchRequests, and data loss (for some unofficial 
definition of data loss in terms of messages beyond the high watermark).

We'd like to minimize the number of requests processed based on stale 
state. With request prioritization, controller requests get processed before 
regular queued up requests, so requests can get processed with up-to-date state.

Request prioritization can happen at the network layer with the 
RequestChannel. The RequestChannel can categorize the request as regular or 
prioritized based on the request id. If the incoming request id matches that of 
UpdateMetadataRequest, LeaderAndIsrRequest, and StopReplicaRequest, the request 
can get prioritized.

One solution is to simply add a prioritized request queue to supplement the 
existing request queue in the RequestChannel and add request 
prioritization-aware logic to both the sendRequest and receiveRequest 
operations of RequestChannel. sendRequest puts the request into the respective 
queue based on whether the request is prioritized or not. receiveRequest can 
optimistically check the prioritized request queue and otherwise fallback to 
the regular request queue. One subtlety here is whether to do a timed poll on 
just the regular 

[jira] [Created] (KAFKA-4453) add request prioritization

2016-11-28 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-4453:
---

 Summary: add request prioritization
 Key: KAFKA-4453
 URL: https://issues.apache.org/jira/browse/KAFKA-4453
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman
Assignee: Onur Karaman


Today all requests (client requests, broker requests, controller requests) to a 
broker are put into the same queue. They all have the same priority. So a 
backlog of requests ahead of the controller request will delay the processing 
of controller requests. This causes requests infront of the controller request 
to get processed based on stale state.

Side effects may include giving clients stale metadata\[1\], rejecting 
ProduceRequests and FetchRequests, and data loss (for some unofficial\[2\] 
definition of data loss in terms of messages beyond the high watermark)\[3\].

We'd like to minimize the number of requests processed based on stale state. 
With request prioritization, controller requests get processed before regular 
queued up requests, so requests can get processed with up-to-date state.

\[1\] Say a client's MetadataRequest is sitting infront of a controller's 
UpdateMetadataRequest on a given broker's request queue. Suppose the 
MetadataRequest is for a topic whose partitions have recently undergone 
leadership changes and that these leadership changes are being broadcasted from 
the controller in the later UpdateMetadataRequest. Today the broker processes 
the MetadataRequest before processing the UpdateMetadataRequest, meaning the 
metadata returned to the client will be stale. The client will waste a 
roundtrip sending requests to the stale partition leader, get a 
NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
topic metadata again.
\[2\] The official definition of data loss in kafka is when we lose a 
"committed" message. A message is considered "committed" when all in sync 
replicas for that partition have applied it to their log.
\[3\] Say a number of ProduceRequests are sitting infront of a controller's 
LeaderAndIsrRequest on a given broker's request queue. Suppose the 
ProduceRequests are for partitions whose leadership has recently shifted out 
from the current broker to another broker in the replica set. Today the broker 
processes the ProduceRequests before the LeaderAndIsrRequest, meaning the 
ProduceRequests are getting processed on the former partition leader. As part 
of becoming a follower for a partition, the broker truncates the log to the 
high-watermark. With weaker ack settings such as acks=1, the leader may 
successfully write to its own log, respond to the user with a success, process 
the LeaderAndIsrRequest making the broker a follower of the partition, and 
truncate the log to a point before the user's produced messages. So users have 
a false sense that their produce attempt succeeded while in reality their 
messages got erased. While technically part of what they signed up for with 
acks=1, it can still come as a surprise.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[DISCUSS] KIP-95: Incremental Batch Processing for Kafka Streams

2016-11-28 Thread Matthias J. Sax
Hi all,

I want to start a discussion about KIP-95:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams

Looking forward to your feedback.


-Matthias




signature.asc
Description: OpenPGP digital signature


Re: [DISCUSS] KIP-93: Improve invalid timestamp handling in Kafka Streams

2016-11-28 Thread Matthias J. Sax
Done.

If there is no further comments, I would like to start a voting thread
in a separate email.

-Matthias

On 11/28/16 9:08 AM, Guozhang Wang wrote:
> Yes it does not include these, again in my previous previous email I meant
> when you say "This is a breaking, incompatible change" people may interpret
> it differently. So better explain it more clearly.
> 
> 
> Guozhang
> 
> On Thu, Nov 24, 2016 at 10:31 PM, Matthias J. Sax 
> wrote:
> 
>> That does make sense. But KIP-93 does not change anything like this, so
>> there is nothing to mention, IMHO.
>>
>> Or do you mean, the KIP should include that the change is backward
>> compatible with this regard?
>>
>> -Matthias
>>
>>
>>
>> On 11/24/16 5:31 PM, Guozhang Wang wrote:
>>> What I meant is that, for some changes (e.g. say we change the
>>> auto-repartition behavior that caused using different name conventions,
>> or
>>> some changes that involve changing the underlying state store names, etc)
>>> the existing internal state including the stores and topics will probably
>>> not valid. Some users consider this also as a "backward incompatible
>>> change" since they cannot just swipe in the new jar and restart.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Wed, Nov 23, 2016 at 3:20 PM, Matthias J. Sax 
>>> wrote:
>>>
 Thanks for the feedback. I updated the KIP for (1) and (2).

 However not for (3): Why should it be required to reset an application?
 If user processed "good" data with valid timestamps, behavior does not
 change. If user tried to process "bad" data with invalid timestamps, the
 application does fail currently anyway, so there is nothing to reset.


 -Matthias

 On 11/22/16 9:53 AM, Guozhang Wang wrote:
> Regarding the "compatibility" section, I would suggest being a bit more
> specific about why it is a breaking change. For Streams, it could mean
> different things:
>
> 1. User need code change when switching library dependency on the new
> version, otherwise it won't compile(I think this is the case for this
 KIP).
> 2. User need code change when switching library dependency on the new
> version, otherwise runtime exception will be thrown.
> 3. Existing application state as well as internal topics need to be
 swiped
> and the program need to restart from zero.
>
>
> Guozhang
>
> On Fri, Nov 18, 2016 at 12:27 PM, Matthias J. Sax <
>> matth...@confluent.io
>
> wrote:
>
>> Hi all,
>>
>> I want to start a discussion about KIP-93:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 93%3A+Improve+invalid+timestamp+handling+in+Kafka+Streams
>>
>> Looking forward to your feedback.
>>
>>
>> -Matthias
>>
>>
>
>


>>>
>>>
>>
>>
> 
> 



signature.asc
Description: OpenPGP digital signature


Re: any plans to switch to java 8?

2016-11-28 Thread radai
i dont completely understand the meaning behind supporting 2 java versions.
given java's pretty good about backwards compatibility if you build against
the oldest JDK you support (say 8) it should run on anything newer (say 9).
what am i missing?

On Mon, Nov 28, 2016 at 4:06 AM, Ismael Juma  wrote:

> I think there are 3 main points that can be taken from that discussion with
> regards to the timing:
>
> 1. We should do the switch no earlier than Kafka's next major version bump
> (i.e. 0.11.0.0 at this point)
> 2. Some would prefer to support two Java versions, so we'd have to wait
> until Kafka's next major version bump _after_ Java 9 is released. Java 9 is
> currently scheduled to be released in July 2017. I like the guideline of
> supporting two Java versions at a time, but multiple delays to Java 8 and 9
> combined with huge improvements in Java 8 could provide the basis for an
> exception.
> 3. Some would prefer the clients jar to support Java 7 for longer as there
> are cases where it is hard to upgrade all clients to use Java 8 (maybe they
> run in an older App Server that only supports Java 7, for example).
>
> It seems like 1 is a hard requirement while 2 and 3 are less so. Given
> that, I was planning to restart the conversation when we have a plan to
> bump Kafka's major version (a message format change would quality
> typically).
>
> Ismael
>
> On Thu, Nov 10, 2016 at 7:03 PM, Joel Koshy  wrote:
>
> > http://markmail.org/message/gnrn5ccql7a2pmc5
> > We can bump that up to revisit the discussion. That thread didn't have
> any
> > closure, but has a lot of background information.
> >
> > On Thu, Nov 10, 2016 at 10:37 AM, Sean McCauliff <
> sean.mccaul...@gmail.com
> > >
> > wrote:
> >
> > > Wait for JDK 9 which is supposed to be 4-5 months from now?
> > >
> > > Sean
> > >
> > > On Thu, Nov 10, 2016 at 10:23 AM, radai 
> > > wrote:
> > > > with java 7 being EOL'ed for more than a year and a half now (apr
> 2015,
> > > see
> > > > http://www.oracle.com/technetwork/java/eol-135779.html) i was
> > wondering
> > > if
> > > > there's an official plan/timetable for transitioning the kafka
> codebase
> > > > over to java 8?
> > >
> >
>


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-28 Thread radai
will do (only added a single one so far, the rest TBD)

On Mon, Nov 28, 2016 at 10:04 AM, Jun Rao  wrote:

> Hi, Radai,
>
> Could you add a high level description of the newly added metrics to the
> KIP wiki?
>
> Thanks,
>
> Jun
>
> On Wed, Nov 23, 2016 at 3:45 PM, radai  wrote:
>
> > Hi Jun,
> >
> > I've added the sensor you requested (or at least I think I did )
> >
> > On Fri, Nov 18, 2016 at 12:37 PM, Jun Rao  wrote:
> >
> > > KafkaRequestHandlerPool
> >
>


[jira] [Updated] (KAFKA-4452) Server segfault in ContiguousSpace::block_size

2016-11-28 Thread Grant Rodgers (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Rodgers updated KAFKA-4452:
-
Environment: 
JVM: java version "1.8.0_31"
Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode); OS: 
3.13.0-51-generic #84~precise1-Ubuntu SMP Wed Apr 15 21:45:46 UTC 2015 x86_64 
x86_64 x86_64 GNU/Linux

  was:3.13.0-51-generic #84~precise1-Ubuntu SMP Wed Apr 15 21:45:46 UTC 2015 
x86_64 x86_64 x86_64 GNU/Linux

Description: 
Kafka server received SIGABRT and emitted this segfault message:

#
# A fatal error has been detected by the Java Runtime Environment:
[thread 139972074120960 also had an error]
#
#  SIGSEGV (0xb) at pc=0x7f4dcc71e864, pid=5938, tid=139972072015616
#
# JRE version: Java(TM) SE Runtime Environment (8.0_31-b13) (build 1.8.0_31-b13)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.31-b07 mixed mode linux-amd64 
compresse
d oops)
# Problematic frame:
# V  [libjvm.so+0x9a4864]  ContiguousSpace::block_size(HeapWord const*) 
const+0x34

> Server segfault in ContiguousSpace::block_size
> --
>
> Key: KAFKA-4452
> URL: https://issues.apache.org/jira/browse/KAFKA-4452
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
> Environment: JVM: java version "1.8.0_31"
> Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
> Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode); OS: 
> 3.13.0-51-generic #84~precise1-Ubuntu SMP Wed Apr 15 21:45:46 UTC 2015 x86_64 
> x86_64 x86_64 GNU/Linux
>Reporter: Grant Rodgers
>Priority: Critical
>
> Kafka server received SIGABRT and emitted this segfault message:
> #
> # A fatal error has been detected by the Java Runtime Environment:
> [thread 139972074120960 also had an error]
> #
> #  SIGSEGV (0xb) at pc=0x7f4dcc71e864, pid=5938, tid=139972072015616
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_31-b13) (build 
> 1.8.0_31-b13)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.31-b07 mixed mode 
> linux-amd64 compresse
> d oops)
> # Problematic frame:
> # V  [libjvm.so+0x9a4864]  ContiguousSpace::block_size(HeapWord const*) 
> const+0x34



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4452) Server segfault in ContiguousSpace::block_size

2016-11-28 Thread Grant Rodgers (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Rodgers updated KAFKA-4452:
-
Description: 
Kafka server received SIGABRT and emitted this segfault message:

{noformat}
#
# A fatal error has been detected by the Java Runtime Environment:
[thread 139972074120960 also had an error]
#
#  SIGSEGV (0xb) at pc=0x7f4dcc71e864, pid=5938, tid=139972072015616
#
# JRE version: Java(TM) SE Runtime Environment (8.0_31-b13) (build 1.8.0_31-b13)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.31-b07 mixed mode linux-amd64 
compresse
d oops)
# Problematic frame:
# V  [libjvm.so+0x9a4864]  ContiguousSpace::block_size(HeapWord const*) 
const+0x34
{noformat}

  was:
Kafka server received SIGABRT and emitted this segfault message:

#
# A fatal error has been detected by the Java Runtime Environment:
[thread 139972074120960 also had an error]
#
#  SIGSEGV (0xb) at pc=0x7f4dcc71e864, pid=5938, tid=139972072015616
#
# JRE version: Java(TM) SE Runtime Environment (8.0_31-b13) (build 1.8.0_31-b13)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.31-b07 mixed mode linux-amd64 
compresse
d oops)
# Problematic frame:
# V  [libjvm.so+0x9a4864]  ContiguousSpace::block_size(HeapWord const*) 
const+0x34


> Server segfault in ContiguousSpace::block_size
> --
>
> Key: KAFKA-4452
> URL: https://issues.apache.org/jira/browse/KAFKA-4452
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
> Environment: JVM: java version "1.8.0_31"
> Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
> Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode); OS: 
> 3.13.0-51-generic #84~precise1-Ubuntu SMP Wed Apr 15 21:45:46 UTC 2015 x86_64 
> x86_64 x86_64 GNU/Linux
>Reporter: Grant Rodgers
>Priority: Critical
>
> Kafka server received SIGABRT and emitted this segfault message:
> {noformat}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> [thread 139972074120960 also had an error]
> #
> #  SIGSEGV (0xb) at pc=0x7f4dcc71e864, pid=5938, tid=139972072015616
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_31-b13) (build 
> 1.8.0_31-b13)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.31-b07 mixed mode 
> linux-amd64 compresse
> d oops)
> # Problematic frame:
> # V  [libjvm.so+0x9a4864]  ContiguousSpace::block_size(HeapWord const*) 
> const+0x34
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4452) Server segfault in ContiguousSpace::block_size

2016-11-28 Thread Grant Rodgers (JIRA)
Grant Rodgers created KAFKA-4452:


 Summary: Server segfault in ContiguousSpace::block_size
 Key: KAFKA-4452
 URL: https://issues.apache.org/jira/browse/KAFKA-4452
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.0.0
 Environment: 3.13.0-51-generic #84~precise1-Ubuntu SMP Wed Apr 15 
21:45:46 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
Reporter: Grant Rodgers
Priority: Critical






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4451) Recovering empty replica yields negative offsets in index of compact partitions

2016-11-28 Thread Michael Schiff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Schiff updated KAFKA-4451:
--
Fix Version/s: (was: 0.9.0.1)

> Recovering empty replica yields negative offsets in index of compact 
> partitions
> ---
>
> Key: KAFKA-4451
> URL: https://issues.apache.org/jira/browse/KAFKA-4451
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Michael Schiff
>
> Bringing up an empty broker.  
> the partition for a compact topic is not split into multiple log files.  All 
> data is written into a single log file, causing offsets to overflow.
> A dump of the affected broker shortly after it started replicating:
> {code}
> michael.schiff@stats-kafka09:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/.index | head -n 10
> Dumping /kafka/attainment_event-0/.index
> offset: 1022071124 position: 1037612
> offset: -1713432120 position: 1348740
> offset: -886291423 position: 2397130
> offset: -644750126 position: 3445630
> offset: -57889876 position: 4493972
> offset: 433950099 position: 5388461
> offset: 1071769472 position: 6436837
> offset: 1746859069 position: 7485367
> offset: 2090359736 position: 8533822
> ...
> {code}
> and the dump of the same log file from the leader of this partition
> {code}
> michael.schiff@stats-kafka12:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/.index
> [sudo] password for michael.schiff:
> Dumping /kafka/attainment_event-0/.index
> offset: 353690666 position: 262054
> offset: 633140428 position: 523785
> offset: 756537951 position: 785815
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4451) Recovering empty replica yields negative offsets in index of compact partitions

2016-11-28 Thread Michael Schiff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Schiff updated KAFKA-4451:
--
Affects Version/s: 0.9.0.1

> Recovering empty replica yields negative offsets in index of compact 
> partitions
> ---
>
> Key: KAFKA-4451
> URL: https://issues.apache.org/jira/browse/KAFKA-4451
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Michael Schiff
>
> Bringing up an empty broker.  
> the partition for a compact topic is not split into multiple log files.  All 
> data is written into a single log file, causing offsets to overflow.
> A dump of the affected broker shortly after it started replicating:
> {code}
> michael.schiff@stats-kafka09:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/.index | head -n 10
> Dumping /kafka/attainment_event-0/.index
> offset: 1022071124 position: 1037612
> offset: -1713432120 position: 1348740
> offset: -886291423 position: 2397130
> offset: -644750126 position: 3445630
> offset: -57889876 position: 4493972
> offset: 433950099 position: 5388461
> offset: 1071769472 position: 6436837
> offset: 1746859069 position: 7485367
> offset: 2090359736 position: 8533822
> ...
> {code}
> and the dump of the same log file from the leader of this partition
> {code}
> michael.schiff@stats-kafka12:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/.index
> [sudo] password for michael.schiff:
> Dumping /kafka/attainment_event-0/.index
> offset: 353690666 position: 262054
> offset: 633140428 position: 523785
> offset: 756537951 position: 785815
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-28 Thread Jun Rao
Hi, Radai,

Could you add a high level description of the newly added metrics to the
KIP wiki?

Thanks,

Jun

On Wed, Nov 23, 2016 at 3:45 PM, radai  wrote:

> Hi Jun,
>
> I've added the sensor you requested (or at least I think I did )
>
> On Fri, Nov 18, 2016 at 12:37 PM, Jun Rao  wrote:
>
> > KafkaRequestHandlerPool
>


[jira] [Commented] (KAFKA-4439) Add a builder to NetworkClient

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702638#comment-15702638
 ] 

ASF GitHub Bot commented on KAFKA-4439:
---

GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2180

KAFKA-4439: NetworkClient: create a builder class to encapsulate ctor 
arguments



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-4439

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2180.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2180


commit b501ad097e41f6b99ffa47ccbaf4f7fbbcbc9df8
Author: Colin P. Mccabe 
Date:   2016-11-24T01:31:18Z

KAFKA-4439: NetworkClient: create a builder class to encapsulate the many 
arguments to the constructor




> Add a builder to NetworkClient
> --
>
> Key: KAFKA-4439
> URL: https://issues.apache.org/jira/browse/KAFKA-4439
> Project: Kafka
>  Issue Type: Improvement
>  Components: network
>Reporter: Colin P. McCabe
>Priority: Minor
>
> NetworkClient's constructors have too many parameters.  This makes it hard to 
> follow what each value is being initialized to.  Instead, let's use the 
> builder pattern to clearly identify what each parameter is and provide 
> sensible defaults for each.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4447) Controller resigned but it also acts as a controller for a long time

2016-11-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702635#comment-15702635
 ] 

Guozhang Wang commented on KAFKA-4447:
--

My two cents here:

1. ZkClient's causing our listener function to be executed in different threads 
and hence we have seen lots of race conditions in the controller. I believe 
[~onurkaraman] [~becket_qin] have been working on re-writing the controller to 
fix such multi-threading race conditions as a whole. And after that such issues 
should be fixed. Before that happens I think a simple check as Jason mentioned 
before may not be sufficient, since it could happen that when the listener 
thread does the check it is still not resigned, but while it is executing the 
resignation happens. I think it does not do effective harm as its requests to 
other brokers should be rejected because of the obsoleted epoch number, but the 
3 minutes log swamp could be irritating.

2. The partition assignment could take long time with large number of 
partitions to migrate, and particularly in this case it lasts 3 minutes acting 
as the controller even after it has resigned because of thread racing. There 
are already some optimizations submitted by [~lindong] to shorten this latency, 
but I think we also need to consider how to handle such "long task" under a 
single-threaded model.

> Controller resigned but it also acts as a controller for a long time 
> -
>
> Key: KAFKA-4447
> URL: https://issues.apache.org/jira/browse/KAFKA-4447
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Linux Os
>Reporter: Json Tu
> Attachments: log.tar.gz
>
>
> We have a cluster with 10 nodes,and we execute following operation as below.
> 1.we execute some topic partition reassign from one node to other 9 nodes in 
> the cluster, and which triggered controller.
> 2.controller invoke PartitionsReassignedListener's handleDataChange and read 
> all partition reassign rules from the zk path, and executed all 
> onPartitionReassignment for all partition that match conditions.
> 3.but the controller is expired from zk, after what some nodes of 9 nodes 
> also expired from zk.
> 5.then controller invoke onControllerResignation to resigned as the 
> controller.
> we found after the controller is resigned, it acts as controller for about 3 
> minutes, which can be found in my attachment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2180: KAFKA-4439: NetworkClient: create a builder class ...

2016-11-28 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2180

KAFKA-4439: NetworkClient: create a builder class to encapsulate ctor 
arguments



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-4439

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2180.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2180


commit b501ad097e41f6b99ffa47ccbaf4f7fbbcbc9df8
Author: Colin P. Mccabe 
Date:   2016-11-24T01:31:18Z

KAFKA-4439: NetworkClient: create a builder class to encapsulate the many 
arguments to the constructor




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-93: Improve invalid timestamp handling in Kafka Streams

2016-11-28 Thread Guozhang Wang
Yes it does not include these, again in my previous previous email I meant
when you say "This is a breaking, incompatible change" people may interpret
it differently. So better explain it more clearly.


Guozhang

On Thu, Nov 24, 2016 at 10:31 PM, Matthias J. Sax 
wrote:

> That does make sense. But KIP-93 does not change anything like this, so
> there is nothing to mention, IMHO.
>
> Or do you mean, the KIP should include that the change is backward
> compatible with this regard?
>
> -Matthias
>
>
>
> On 11/24/16 5:31 PM, Guozhang Wang wrote:
> > What I meant is that, for some changes (e.g. say we change the
> > auto-repartition behavior that caused using different name conventions,
> or
> > some changes that involve changing the underlying state store names, etc)
> > the existing internal state including the stores and topics will probably
> > not valid. Some users consider this also as a "backward incompatible
> > change" since they cannot just swipe in the new jar and restart.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Nov 23, 2016 at 3:20 PM, Matthias J. Sax 
> > wrote:
> >
> >> Thanks for the feedback. I updated the KIP for (1) and (2).
> >>
> >> However not for (3): Why should it be required to reset an application?
> >> If user processed "good" data with valid timestamps, behavior does not
> >> change. If user tried to process "bad" data with invalid timestamps, the
> >> application does fail currently anyway, so there is nothing to reset.
> >>
> >>
> >> -Matthias
> >>
> >> On 11/22/16 9:53 AM, Guozhang Wang wrote:
> >>> Regarding the "compatibility" section, I would suggest being a bit more
> >>> specific about why it is a breaking change. For Streams, it could mean
> >>> different things:
> >>>
> >>> 1. User need code change when switching library dependency on the new
> >>> version, otherwise it won't compile(I think this is the case for this
> >> KIP).
> >>> 2. User need code change when switching library dependency on the new
> >>> version, otherwise runtime exception will be thrown.
> >>> 3. Existing application state as well as internal topics need to be
> >> swiped
> >>> and the program need to restart from zero.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>> On Fri, Nov 18, 2016 at 12:27 PM, Matthias J. Sax <
> matth...@confluent.io
> >>>
> >>> wrote:
> >>>
>  Hi all,
> 
>  I want to start a discussion about KIP-93:
> 
>  https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>  93%3A+Improve+invalid+timestamp+handling+in+Kafka+Streams
> 
>  Looking forward to your feedback.
> 
> 
>  -Matthias
> 
> 
> >>>
> >>>
> >>
> >>
> >
> >
>
>


-- 
-- Guozhang


Re: How to commit bug fix to kafka mesos framework

2016-11-28 Thread Apurva Mehta
If the bug is in Kafka, here is the process for submitting a fix:
http://kafka.apache.org/contributing

If the bug is in the mesos-kafka framework, I think you should look up that
project and find out how to commit a bugfix there. I think it should not be
more complicated than submitting a PR.

Thanks,
Apurva

On Mon, Nov 28, 2016 at 7:00 AM, Yu Wei  wrote:

> Hi Guys,
>
> I found a bug in kafka mesos framework related with support for kafka
> 0.9.x and later.
>
> Now I have a solution to fix the problem.
>
>
> So how could I commit the code to the project? Is there any process about
> it?
>
> And who could help me?
>
>
> Thanks,
>
> Jared, (??)
> Software developer
> Interested in open source software, cloud computing, big data, Linux
>


Re: A strange controller log in Kafka 0.9.0.1

2016-11-28 Thread Json Tu
thanks to Jason Gustafson, hope more contributor can take part in this 
discussion.
https://issues.apache.org/jira/browse/KAFKA-4447 


> 在 2016年11月27日,下午9:20,Json Tu  写道:
> 
> AnyBody?This is very disconcerting! If convenient, Can somebody help to 
> confirm this strange question.
> 
>> 在 2016年11月26日,上午1:35,Json Tu  写道:
>> 
>> thanks guozhang,
>>  if it's convenient,can we disscuss it in the jira 
>> https://issues.apache.org/jira/browse/KAFKA-4447 
>> ,I guess some body may 
>> also encounter this problem.
>> 
>>> 在 2016年11月25日,下午12:31,Guozhang Wang  写道:
>>> 
>>> Does broker 100 keeps acting as the controller afterwards? What you observe
>>> is possible and should be transient since "unsubscribeChildChanges" on
>>> ZkClient and listener fired procedure are executed on different threads and
>>> they are not strictly synchronized. But if you continuously see broker
>>> 100's listener fires and it acts like a controller then there may be an
>>> issue with 0.9.0.1 version.
>>> 
>>> Guozhang
>>> 
>>> On Wed, Nov 23, 2016 at 7:28 AM, Json Tu  wrote:
>>> 
 Hi,
  We have a cluster of kafka 0.9.0.1 with 3 nodes, and we found a
 strange controller log as below.
 
 [2016-11-07 03:14:48,575] INFO [SessionExpirationListener on 100], ZK
 expired; shut down all controller components and try to re-elect
 (kafka.controller.KafkaController$SessionExpirationListener)
 [2016-11-07 03:14:48,578] DEBUG [Controller 100]: Controller resigning,
 broker id 100 (kafka.controller.KafkaController)
 [2016-11-07 03:14:48,579] DEBUG [Controller 100]: De-registering
 IsrChangeNotificationListener (kafka.controller.KafkaController)
 [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutting down
 (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
 [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Stopped
 (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
 [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutdown
 completed (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
 [2016-11-07 03:14:48,580] INFO [Partition state machine on Controller
 100]: Stopped partition state machine (kafka.controller.
 PartitionStateMachine)
 [2016-11-07 03:14:48,580] INFO [Replica state machine on controller 100]:
 Stopped replica state machine (kafka.controller.ReplicaStateMachine)
 [2016-11-07 03:14:48,583] INFO [Controller-100-to-broker-101-send-thread],
 Shutting down (kafka.controller.RequestSendThread)
 [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread],
 Stopped  (kafka.controller.RequestSendThread)
 [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread],
 Shutdown completed (kafka.controller.RequestSendThread)
 [2016-11-07 03:14:48,586] INFO [Controller-100-to-broker-100-send-thread],
 Shutting down (kafka.controller.RequestSendThread)
 [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread],
 Stopped  (kafka.controller.RequestSendThread)
 [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread],
 Shutdown completed (kafka.controller.RequestSendThread)
 [2016-11-07 03:14:48,587] INFO [Controller 100]: Broker 100 resigned as
 the controller (kafka.controller.KafkaController)
 [2016-11-07 03:14:48,652] DEBUG [IsrChangeNotificationListener] Fired!!!
 (kafka.controller.IsrChangeNotificationListener)
 [2016-11-07 03:14:48,668] INFO [BrokerChangeListener on Controller 100]:
 Broker change listener fired for path /brokers/ids with children 101,100
 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
 [2016-11-07 03:14:48,683] DEBUG [DeleteTopicsListener on 100]: Delete
 topics listener fired for topics  to be deleted (kafka.controller.
 PartitionStateMachine$DeleteTopicsListener)
 [2016-11-07 03:14:48,687] INFO [AddPartitionsListener on 100]: Add
 Partition triggered {"version":1,"partitions":{"4"
 :[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
 for path /brokers/topics/movie.gateway.merselllog.syncCinema
 (kafka.controller.PartitionStateMachine$AddPartitionsListener)
 [2016-11-07 03:14:48,694] INFO [AddPartitionsListener on 100]: Add
 Partition triggered {"version":1,"partitions":{"4"
 :[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
 for path /brokers/topics/push_3rdparty_high (kafka.controller.
 PartitionStateMachine$AddPartitionsListener)
 [2016-11-07 03:14:48,707] INFO [AddPartitionsListener on 100]: Add
 Partition triggered {"version":1,"partitions":{"4"
 :[101,102],"5":[102,100],"1":[101,100],"0":[100,102],"2":[102,101],"3":[100,101]}}
 

[jira] [Commented] (KAFKA-4447) Controller resigned but it also acts as a controller for a long time

2016-11-28 Thread Json Tu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702465#comment-15702465
 ] 

Json Tu commented on KAFKA-4447:


may be the phenomenon of these listener is the same as one zk's callback 
executing is very very slow,and it leads to many back fired listener can only 
be executed after a long time.

> Controller resigned but it also acts as a controller for a long time 
> -
>
> Key: KAFKA-4447
> URL: https://issues.apache.org/jira/browse/KAFKA-4447
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Linux Os
>Reporter: Json Tu
> Attachments: log.tar.gz
>
>
> We have a cluster with 10 nodes,and we execute following operation as below.
> 1.we execute some topic partition reassign from one node to other 9 nodes in 
> the cluster, and which triggered controller.
> 2.controller invoke PartitionsReassignedListener's handleDataChange and read 
> all partition reassign rules from the zk path, and executed all 
> onPartitionReassignment for all partition that match conditions.
> 3.but the controller is expired from zk, after what some nodes of 9 nodes 
> also expired from zk.
> 5.then controller invoke onControllerResignation to resigned as the 
> controller.
> we found after the controller is resigned, it acts as controller for about 3 
> minutes, which can be found in my attachment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-4447) Controller resigned but it also acts as a controller for a long time

2016-11-28 Thread Json Tu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702384#comment-15702384
 ] 

Json Tu edited comment on KAFKA-4447 at 11/28/16 4:45 PM:
--

[~skarface] thanks for your reply.
the latest kafka's release version is 0.10.1.0,and kafka controller 's 
handleNewSession() is implemented as below,
  def handleNewSession() {
  info("ZK expired; shut down all controller components and try to 
re-elect")
  inLock(controllerContext.controllerLock) {
onControllerResignation()
controllerElector.elect
  }
}

so deregisterIsrChangeNotificationListener() is also with the 
controllerlock. the lock is out of the onControllerResignation(). and this is a 
bug which was reported at https://issues.apache.org/jira/browse/KAFKA-4360.

my version is 0.9.0.1, so it is not bugfixed,  because we are reassign 
partitions at that time. so may be we can image it as below.
there are many callbacks(for example 40 callbacks, such as 
IsrChangeNotificationListener) to process before to process zk's expired 
callback, so before startting to process zk expired callback, there will be 
have a lot of time to wait more listeners to be fired(for example 100 
callbacks, also include IsrChangeNotificationListener) . 

as we know,the zkclient callback thread is single thread,so the listener 
fired after zk's expired callback will and only be executed after 
handleNewSession().

may be this is make sense.


was (Author: json tu):
[~skarface] thanks for your reply.
the latest kafka's release version is 0.10.1.0,and kafka controller 's 
handleNewSession() is implemented as below,
  def handleNewSession() {
  info("ZK expired; shut down all controller components and try to 
re-elect")
  inLock(controllerContext.controllerLock) {
onControllerResignation()
controllerElector.elect
  }
}

so deregisterIsrChangeNotificationListener() is also with the controllerlock. 
the lock is out of the onControllerResignation(). and this is a bug which was 
reported at https://issues.apache.org/jira/browse/KAFKA-4360.

my version is 0.9.0.1, so it is not bugfixed,  because we are reassign 
partitions at that time. so may be we can image it as below.
there are many callbacks(for example 40 callbacks, such as 
IsrChangeNotificationListener) to process before to process zk's expired 
callback, so before startting to process zk expired callback, there will be 
have a lot of time to wait more listeners to be fired(for example 100 
callbacks, also include IsrChangeNotificationListener) . 
as we know,the zkclient callback thread is single thread,so the listener fired 
after zk's expired callback will and only be executed after handleNewSession().
may be this is make sense.

> Controller resigned but it also acts as a controller for a long time 
> -
>
> Key: KAFKA-4447
> URL: https://issues.apache.org/jira/browse/KAFKA-4447
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Linux Os
>Reporter: Json Tu
> Attachments: log.tar.gz
>
>
> We have a cluster with 10 nodes,and we execute following operation as below.
> 1.we execute some topic partition reassign from one node to other 9 nodes in 
> the cluster, and which triggered controller.
> 2.controller invoke PartitionsReassignedListener's handleDataChange and read 
> all partition reassign rules from the zk path, and executed all 
> onPartitionReassignment for all partition that match conditions.
> 3.but the controller is expired from zk, after what some nodes of 9 nodes 
> also expired from zk.
> 5.then controller invoke onControllerResignation to resigned as the 
> controller.
> we found after the controller is resigned, it acts as controller for about 3 
> minutes, which can be found in my attachment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-4447) Controller resigned but it also acts as a controller for a long time

2016-11-28 Thread Json Tu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702384#comment-15702384
 ] 

Json Tu edited comment on KAFKA-4447 at 11/28/16 4:44 PM:
--

[~skarface] thanks for your reply.
the latest kafka's release version is 0.10.1.0,and kafka controller 's 
handleNewSession() is implemented as below,
  def handleNewSession() {
  info("ZK expired; shut down all controller components and try to 
re-elect")
  inLock(controllerContext.controllerLock) {
onControllerResignation()
controllerElector.elect
  }
}

so deregisterIsrChangeNotificationListener() is also with the controllerlock. 
the lock is out of the onControllerResignation(). and this is a bug which was 
reported at https://issues.apache.org/jira/browse/KAFKA-4360.

my version is 0.9.0.1, so it is not bugfixed,  because we are reassign 
partitions at that time. so may be we can image it as below.
there are many callbacks(for example 40 callbacks, such as 
IsrChangeNotificationListener) to process before to process zk's expired 
callback, so before startting to process zk expired callback, there will be 
have a lot of time to wait more listeners to be fired(for example 100 
callbacks, also include IsrChangeNotificationListener) . 
as we know,the zkclient callback thread is single thread,so the listener fired 
after zk's expired callback will and only be executed after handleNewSession().
may be this is make sense.


was (Author: json tu):
[~skarface] thanks for your reply.
the latest kafka's release version is 0.10.1.0,and kafka controller 's 
handleNewSession() is implemented as below,
  def handleNewSession() {
  info("ZK expired; shut down all controller components and try to 
re-elect")
  inLock(controllerContext.controllerLock) {
onControllerResignation()
controllerElector.elect
  }
}

so deregisterIsrChangeNotificationListener() is also with the controllerlock. 
the lock is out of the onControllerResignation(). and this is a bug which was 
reported at https://issues.apache.org/jira/browse/KAFKA-4360.

my version is 0.9.0.1, so it is not bugfixed,  may be we can image it as below.
1. ZK expired callback queue is fired. and he get controllerLock first. then 
start to execute onControllerResignation .
2. at that time IsrChangeNotificationListener、PartitionsReassignedListener and 
so on are all fired very compact. 
3. then the onControllerResignation() start to exectue  de-register listeners.

as we know,the zkclient callback thread is single thread,so the listener fired 
after zk expired only can be executed after handleNewSession(),
may be this is make sense.

> Controller resigned but it also acts as a controller for a long time 
> -
>
> Key: KAFKA-4447
> URL: https://issues.apache.org/jira/browse/KAFKA-4447
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Linux Os
>Reporter: Json Tu
> Attachments: log.tar.gz
>
>
> We have a cluster with 10 nodes,and we execute following operation as below.
> 1.we execute some topic partition reassign from one node to other 9 nodes in 
> the cluster, and which triggered controller.
> 2.controller invoke PartitionsReassignedListener's handleDataChange and read 
> all partition reassign rules from the zk path, and executed all 
> onPartitionReassignment for all partition that match conditions.
> 3.but the controller is expired from zk, after what some nodes of 9 nodes 
> also expired from zk.
> 5.then controller invoke onControllerResignation to resigned as the 
> controller.
> we found after the controller is resigned, it acts as controller for about 3 
> minutes, which can be found in my attachment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2179: MINOR: Fix typos in KafkaConsumer docs

2016-11-28 Thread jeffwidman
GitHub user jeffwidman opened a pull request:

https://github.com/apache/kafka/pull/2179

MINOR: Fix typos in KafkaConsumer docs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jeffwidman/kafka patch-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2179.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2179


commit 12b3cb313f3b69225ae49b5545b2b5828d1666ed
Author: Jeff Widman 
Date:   2016-11-28T16:31:05Z

MINOR: Fix typos in KafkaConsumer docs




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Issue Comment Deleted] (KAFKA-4447) Controller resigned but it also acts as a controller for a long time

2016-11-28 Thread Json Tu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Json Tu updated KAFKA-4447:
---
Comment: was deleted

(was: [~skarface] thanks for your reply.
the latest release version 0.10.1.0,handleNewSession()'s implemention is as 
below,
  def handleNewSession() {
  info("ZK expired; shut down all controller components and try to 
re-elect")
  inLock(controllerContext.controllerLock) {
onControllerResignation()
controllerElector.elect
  }
}

so deregisterIsrChangeNotificationListener() is also with the controllerlock. 
the lock is out of the onControllerResignation(). and this is a bug which was 
reported at https://issues.apache.org/jira/browse/KAFKA-4360.

my version is 0.9.0.1, so it is not bugfixed,  so we can image it as below.
1. ZK expired callback queue is fired. and he get controllerLock first. then 
start to execute onControllerResignation .
2. at that time IsrChangeNotificationListener、PartitionsReassignedListener and 
so on are all fired very compact. 
3. then the onControllerResignation() start to exectue  de-register listeners.

as we know,the zkclient callback thread is single thread,so the listener fired 
after zk expired only can be executed after handleNewSession(),
may be this is make sense.)

> Controller resigned but it also acts as a controller for a long time 
> -
>
> Key: KAFKA-4447
> URL: https://issues.apache.org/jira/browse/KAFKA-4447
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Linux Os
>Reporter: Json Tu
> Attachments: log.tar.gz
>
>
> We have a cluster with 10 nodes,and we execute following operation as below.
> 1.we execute some topic partition reassign from one node to other 9 nodes in 
> the cluster, and which triggered controller.
> 2.controller invoke PartitionsReassignedListener's handleDataChange and read 
> all partition reassign rules from the zk path, and executed all 
> onPartitionReassignment for all partition that match conditions.
> 3.but the controller is expired from zk, after what some nodes of 9 nodes 
> also expired from zk.
> 5.then controller invoke onControllerResignation to resigned as the 
> controller.
> we found after the controller is resigned, it acts as controller for about 3 
> minutes, which can be found in my attachment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-4447) Controller resigned but it also acts as a controller for a long time

2016-11-28 Thread Json Tu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702384#comment-15702384
 ] 

Json Tu edited comment on KAFKA-4447 at 11/28/16 4:26 PM:
--

[~skarface] thanks for your reply.
the latest kafka's release version is 0.10.1.0,and kafka controller 's 
handleNewSession() is implemented as below,
  def handleNewSession() {
  info("ZK expired; shut down all controller components and try to 
re-elect")
  inLock(controllerContext.controllerLock) {
onControllerResignation()
controllerElector.elect
  }
}

so deregisterIsrChangeNotificationListener() is also with the controllerlock. 
the lock is out of the onControllerResignation(). and this is a bug which was 
reported at https://issues.apache.org/jira/browse/KAFKA-4360.

my version is 0.9.0.1, so it is not bugfixed,  may be we can image it as below.
1. ZK expired callback queue is fired. and he get controllerLock first. then 
start to execute onControllerResignation .
2. at that time IsrChangeNotificationListener、PartitionsReassignedListener and 
so on are all fired very compact. 
3. then the onControllerResignation() start to exectue  de-register listeners.

as we know,the zkclient callback thread is single thread,so the listener fired 
after zk expired only can be executed after handleNewSession(),
may be this is make sense.


was (Author: json tu):
[~skarface] thanks for your reply.
the latest release version 0.10.1.0,handleNewSession()'s implemention is as 
below,
  def handleNewSession() {
  info("ZK expired; shut down all controller components and try to 
re-elect")
  inLock(controllerContext.controllerLock) {
onControllerResignation()
controllerElector.elect
  }
}

so deregisterIsrChangeNotificationListener() is also with the controllerlock. 
the lock is out of the onControllerResignation(). and this is a bug which was 
reported at https://issues.apache.org/jira/browse/KAFKA-4360.

my version is 0.9.0.1, so it is not bugfixed,  so we can image it as below.
1. ZK expired callback queue is fired. and he get controllerLock first. then 
start to execute onControllerResignation .
2. at that time IsrChangeNotificationListener、PartitionsReassignedListener and 
so on are all fired very compact. 
3. then the onControllerResignation() start to exectue  de-register listeners.

as we know,the zkclient callback thread is single thread,so the listener fired 
after zk expired only can be executed after handleNewSession(),
may be this is make sense.

> Controller resigned but it also acts as a controller for a long time 
> -
>
> Key: KAFKA-4447
> URL: https://issues.apache.org/jira/browse/KAFKA-4447
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Linux Os
>Reporter: Json Tu
> Attachments: log.tar.gz
>
>
> We have a cluster with 10 nodes,and we execute following operation as below.
> 1.we execute some topic partition reassign from one node to other 9 nodes in 
> the cluster, and which triggered controller.
> 2.controller invoke PartitionsReassignedListener's handleDataChange and read 
> all partition reassign rules from the zk path, and executed all 
> onPartitionReassignment for all partition that match conditions.
> 3.but the controller is expired from zk, after what some nodes of 9 nodes 
> also expired from zk.
> 5.then controller invoke onControllerResignation to resigned as the 
> controller.
> we found after the controller is resigned, it acts as controller for about 3 
> minutes, which can be found in my attachment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4447) Controller resigned but it also acts as a controller for a long time

2016-11-28 Thread Json Tu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702383#comment-15702383
 ] 

Json Tu commented on KAFKA-4447:


[~skarface] thanks for your reply.
the latest release version 0.10.1.0,handleNewSession()'s implemention is as 
below,
  def handleNewSession() {
  info("ZK expired; shut down all controller components and try to 
re-elect")
  inLock(controllerContext.controllerLock) {
onControllerResignation()
controllerElector.elect
  }
}

so deregisterIsrChangeNotificationListener() is also with the controllerlock. 
the lock is out of the onControllerResignation(). and this is a bug which was 
reported at https://issues.apache.org/jira/browse/KAFKA-4360.

my version is 0.9.0.1, so it is not bugfixed,  so we can image it as below.
1. ZK expired callback queue is fired. and he get controllerLock first. then 
start to execute onControllerResignation .
2. at that time IsrChangeNotificationListener、PartitionsReassignedListener and 
so on are all fired very compact. 
3. then the onControllerResignation() start to exectue  de-register listeners.

as we know,the zkclient callback thread is single thread,so the listener fired 
after zk expired only can be executed after handleNewSession(),
may be this is make sense.

> Controller resigned but it also acts as a controller for a long time 
> -
>
> Key: KAFKA-4447
> URL: https://issues.apache.org/jira/browse/KAFKA-4447
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Linux Os
>Reporter: Json Tu
> Attachments: log.tar.gz
>
>
> We have a cluster with 10 nodes,and we execute following operation as below.
> 1.we execute some topic partition reassign from one node to other 9 nodes in 
> the cluster, and which triggered controller.
> 2.controller invoke PartitionsReassignedListener's handleDataChange and read 
> all partition reassign rules from the zk path, and executed all 
> onPartitionReassignment for all partition that match conditions.
> 3.but the controller is expired from zk, after what some nodes of 9 nodes 
> also expired from zk.
> 5.then controller invoke onControllerResignation to resigned as the 
> controller.
> we found after the controller is resigned, it acts as controller for about 3 
> minutes, which can be found in my attachment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4447) Controller resigned but it also acts as a controller for a long time

2016-11-28 Thread Json Tu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702384#comment-15702384
 ] 

Json Tu commented on KAFKA-4447:


[~skarface] thanks for your reply.
the latest release version 0.10.1.0,handleNewSession()'s implemention is as 
below,
  def handleNewSession() {
  info("ZK expired; shut down all controller components and try to 
re-elect")
  inLock(controllerContext.controllerLock) {
onControllerResignation()
controllerElector.elect
  }
}

so deregisterIsrChangeNotificationListener() is also with the controllerlock. 
the lock is out of the onControllerResignation(). and this is a bug which was 
reported at https://issues.apache.org/jira/browse/KAFKA-4360.

my version is 0.9.0.1, so it is not bugfixed,  so we can image it as below.
1. ZK expired callback queue is fired. and he get controllerLock first. then 
start to execute onControllerResignation .
2. at that time IsrChangeNotificationListener、PartitionsReassignedListener and 
so on are all fired very compact. 
3. then the onControllerResignation() start to exectue  de-register listeners.

as we know,the zkclient callback thread is single thread,so the listener fired 
after zk expired only can be executed after handleNewSession(),
may be this is make sense.

> Controller resigned but it also acts as a controller for a long time 
> -
>
> Key: KAFKA-4447
> URL: https://issues.apache.org/jira/browse/KAFKA-4447
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Linux Os
>Reporter: Json Tu
> Attachments: log.tar.gz
>
>
> We have a cluster with 10 nodes,and we execute following operation as below.
> 1.we execute some topic partition reassign from one node to other 9 nodes in 
> the cluster, and which triggered controller.
> 2.controller invoke PartitionsReassignedListener's handleDataChange and read 
> all partition reassign rules from the zk path, and executed all 
> onPartitionReassignment for all partition that match conditions.
> 3.but the controller is expired from zk, after what some nodes of 9 nodes 
> also expired from zk.
> 5.then controller invoke onControllerResignation to resigned as the 
> controller.
> we found after the controller is resigned, it acts as controller for about 3 
> minutes, which can be found in my attachment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2178: Minor: Fix typos in KafkaConsumer docs

2016-11-28 Thread jeffwidman
GitHub user jeffwidman opened a pull request:

https://github.com/apache/kafka/pull/2178

Minor: Fix typos in KafkaConsumer docs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jeffwidman/kafka patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2178.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2178


commit bba9148fe2d058ccab0f33fac3c5f65103f0af2e
Author: Jeff Widman 
Date:   2016-11-28T16:22:25Z

Minor: Fix typos in KafkaConsumer docs




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4434) KafkaProducer configuration is logged twice

2016-11-28 Thread kevin.chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702321#comment-15702321
 ] 

kevin.chen commented on KAFKA-4434:
---

can any one assign this to me? or add me to contributor for kafka project

> KafkaProducer configuration is logged twice
> ---
>
> Key: KAFKA-4434
> URL: https://issues.apache.org/jira/browse/KAFKA-4434
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 0.10.0.1
>Reporter: Ruben de Gooijer
>Priority: Minor
>  Labels: newbie
> Fix For: 0.10.2.0
>
>
> The constructor of org.apache.kafka.clients.producer.KafkaProducer accepts a 
> ProducerConfig which when constructed logs the configuration: 
> https://github.com/apache/kafka/blob/0.10.0.1/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L58
>  . 
> However, when the construction of KafkaProducer proceeds the provided 
> ProducerConfig is repurposed and another instance is created 
> https://github.com/apache/kafka/blob/0.10.0.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L323
>  which triggers another log with the same contents (only the clientId can 
> differ in case its not supplied in the original config). 
> At first sight this seems like unintended behaviour to me. At least it caused 
> me to dive into it in order to verify if there weren't two producer instances 
> running.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3715) Higher granularity streams metrics

2016-11-28 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702299#comment-15702299
 ] 

Eno Thereska commented on KAFKA-3715:
-

[~aartigupta] would it be ok if I take a pass or would you be interested in 
continuing this?

> Higher granularity streams metrics 
> ---
>
> Key: KAFKA-3715
> URL: https://issues.apache.org/jira/browse/KAFKA-3715
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: aarti gupta
>Priority: Minor
>  Labels: api
> Fix For: 0.10.2.0
>
>
> Originally proposed by [~guozhang] in 
> https://github.com/apache/kafka/pull/1362#issuecomment-218326690
> We can consider adding metrics for process / punctuate / commit rate at the 
> granularity of each processor node in addition to the global rate mentioned 
> above. This is very helpful in debugging.
> We can consider adding rate / total cumulated metrics for context.forward 
> indicating how many records were forwarded downstream from this processor 
> node as well. This is helpful in debugging.
> We can consider adding metrics for each stream partition's timestamp. This is 
> helpful in debugging.
> Besides the latency metrics, we can also add throughput latency in terms of 
> source records consumed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-84: Support SASL SCRAM mechanisms

2016-11-28 Thread Rajini Sivaram
Hi Ismael,

Thank you for the review.

1. I think you had asked earlier for SCRAM-SHA-1 to be removed since it is
not secure :-) I am happy to add that back in so that clients which don't
have access to a more secure algorithm can use it. But it would be a shame
to prevent users who only need Java clients from using more secure
mechanisms. Since SHA-1 is not secure, you need a secure Zookeeper
installation (or store your credentials in an alternative secure store)..
By supporting multiple algorithms, we are giving the choice to users. It
doesn't add much additional code, just the additional tests (one
integration test per mechanism). As more clients support new mechanisms,
users can enable these without any changes to Kafka.

2. OK, since it has come up multiple times, I will update the KIP to use a
more verbose format.

3. I am assuming that ZK authentication will be enabled and ZK
configuration will be done directly using ZK commands. This is true for
ACLs, quotas etc. as well?



On Mon, Nov 28, 2016 at 2:04 PM, Ismael Juma  wrote:

> Hi Rajini,
>
> Thanks for the KIP. I am in favour of introducing SCRAM as an additional
> SASL mechanism. A few comments:
>
> 1. Magnus raised the point that cyrus-sasl currently only
> implements SCRAM-SHA-1, so having a larger number of variants will involve
> more work for non-Java clients. Do we really need all of SCRAM-SHA-224,
> SCRAM-SHA-256, SCRAM-SHA-384 and SCRAM-SHA-512?
>
> 2. Like Radai mentioned, it seems a bit weird to use single letter keys.
> The space savings don't seem particularly useful given the size of the
> values. You mentioned that it would not be useful for humans to interpret
> them, but we do expose them via the configs tool. Since we have to add more
> keys, I am also not convinced about the benefit of reusing the same letters
> as for SCRAM messages.
>
> 3. Have we given any thought on whether the znodes storing the
> authentication information should have additional access restrictions (even
> though they are hashed and salted)? Is the assumption that people would
> enable ZK authentication?
>
> Ismael
>
> On Tue, Nov 15, 2016 at 7:25 PM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
> > Radai,
> >
> > I don't have a strong objection to using a more verbose format. But the
> > reasons for choosing the cryptic s=,t=,... format:
> >
> >1. Unlike other properties like quotas stored in Zookeeper which need
> to
> >be human readable in order to query the values, these values only need
> > to
> >be parsed by code. The base64-encoded array of bytes don't really mean
> >anything to a human. You would only ever want to check if the user has
> >credentials for a mechanism, you can't really tell what the
> credentials
> > are
> >from the value stored in ZK.
> >2. Single letter keys save space. Agree, it is not much, but since a
> >more verbose format doesn't add much value, it feels like wasted space
> > in
> >ZK to store long key names for each property for each user for each
> >mechanism.
> >3. SCRAM authentication messages defined in RFC 5802
> > use comma-separated key=value
> >pairs with single letter keys. s= and i= appear
> > exactly
> >like that in SCRAM messages. Server key and stored key are not
> > exchanged,
> >so I chose two unused letters. The same parser used for SCRAM messages
> > is
> >used to parse this persisted value as well since the format is the
> same.
> >
> >
> > On Tue, Nov 15, 2016 at 5:02 PM, radai 
> wrote:
> >
> > > small nitpick - given that s,t,k and i are used as part of a rather
> large
> > > CSV format, what is the gain in having them be single letter aliases?
> > > in other words - why not salt=... , serverKey=... , storedKey=... ,
> > > iterations=... ?
> > >
> > > On Tue, Nov 15, 2016 at 7:26 AM, Mickael Maison <
> > mickael.mai...@gmail.com>
> > > wrote:
> > >
> > > > +1
> > > >
> > > > On Tue, Nov 15, 2016 at 10:57 AM, Rajini Sivaram
> > > >  wrote:
> > > > > Jun,
> > > > >
> > > > > Thank you, I have made the updates to the KIP.
> > > > >
> > > > > On Tue, Nov 15, 2016 at 12:34 AM, Jun Rao 
> wrote:
> > > > >
> > > > >> Hi, Rajini,
> > > > >>
> > > > >> Thanks for the proposal. +1. A few minor comments.
> > > > >>
> > > > >> 30. Could you add that the broker config sasl.enabled.mechanisms
> can
> > > now
> > > > >> take more values?
> > > > >>
> > > > >> 31. Could you document the meaning of s,t,k,i used in
> > > > /config/users/alice
> > > > >> in ZK?
> > > > >>
> > > > >> 32. In the rejected section, could you document why we decided not
> > to
> > > > bump
> > > > >> up the version of SaslHandshakeRequest?
> > > > >>
> > > > >> Jun
> > > > >>
> > > > >>
> > > > >> On Mon, Nov 14, 2016 at 5:57 AM, Rajini Sivaram <
> > > > >> rajinisiva...@googlemail.com> wrote:
> > > > >>
> > > > >> > Hi all,
> > > > >> 

Re: [VOTE] KIP-85: Dynamic JAAS configuration for Kafka clients

2016-11-28 Thread Rajini Sivaram
Ismael,

Thank you for reviewing the KIP. I do agree that JAAS config format is not
ideal. But I wanted to solve the generic configuration issue (need for
physical file, single static config) for any SASL mechanism in an
extensible, future-proof way. And that requires the ability to configure
all the properties currently configured using the JAAS config file - login
module and all its options. It didn't make sense to define a new format to
do this when JAAS is supported by Kafka.

Kerberos is a very special case. Unlike other mechanisms, I imagine all
users of Kerberos use the login module included in the JRE. And these
modules happen to use different options depending on the vendor. I am not
very familiar with the Hadoop codebase, but it looks like Hadoop contains
code that abstracts out Kerberos options so that it works with any JRE.
This KIP does not preclude better handling for Kerberos in future.

For other mechanisms like PLAIN, we want the login module to be pluggable.
And that means the options need to be extensible. Here JAAS config enables
a format that is consistent with the jaas config file, but without the
current limitations.


On Mon, Nov 28, 2016 at 1:00 PM, Ismael Juma  wrote:

> I'm very late to this, but better late than never, I guess. I am +1 on this
> because it improves on the status quo, satisfies a real need and is simple
> to implement.
>
> Having said that, I'd also like to state that it's a bit of a shame that we
> are doubling down on the JAAS config format. It is a peculiar format and in
> the Kerberos case (one of the common usages), it requires users to provide
> different configs depending on the Java implementation being used. It would
> be nice if we looked into abstracting some of this to make users' lives
> easier. Looking at the Hadoop codebase, it looks like they try to do that
> although I don't know how well it worked out in practice.
>
> Ismael
>
> On Tue, Nov 1, 2016 at 1:42 PM, Rajini Sivaram <
> rajinisiva...@googlemail.com
> > wrote:
>
> > KIP-85 vote has passed with 4 binding (Harsha, Gwen, Jason, Jun) and 4
> > non-binding (Mickael, Jim, Edo, me) votes.
> >
> > Thank you all for your votes and comments. I will update the KIP page and
> > rebase the PR.
> >
> > Many thanks,
> >
> > Rajini
> >
> >
> >
> > On Mon, Oct 31, 2016 at 11:29 AM, Edoardo Comar 
> wrote:
> >
> > > +1 great KIP
> > > --
> > > Edoardo Comar
> > > IBM MessageHub
> > > eco...@uk.ibm.com
> > > IBM UK Ltd, Hursley Park, SO21 2JN
> > >
> > > IBM United Kingdom Limited Registered in England and Wales with number
> > > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants.
> > PO6
> > > 3AU
> > >
> > >
> > >
> > > From:   Rajini Sivaram 
> > > To: dev@kafka.apache.org
> > > Date:   26/10/2016 16:27
> > > Subject:[VOTE] KIP-85: Dynamic JAAS configuration for Kafka
> > > clients
> > >
> > >
> > >
> > > I would like to initiate the voting process for KIP-85: Dynamic JAAS
> > > configuration for Kafka Clients:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 85%3A+Dynamic+JAAS+
> > > configuration+for+Kafka+clients
> > >
> > >
> > > This KIP enables Java clients to connect to Kafka using SASL without a
> > > physical jaas.conf file. This will also be useful to configure multiple
> > > KafkaClient login contexts when multiple users are supported within a
> > JVM.
> > >
> > > Thank you...
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > >
> > >
> > > Unless stated otherwise above:
> > > IBM United Kingdom Limited - Registered in England and Wales with
> number
> > > 741598.
> > > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
> > 3AU
> > >
> >
> >
> >
> > --
> > Regards,
> >
> > Rajini
> >
>



-- 
Regards,

Rajini


How to commit bug fix to kafka mesos framework

2016-11-28 Thread Yu Wei
Hi Guys,

I found a bug in kafka mesos framework related with support for kafka 0.9.x and 
later.

Now I have a solution to fix the problem.


So how could I commit the code to the project? Is there any process about it?

And who could help me?


Thanks,

Jared, (??)
Software developer
Interested in open source software, cloud computing, big data, Linux


Re: [VOTE] KIP-84: Support SASL SCRAM mechanisms

2016-11-28 Thread Ismael Juma
Hi Rajini,

Thanks for the KIP. I am in favour of introducing SCRAM as an additional
SASL mechanism. A few comments:

1. Magnus raised the point that cyrus-sasl currently only
implements SCRAM-SHA-1, so having a larger number of variants will involve
more work for non-Java clients. Do we really need all of SCRAM-SHA-224,
SCRAM-SHA-256, SCRAM-SHA-384 and SCRAM-SHA-512?

2. Like Radai mentioned, it seems a bit weird to use single letter keys.
The space savings don't seem particularly useful given the size of the
values. You mentioned that it would not be useful for humans to interpret
them, but we do expose them via the configs tool. Since we have to add more
keys, I am also not convinced about the benefit of reusing the same letters
as for SCRAM messages.

3. Have we given any thought on whether the znodes storing the
authentication information should have additional access restrictions (even
though they are hashed and salted)? Is the assumption that people would
enable ZK authentication?

Ismael

On Tue, Nov 15, 2016 at 7:25 PM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Radai,
>
> I don't have a strong objection to using a more verbose format. But the
> reasons for choosing the cryptic s=,t=,... format:
>
>1. Unlike other properties like quotas stored in Zookeeper which need to
>be human readable in order to query the values, these values only need
> to
>be parsed by code. The base64-encoded array of bytes don't really mean
>anything to a human. You would only ever want to check if the user has
>credentials for a mechanism, you can't really tell what the credentials
> are
>from the value stored in ZK.
>2. Single letter keys save space. Agree, it is not much, but since a
>more verbose format doesn't add much value, it feels like wasted space
> in
>ZK to store long key names for each property for each user for each
>mechanism.
>3. SCRAM authentication messages defined in RFC 5802
> use comma-separated key=value
>pairs with single letter keys. s= and i= appear
> exactly
>like that in SCRAM messages. Server key and stored key are not
> exchanged,
>so I chose two unused letters. The same parser used for SCRAM messages
> is
>used to parse this persisted value as well since the format is the same.
>
>
> On Tue, Nov 15, 2016 at 5:02 PM, radai  wrote:
>
> > small nitpick - given that s,t,k and i are used as part of a rather large
> > CSV format, what is the gain in having them be single letter aliases?
> > in other words - why not salt=... , serverKey=... , storedKey=... ,
> > iterations=... ?
> >
> > On Tue, Nov 15, 2016 at 7:26 AM, Mickael Maison <
> mickael.mai...@gmail.com>
> > wrote:
> >
> > > +1
> > >
> > > On Tue, Nov 15, 2016 at 10:57 AM, Rajini Sivaram
> > >  wrote:
> > > > Jun,
> > > >
> > > > Thank you, I have made the updates to the KIP.
> > > >
> > > > On Tue, Nov 15, 2016 at 12:34 AM, Jun Rao  wrote:
> > > >
> > > >> Hi, Rajini,
> > > >>
> > > >> Thanks for the proposal. +1. A few minor comments.
> > > >>
> > > >> 30. Could you add that the broker config sasl.enabled.mechanisms can
> > now
> > > >> take more values?
> > > >>
> > > >> 31. Could you document the meaning of s,t,k,i used in
> > > /config/users/alice
> > > >> in ZK?
> > > >>
> > > >> 32. In the rejected section, could you document why we decided not
> to
> > > bump
> > > >> up the version of SaslHandshakeRequest?
> > > >>
> > > >> Jun
> > > >>
> > > >>
> > > >> On Mon, Nov 14, 2016 at 5:57 AM, Rajini Sivaram <
> > > >> rajinisiva...@googlemail.com> wrote:
> > > >>
> > > >> > Hi all,
> > > >> >
> > > >> > I would like to initiate the voting process for *KIP-84: Support
> > > >> SASL/SCRAM
> > > >> > mechanisms*:
> > > >> >
> > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > >> > 84%3A+Support+SASL+SCRAM+mechanisms
> > > >> >
> > > >> > This KIP adds support for four SCRAM mechanisms (SHA-224, SHA-256,
> > > >> SHA-384
> > > >> > and SHA-512) for SASL authentication, giving more choice for users
> > to
> > > >> > configure security. When delegation token support is added to
> Kafka,
> > > >> SCRAM
> > > >> > will also support secure authentication using delegation tokens.
> > > >> >
> > > >> > Thank you...
> > > >> >
> > > >> > Regards,
> > > >> >
> > > >> > Rajini
> > > >> >
> > > >>
> > > >
> > > >
> > > >
> > > > --
> > > > Regards,
> > > >
> > > > Rajini
> > >
> >
>
>
>
> --
> Regards,
>
> Rajini
>


Re: [VOTE] KIP-85: Dynamic JAAS configuration for Kafka clients

2016-11-28 Thread Ismael Juma
I'm very late to this, but better late than never, I guess. I am +1 on this
because it improves on the status quo, satisfies a real need and is simple
to implement.

Having said that, I'd also like to state that it's a bit of a shame that we
are doubling down on the JAAS config format. It is a peculiar format and in
the Kerberos case (one of the common usages), it requires users to provide
different configs depending on the Java implementation being used. It would
be nice if we looked into abstracting some of this to make users' lives
easier. Looking at the Hadoop codebase, it looks like they try to do that
although I don't know how well it worked out in practice.

Ismael

On Tue, Nov 1, 2016 at 1:42 PM, Rajini Sivaram  wrote:

> KIP-85 vote has passed with 4 binding (Harsha, Gwen, Jason, Jun) and 4
> non-binding (Mickael, Jim, Edo, me) votes.
>
> Thank you all for your votes and comments. I will update the KIP page and
> rebase the PR.
>
> Many thanks,
>
> Rajini
>
>
>
> On Mon, Oct 31, 2016 at 11:29 AM, Edoardo Comar  wrote:
>
> > +1 great KIP
> > --
> > Edoardo Comar
> > IBM MessageHub
> > eco...@uk.ibm.com
> > IBM UK Ltd, Hursley Park, SO21 2JN
> >
> > IBM United Kingdom Limited Registered in England and Wales with number
> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants.
> PO6
> > 3AU
> >
> >
> >
> > From:   Rajini Sivaram 
> > To: dev@kafka.apache.org
> > Date:   26/10/2016 16:27
> > Subject:[VOTE] KIP-85: Dynamic JAAS configuration for Kafka
> > clients
> >
> >
> >
> > I would like to initiate the voting process for KIP-85: Dynamic JAAS
> > configuration for Kafka Clients:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 85%3A+Dynamic+JAAS+
> > configuration+for+Kafka+clients
> >
> >
> > This KIP enables Java clients to connect to Kafka using SASL without a
> > physical jaas.conf file. This will also be useful to configure multiple
> > KafkaClient login contexts when multiple users are supported within a
> JVM.
> >
> > Thank you...
> >
> > Regards,
> >
> > Rajini
> >
> >
> >
> > Unless stated otherwise above:
> > IBM United Kingdom Limited - Registered in England and Wales with number
> > 741598.
> > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
> 3AU
> >
>
>
>
> --
> Regards,
>
> Rajini
>


Re: any plans to switch to java 8?

2016-11-28 Thread Ismael Juma
I think there are 3 main points that can be taken from that discussion with
regards to the timing:

1. We should do the switch no earlier than Kafka's next major version bump
(i.e. 0.11.0.0 at this point)
2. Some would prefer to support two Java versions, so we'd have to wait
until Kafka's next major version bump _after_ Java 9 is released. Java 9 is
currently scheduled to be released in July 2017. I like the guideline of
supporting two Java versions at a time, but multiple delays to Java 8 and 9
combined with huge improvements in Java 8 could provide the basis for an
exception.
3. Some would prefer the clients jar to support Java 7 for longer as there
are cases where it is hard to upgrade all clients to use Java 8 (maybe they
run in an older App Server that only supports Java 7, for example).

It seems like 1 is a hard requirement while 2 and 3 are less so. Given
that, I was planning to restart the conversation when we have a plan to
bump Kafka's major version (a message format change would quality
typically).

Ismael

On Thu, Nov 10, 2016 at 7:03 PM, Joel Koshy  wrote:

> http://markmail.org/message/gnrn5ccql7a2pmc5
> We can bump that up to revisit the discussion. That thread didn't have any
> closure, but has a lot of background information.
>
> On Thu, Nov 10, 2016 at 10:37 AM, Sean McCauliff  >
> wrote:
>
> > Wait for JDK 9 which is supposed to be 4-5 months from now?
> >
> > Sean
> >
> > On Thu, Nov 10, 2016 at 10:23 AM, radai 
> > wrote:
> > > with java 7 being EOL'ed for more than a year and a half now (apr 2015,
> > see
> > > http://www.oracle.com/technetwork/java/eol-135779.html) i was
> wondering
> > if
> > > there's an official plan/timetable for transitioning the kafka codebase
> > > over to java 8?
> >
>


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-28 Thread Ismael Juma
Thanks for the KIP, +1 from me.

I have a few comments with regards to the method names chosen, but since
none of the classes in question are public API, I'll comment directly in
the PR.

Ismael

On Mon, Nov 7, 2016 at 9:08 PM, radai  wrote:

> Hi,
>
> I would like to initiate a vote on KIP-72:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests
>
> The kip allows specifying a limit on the amount of memory allocated for
> reading incoming requests into. This is useful for "sizing" a broker and
> avoiding OOMEs under heavy load (as actually happens occasionally at
> linkedin).
>
> I believe I've addressed most (all?) concerns brought up during the
> discussion.
>
> To the best of my understanding this vote is about the goal and
> public-facing changes related to the new proposed behavior, but as for
> implementation, i have the code up here:
>
> https://github.com/radai-rosenblatt/kafka/tree/broker-
> memory-pool-with-muting
>
> and I've stress-tested it to work properly (meaning it chugs along and
> throttles under loads that would DOS 10.0.1.0 code).
>
> I also believe that the primitives and "pattern"s introduced in this KIP
> (namely the notion of a buffer pool and retrieving from / releasing to said
> pool instead of allocating memory) are generally useful beyond the scope of
> this KIP for both performance issues (allocating lots of short-lived large
> buffers is a performance bottleneck) and other areas where memory limits
> are a problem (KIP-81)
>
> Thank you,
>
> Radai.
>


[jira] [Assigned] (KAFKA-3537) Provide access to low-level Metrics in ProcessorContext

2016-11-28 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-3537:
---

Assignee: Eno Thereska  (was: Guozhang Wang)

> Provide access to low-level Metrics in ProcessorContext
> ---
>
> Key: KAFKA-3537
> URL: https://issues.apache.org/jira/browse/KAFKA-3537
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.9.0.1
>Reporter: Michael Coon
>Assignee: Eno Thereska
>Priority: Minor
>  Labels: semantics
> Fix For: 0.10.2.0
>
>
> It would be good to have access to the underlying Metrics component in 
> StreamMetrics. StreamMetrics forces a naming convention for metrics that does 
> not fit our use case for reporting. We need to be able to convert the stream 
> metrics to our own metrics formatting and it's cumbersome to extract group/op 
> names from pre-formatted strings the way they are setup in StreamMetricsImpl. 
> If there were a "metrics()" method of StreamMetrics to give me the underlying 
> Metrics object, I could register my own sensors/metrics as needed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2016-11-28 Thread Abhi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15701526#comment-15701526
 ] 

Abhi commented on KAFKA-1194:
-

[~haraldk] Do you have the build for the same issue. I tried building kafka in 
past, but it gave me unscrupulous build errors and I gave it up at that time to 
concentrate on my business logic... 

Abhi

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1
> Environment: window
>Reporter: Tao Qin
>Assignee: Jay Kreps
>  Labels: features, patch
> Fix For: 0.10.2.0
>
> Attachments: KAFKA-1194.patch, kafka-1194-v1.patch, 
> kafka-1194-v2.patch
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because kafka tries to rename the log file when it 
> is still opened.  So we should close the file first before rename.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I find a forceUnmap function in kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-84: Support SASL SCRAM mechanisms

2016-11-28 Thread Rajini Sivaram
Another committer vote is needed for this to go through. Anyone have time
to review?

Thank you!

On Tue, Nov 15, 2016 at 6:26 PM, Gwen Shapira  wrote:

> +1
>
> On Mon, Nov 14, 2016 at 5:57 AM, Rajini Sivaram
>  wrote:
> > Hi all,
> >
> > I would like to initiate the voting process for *KIP-84: Support
> SASL/SCRAM
> > mechanisms*:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 84%3A+Support+SASL+SCRAM+mechanisms
> >
> > This KIP adds support for four SCRAM mechanisms (SHA-224, SHA-256,
> SHA-384
> > and SHA-512) for SASL authentication, giving more choice for users to
> > configure security. When delegation token support is added to Kafka,
> SCRAM
> > will also support secure authentication using delegation tokens.
> >
> > Thank you...
> >
> > Regards,
> >
> > Rajini
>
>
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>



-- 
Regards,

Rajini


[jira] [Updated] (KAFKA-3715) Higher granularity streams metrics

2016-11-28 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-3715:

Labels: api  (was: api newbie)

> Higher granularity streams metrics 
> ---
>
> Key: KAFKA-3715
> URL: https://issues.apache.org/jira/browse/KAFKA-3715
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: aarti gupta
>Priority: Minor
>  Labels: api
> Fix For: 0.10.2.0
>
>
> Originally proposed by [~guozhang] in 
> https://github.com/apache/kafka/pull/1362#issuecomment-218326690
> We can consider adding metrics for process / punctuate / commit rate at the 
> granularity of each processor node in addition to the global rate mentioned 
> above. This is very helpful in debugging.
> We can consider adding rate / total cumulated metrics for context.forward 
> indicating how many records were forwarded downstream from this processor 
> node as well. This is helpful in debugging.
> We can consider adding metrics for each stream partition's timestamp. This is 
> helpful in debugging.
> Besides the latency metrics, we can also add throughput latency in terms of 
> source records consumed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


  1   2   >