[jira] [Updated] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vladimir Tretyakov updated KAFKA-1481:
--------------------------------------
    Attachment: KAFKA-1481_2014-10-15_10-23-35.patch
                KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch

Stop using dashes AND underscores as separators in MBean names
--------------------------------------------------------------
Key: KAFKA-1481
URL: https://issues.apache.org/jira/browse/KAFKA-1481
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
Labels: patch
Fix For: 0.8.2
Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch

MBeans should not use dashes or underscores as separators, because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBean names, making it impossible to parse the individual parts back out of an MBean name. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
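The parsing ambiguity described above, and one way JMX itself can sidestep it, can be sketched in Java. The quoting approach below uses the standard `javax.management.ObjectName.quote` helper; it is only an illustration of the problem, not the fix that was committed (the patches attached to this issue are authoritative), and the `group`/`topic` ids are invented.

```java
import javax.management.ObjectName;

public class MBeanNameDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical group and topic ids that themselves contain dashes.
        String group = "group-1";
        String topic = "my-topic-1";

        // A dash-separated metric name like this cannot be split back into
        // (group, metric, topic) unambiguously, which is the complaint above.
        String ambiguous = group + "-ConsumerLag-" + topic;
        System.out.println(ambiguous);

        // Quoting each key property value makes the boundaries explicit and
        // keeps the ObjectName parseable whatever characters the ids contain.
        ObjectName safe = new ObjectName(
                "kafka.consumer:type=ConsumerLag,group=" + ObjectName.quote(group)
                + ",topic=" + ObjectName.quote(topic));
        System.out.println(safe); // values appear quoted inside the name
    }
}
```

A monitoring tool can then read each id back with `safe.getKeyProperty("group")` instead of guessing where one separator-delimited field ends and the next begins.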
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172118#comment-14172118 ]

Vladimir Tretyakov commented on KAFKA-1481:
-------------------------------------------
Added 2 patches (created with git diff and from the IDEA IDE) with a small refactoring in KafkaMetricsGroup. Jun, can you please double check and change the part related to metrics removal (KAFKA-1567)? I can't easily test this code locally, thx a lot.
[jira] [Comment Edited] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172118#comment-14172118 ]

Vladimir Tretyakov edited comment on KAFKA-1481 at 10/15/14 8:01 AM:
---------------------------------------------------------------------
Added 2 patches (created with git diff and from the IDEA IDE) with a small refactoring in KafkaMetricsGroup. Jun, can you please double check and change the part related to metrics removal (KAFKA-1567)? I can't easily test this code locally, thx a lot.

PS: all tests passed after this patch for me locally.
Re: Review Request 26710: Patch for KAFKA-1637
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26710/#review56699
---

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/26710/#comment97084
I think it's better to use the metadata cache here.

- Neha Narkhede

On Oct. 14, 2014, 10:04 p.m., Ewen Cheslack-Postava wrote:
(Updated Oct. 14, 2014, 10:04 p.m.)

Review request for kafka.
Bugs: KAFKA-1637 https://issues.apache.org/jira/browse/KAFKA-1637
Repository: kafka

Description
---
KAFKA-1637 Return correct error code and offsets for OffsetFetchRequest for unknown topics/partitions vs no associated consumer.

Diffs
---
core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 1586243d20d6a181a1bd9f07e1c9493596005b32
core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266
core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 2d9325045ac1ac2d7531161b32c98c847125cbf0

Diff: https://reviews.apache.org/r/26710/diff/

Testing
---

Thanks,
Ewen Cheslack-Postava
Re: Review Request 26710: Patch for KAFKA-1637
On Oct. 15, 2014, 5:22 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/server/KafkaApis.scala, line 510
https://reviews.apache.org/r/26710/diff/1/?file=721124#file721124line510
There is an issue here. The replica manager only contains information about partitions that are assigned to this broker. However, some consumer group's offset manager may also be on this broker, and that group may consume various partitions that are not assigned to this broker. The offset manager, though, will still contain offsets for those partitions.

Wups. My bad. Didn't want to push this patch, but the other one. Also have a suggestion on the same code you commented on.

- Neha

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26710/#review56662
---
[jira] [Commented] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
[ https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172372#comment-14172372 ]

Neha Narkhede commented on KAFKA-1637:
--------------------------------------
Wups. Didn't mean to push the patch for this JIRA. Sorry! Had a suggestion myself for not using the replica manager which didn't get published by reviewboard.

SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
------------------------------------------------------------------------------------------------------------
Key: KAFKA-1637
URL: https://issues.apache.org/jira/browse/KAFKA-1637
Project: Kafka
Issue Type: Bug
Components: consumer, core
Affects Versions: 0.8.1, 0.8.1.1
Environment: Linux
Reporter: Amir Malekpour
Assignee: Ewen Cheslack-Postava
Labels: newbie
Attachments: KAFKA-1637.patch

This concerns Kafka's Offset Fetch API: According to Kafka's current documentation, if there is no offset associated with a topic-partition under that consumer group, the broker does not set an error code (since it is not really an error), but returns empty metadata and sets the offset field to -1 (link below). However, in Kafka 0.8.1.1 error code '3' is returned, which effectively makes it impossible for the client to decide whether there was an error or there is simply no offset associated with that topic-partition under that consumer group.
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
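The distinction the reporter needs can be sketched from the client's side. The numeric values below mirror the documented protocol (error code 0 plus offset -1 means "no offset stored"; error code 3 is UnknownTopicOrPartition); the `interpret` helper itself is hypothetical, not part of SimpleConsumer.

```java
public class OffsetFetchCheck {
    // Values as documented in the Kafka protocol guide linked above.
    static final short NO_ERROR = 0;
    static final short UNKNOWN_TOPIC_OR_PARTITION = 3;
    static final long INVALID_OFFSET = -1L;

    // Hypothetical helper showing how a client would branch on an
    // OffsetFetchResponse once the bug described here is fixed.
    static String interpret(short errorCode, long offset) {
        if (errorCode == NO_ERROR && offset == INVALID_OFFSET) {
            return "no offset stored"; // not an error: start from auto.offset.reset
        }
        if (errorCode == NO_ERROR) {
            return "offset=" + offset; // resume from the committed offset
        }
        if (errorCode == UNKNOWN_TOPIC_OR_PARTITION) {
            return "unknown topic/partition"; // a real error worth surfacing
        }
        return "error " + errorCode;
    }

    public static void main(String[] args) {
        System.out.println(interpret((short) 0, -1L));
        System.out.println(interpret((short) 0, 42L));
        System.out.println(interpret((short) 3, -1L));
    }
}
```

The bug is precisely that 0.8.1.1 returns code 3 for the first case, collapsing the first and third branches into one.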
Review request - Producer managed offsets
We have a requirement wherein only one producer should be writing to a partition at once. The producer will be managing its own offsets: at startup it gets the latest offsets from Kafka, and while producing messages it assigns these offsets to the messages. For this we had to write our own producer (which will be open sourced soon), and also create this patch to validate the offsets which are sent by the producer. The patch will just check, for a produce request coming from our own producer, that the offset is valid, and return an error which the producer will recover from. If possible, can any Kafka developer do a quick review of it please? The only concern here is whether there can be any gap in the offsets assigned to messages by Kafka. We will never be enabling compression on our topics.
https://github.com/makfair/kafka
Diff here: https://github.com/makfair/kafka/commit/241f5a2175b1dcaca00e663e896b5c9a52a75212
Thanks,
Ahmed
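The check the mail describes amounts to compare-and-advance on the log-end offset. The sketch below is purely illustrative of that idea; all names are invented here, and the linked diff is the authoritative version of the actual patch.

```java
// Sketch of the expected-offset validation described above: reject a produce
// request whose first producer-assigned offset does not match the current
// log-end offset, so a stale producer gets an error it can recover from by
// re-fetching the latest offset. Hypothetical names, not from the patch.
public class OffsetValidator {
    private long logEndOffset;

    public OffsetValidator(long logEndOffset) {
        this.logEndOffset = logEndOffset;
    }

    // True (and the log-end offset advances) only when the producer's
    // expected offset matches; false signals the recoverable error.
    public synchronized boolean tryAppend(long expectedOffset, int messageCount) {
        if (expectedOffset != logEndOffset) {
            return false;
        }
        logEndOffset += messageCount;
        return true;
    }

    public synchronized long logEndOffset() {
        return logEndOffset;
    }
}
```

With a single writer per partition and no compression (per the stated constraints), the broker assigns consecutive offsets to appended messages, which is what makes the producer's precomputed offsets line up.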
[jira] [Commented] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
[ https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172513#comment-14172513 ]

Ewen Cheslack-Postava commented on KAFKA-1637:
----------------------------------------------
Updated reviewboard https://reviews.apache.org/r/26710/diff/ against branch origin/trunk
Re: Review Request 26710: Patch for KAFKA-1637
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26710/
---

(Updated Oct. 15, 2014, 4:08 p.m.)

Review request for kafka.
Bugs: KAFKA-1637 https://issues.apache.org/jira/browse/KAFKA-1637
Repository: kafka

Description (updated)
---
Use MetadataCache instead of ReplicaManager to check for non-existent topics and partitions.

Diffs (updated)
---
core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 1586243d20d6a181a1bd9f07e1c9493596005b32
core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266
core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 2d9325045ac1ac2d7531161b32c98c847125cbf0

Diff: https://reviews.apache.org/r/26710/diff/

Testing
---

Thanks,
Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
[ https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ewen Cheslack-Postava updated KAFKA-1637:
-----------------------------------------
    Attachment: KAFKA-1637_2014-10-15_09:08:12.patch
[jira] [Created] (KAFKA-1706) Adding a byte bounded blocking queue to util.
Jiangjie Qin created KAFKA-1706:
-------------------------------
Summary: Adding a byte bounded blocking queue to util.
Key: KAFKA-1706
URL: https://issues.apache.org/jira/browse/KAFKA-1706
Project: Kafka
Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin

We saw many out-of-memory issues in Mirror Maker. To improve memory management we want to introduce a ByteBoundedBlockingQueue that has limits on both the number of messages and the number of bytes in it.
Review Request 26755: Patch for KAFKA-1706
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26755/
---

Review request for kafka.
Bugs: KAFKA-1706 https://issues.apache.org/jira/browse/KAFKA-1706
Repository: kafka

Description
---
Adding ByteBoundedBlockingQueue to utils.

Diffs
---
core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION

Diff: https://reviews.apache.org/r/26755/diff/

Testing
---

Thanks,
Jiangjie Qin
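The Scala patch under review is the real implementation; as a rough illustration of the idea (a blocking queue bounded by both element count and total bytes), here is a minimal Java sketch. The dual-bound rule and the decision to admit one oversized element into an empty queue are choices of this sketch, not necessarily those of the patch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.ToIntFunction;

// Minimal sketch of a queue bounded by both element count and total bytes.
public class ByteBoundedQueue<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final int maxItems;
    private final long maxBytes;
    private final ToIntFunction<E> sizeOf; // caller supplies the element size
    private long currentBytes = 0;

    public ByteBoundedQueue(int maxItems, long maxBytes, ToIntFunction<E> sizeOf) {
        this.maxItems = maxItems;
        this.maxBytes = maxBytes;
        this.sizeOf = sizeOf;
    }

    // Non-blocking insert: false when either bound would be violated. An
    // element larger than maxBytes is still admitted into an empty queue so
    // that it cannot block forever.
    public synchronized boolean offer(E e) {
        int size = sizeOf.applyAsInt(e);
        if (items.size() >= maxItems
                || (!items.isEmpty() && currentBytes + size > maxBytes)) {
            return false;
        }
        items.addLast(e);
        currentBytes += size;
        notifyAll(); // wake consumers blocked in take()
        return true;
    }

    // Blocking insert: waits until both bounds allow the element.
    public synchronized void put(E e) throws InterruptedException {
        while (!offer(e)) {
            wait();
        }
    }

    public synchronized E poll() {
        if (items.isEmpty()) return null;
        E e = items.removeFirst();
        currentBytes -= sizeOf.applyAsInt(e);
        notifyAll(); // freed capacity: wake producers blocked in put()
        return e;
    }

    public synchronized E take() throws InterruptedException {
        E e;
        while ((e = poll()) == null) {
            wait();
        }
        return e;
    }

    public synchronized int size() { return items.size(); }
    public synchronized long byteSize() { return currentBytes; }
}
```

For the Mirror Maker case the byte bound is the one that prevents OOM: a count-only bound lets a burst of large messages exceed the heap even when the queue holds few elements.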
[jira] [Commented] (KAFKA-1706) Adding a byte bounded blocking queue to util.
[ https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172533#comment-14172533 ]

Jiangjie Qin commented on KAFKA-1706:
-------------------------------------
Created reviewboard https://reviews.apache.org/r/26755/diff/ against branch origin/trunk
[jira] [Updated] (KAFKA-1706) Adding a byte bounded blocking queue to util.
[ https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiangjie Qin updated KAFKA-1706:
--------------------------------
    Attachment: KAFKA-1706.patch
[jira] [Updated] (KAFKA-1706) Adding a byte bounded blocking queue to util.
[ https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiangjie Qin updated KAFKA-1706:
--------------------------------
    Status: Patch Available (was: Open)
Re: Review Request 26755: Patch for KAFKA-1706
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26755/
---

(Updated Oct. 15, 2014, 4:26 p.m.)

Review request for kafka.
Bugs: KAFKA-1706 https://issues.apache.org/jira/browse/KAFKA-1706
Repository: kafka

Description (updated)
---
changed arguments name

Diffs (updated)
---
core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION

Diff: https://reviews.apache.org/r/26755/diff/

Testing
---

Thanks,
Jiangjie Qin
[jira] [Updated] (KAFKA-1706) Adding a byte bounded blocking queue to util.
[ https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiangjie Qin updated KAFKA-1706:
--------------------------------
    Attachment: KAFKA-1706_2014-10-15_09:26:26.patch
[jira] [Commented] (KAFKA-1706) Adding a byte bounded blocking queue to util.
[ https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172540#comment-14172540 ]

Jiangjie Qin commented on KAFKA-1706:
-------------------------------------
Updated reviewboard https://reviews.apache.org/r/26755/diff/ against branch origin/trunk
Re: Review Request 26755: Patch for KAFKA-1706
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26755/
---

(Updated Oct. 15, 2014, 4:28 p.m.)

Review request for kafka.
Bugs: KAFKA-1706 https://issues.apache.org/jira/browse/KAFKA-1706
Repository: kafka

Description (updated)
---
changed arguments name
correct typo.

Diffs (updated)
---
core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION

Diff: https://reviews.apache.org/r/26755/diff/

Testing
---

Thanks,
Jiangjie Qin
[jira] [Updated] (KAFKA-1706) Adding a byte bounded blocking queue to util.
[ https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiangjie Qin updated KAFKA-1706:
--------------------------------
    Attachment: KAFKA-1706_2014-10-15_09:28:01.patch
Re: Review Request 26663: Patch for KAFKA-979
On Oct. 15, 2014, 4:19 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/log/Log.scala, line 515
https://reviews.apache.org/r/26663/diff/2/?file=721126#file721126line515
Thinking about this a bit more: do you think it would be safer to interpret jitter as an additive value to segmentMs? i.e., the actual age for rolling will be config.segmentMs + segment.rollJitterMs (and limit segment.rollJitterMs to an interval of [0, config.segmentMs], which you are already doing). Otherwise, if a user happens to set a high jitter time then nearly empty segments roll often (with high probability). Another way to interpret it is as a jitter window, i.e., the actual age for rolling will be config.segmentMs + segment.rollJitterMs, and limit segment.rollJitterMs to an interval of [-config.segmentMs / 2, config.segmentMs / 2]. Thoughts?

I considered all of these options. The reason I went with the approach in the patch is that it preserves the meaning of segmentJitterMs, which says it is a maximum, albeit soft, time. That said, since this needs to be explicitly enabled by setting the new parameter to be non-zero, I don't think it would be unreasonable to expect someone enabling it to understand the implications. I personally think the final option, which causes the average time to be config.segmentMs, is most intuitive, but as long as the effect is clearly documented they are all effectively equivalent assuming uniform sampling.

- Ewen

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26663/#review56652
---

On Oct. 14, 2014, 10:33 p.m., Ewen Cheslack-Postava wrote:
(Updated Oct. 14, 2014, 10:33 p.m.)

Review request for kafka.
Bugs: KAFKA-979 https://issues.apache.org/jira/browse/KAFKA-979
Repository: kafka

Description
---
Add new options log.roll.jitter.ms and log.roll.jitter.hours to add random jitter to time-based log rolling so logs aren't likely to roll at exactly the same time. Jitter always reduces the timeout, so log.roll.ms still provides a soft maximum. Defaults to 0, so no jitter is added by default. Addressing warning and Util.abs comments.

Diffs
---
core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73
core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff
core/src/main/scala/kafka/log/LogConfig.scala d2cc9e3d6b7a4fd24516d164eb3673e6ce052129
core/src/main/scala/kafka/log/LogSegment.scala 7597d309f37a0b3756381f9500100ef763d466ba
core/src/main/scala/kafka/server/KafkaConfig.scala 7fcbc16da898623b03659c803e2a20c7d1bd1011
core/src/main/scala/kafka/server/KafkaServer.scala 3e9e91f2b456bbdeb3055d571e18ffea8675b4bf
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 7b97e6a80753a770ac094e101c653193dec67e68
core/src/test/scala/unit/kafka/log/LogTest.scala a0cbd3bbbeeabae12caa6b41aec31a8f5dfd034b

Diff: https://reviews.apache.org/r/26663/diff/

Testing
---

Thanks,
Ewen Cheslack-Postava
Re: Review Request 26663: Patch for KAFKA-979
Neha Narkhede wrote:
Thinking about this more in the context of how we initially found this problem. Users want to use time-based rolling alongside time-based retention since it makes it easier to reason about the time window of data in a segment. This is mainly useful when resetting offsets based on time, since offsets are returned only on segment boundaries. From a user perspective, time-based rolling is just supposed to work out of the box and not have performance implications in large clusters, which in fact, today it does.
This is also very nuanced for most users to understand and work around, and almost everyone would just expect Kafka to do the right thing. Essentially, I'm arguing this not be a configurable constant value but a value derived from performance tests done by us. Even if it has to be exposed through a config, it seems better for it to be a function of the segment roll time instead of a constant value. This way, people don't have to worry about it except in rare cases where it needs to be tuned, and even then it is difficult to screw up. It might make sense to run a stress test (possibly using some tweaked version of StressTestLog.scala).

I think Ewen's evaluation criterion is a useful one, i.e., what is the average age going to be. In the current patch, the age ranges over [segment.ms - randomJitter, segment.ms], where randomJitter ranges over [0, min(jitterMs, segment.ms)]. If jitterMs == segment.ms the average age will be segment.ms / 2. If the age ranges over [segment.ms, segment.ms + randomJitter] the average age will be segment.ms + segment.ms / 2. If the age ranges over [segment.ms - randomJitter / 2, segment.ms + randomJitter / 2) the average age will be segment.ms, which is the most intuitive.

@Neha I actually think all this will be an interim solution until we get timestamps into the message metadata and log index. There is a thread discussing that. When that is done I think we can do with only size-based rolling, and time-based retention can be done by using message header metadata.

- Joel

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26663/#review56652
---
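The three interpretations and their average roll ages discussed in this thread can be written down directly. Assuming the jitter sample is uniform in [0, jitterMs], the analytic averages below reproduce the numbers quoted for the jitterMs == segment.ms case; the method names are illustrative, not from the patch.

```java
public class RollJitterDemo {
    // Patch behavior (subtractive): age in [segmentMs - jitterMs, segmentMs].
    static double avgSubtractive(long segmentMs, long jitterMs) {
        return segmentMs - jitterMs / 2.0;
    }

    // Additive interpretation: age in [segmentMs, segmentMs + jitterMs].
    static double avgAdditive(long segmentMs, long jitterMs) {
        return segmentMs + jitterMs / 2.0;
    }

    // Centered window: age in [segmentMs - jitterMs/2, segmentMs + jitterMs/2),
    // so the jitter cancels on average regardless of jitterMs.
    static double avgCentered(long segmentMs, long jitterMs) {
        return (double) segmentMs;
    }

    public static void main(String[] args) {
        long segmentMs = 3_600_000L; // e.g. one hour
        // With jitterMs == segmentMs the averages are segmentMs/2,
        // 3*segmentMs/2 and segmentMs respectively, as noted in the thread.
        System.out.println(avgSubtractive(segmentMs, segmentMs));
        System.out.println(avgAdditive(segmentMs, segmentMs));
        System.out.println(avgCentered(segmentMs, segmentMs));
    }
}
```

This makes the trade-off concrete: only the centered window keeps the average at segment.ms, while the subtractive form preserves segment.ms as a soft maximum at the cost of a lower average.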
[jira] [Created] (KAFKA-1707) ConsumerOffsetChecker shows none partitions assigned
Hari created KAFKA-1707:
------------------------
Summary: ConsumerOffsetChecker shows none partitions assigned
Key: KAFKA-1707
URL: https://issues.apache.org/jira/browse/KAFKA-1707
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.8.0
Environment: HP 40 x Intel(R) Xeon(R) CPU E5-2470 v2 @ 2.40GHz/1.2e+02GB
Reporter: Hari
Assignee: Neha Narkhede

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker shows some partitions having no consumers after a re-balance triggered by a new consumer joining or disconnecting from the group. The lag keeps piling up till the partitions are assigned to it after re-balance.
Re: Review Request 26373: Patch for KAFKA-1647
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/#review56736
---

Ship it!

Ship It!

- Guozhang Wang

On Oct. 13, 2014, 11:38 p.m., Jiangjie Qin wrote:
(Updated Oct. 13, 2014, 11:38 p.m.)

Review request for kafka.
Bugs: KAFKA-1647 https://issues.apache.org/jira/browse/KAFKA-1647
Repository: kafka

Description
---
Addressed Joel's comments.

Diffs
---
core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653

Diff: https://reviews.apache.org/r/26373/diff/

Testing
---

Thanks,
Jiangjie Qin
[jira] [Created] (KAFKA-1708) Consumers intermittently stop consuming till restart
Hari created KAFKA-1708:
------------------------
Summary: Consumers intermittently stop consuming till restart
Key: KAFKA-1708
URL: https://issues.apache.org/jira/browse/KAFKA-1708
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.8.0
Environment: HP 40 x Intel(R) Xeon(R) CPU E5-2470 v2 @ 2.40GHz/1.2e+02GB
Reporter: Hari
Assignee: Neha Narkhede

Using a simple consumer and reading messages via a stream iterator, we noticed that consumption suddenly stops and the lag starts building up till the consumer is restarted. Below is the code snippet:

final Map<String, List<KafkaStream<byte[], byte[]>>> streamsByName = consumerConnector.createMessageStreams(topicCountMap);
ConsumerIterator<byte[], byte[]> streamIterator = streamsByName.get(topicName).get(IDX_FIRST_ITEM).iterator();
if (streamIterator.hasNext()) {
    final MessageAndMetadata<byte[], byte[]> item = streamIterator.next();
    ...
}
[jira] [Updated] (KAFKA-1707) ConsumerOffsetChecker shows none partitions assigned
[ https://issues.apache.org/jira/browse/KAFKA-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hari updated KAFKA-1707:
------------------------
    Description: bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker shows some partitions having no consumers after a re-balance triggered by a new consumer joining or disconnecting from the group. The lag keeps piling up till the partitions are assigned to it, usually after another re-balance trigger.
    (was: bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker shows some partitions having none consumers after re-balance triggered due to new consumer joined/disconnected to the group. The lag gets piling up till the partitions are assigned to it after re-balance.)
Re: [DISCUSSION] Message Metadata
Thanks Joe, I think we now have a few open questions to discuss around this topic: 1. Shall we make core Kafka properties first-class fields in the message header or put them in as tags? The pros of the first approach are a more compact format and hence less message header overhead; the cons are that any future message header change needs a protocol bump and possibly multi-versioned handling on the server side. Vice versa for the second approach. 2. Shall we leave app properties in the message content and enforce schema-based topics, or make them extensible tags? The pros of the first approach are again saving message header overhead for app properties; the cons are that it enforces schema usage so that the message content can be partially de-serialized just for the app header. At LinkedIn we enforce Avro schemas for auditing purposes, and as a result the Kafka team has to manage the schema registration process / schema repository as well. 3. Which properties should be core Kafka and which should be app properties? For example, shall we make properties that only MM cares about app properties or Kafka properties? Guozhang On Tue, Oct 14, 2014 at 5:10 AM, Joe Stein joe.st...@stealth.ly wrote: I think we could add schemaId(binary) to the MessageAndMetaData. With the schemaId you can implement different downstream software patterns on the messages reliably. I wrote up more thoughts on this use case: https://cwiki.apache.org/confluence/display/KAFKA/Schema+based+topics It should strive to encompass all implementation needs for producer, broker, and consumer hooks. So if the application and tagged fields are important you can package that into a specific Kafka topic plug-in and assign it to topic(s). The Kafka server should be able to validate your expected formats (like encoders/decoders, but in the broker, by topic, regardless of producer) for the topics that have it enabled. We should have these maintained in the project under contrib. 
=- Joestein On Mon, Oct 13, 2014 at 11:02 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Jay, Thanks for the comments. Replied inline. Guozhang On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps jay.kr...@gmail.com wrote: I need to take more time to think about this. Here are a few off-the-cuff remarks: - To date we have tried really, really hard to keep the data model for messages simple, since after all you can always add whatever you like inside the message body. - For system tags, why not just make these fields first-class fields in the message? Why have a bunch of key-value pairs versus first-class fields? Yes, we can alternatively make system tags first-class fields in the message header to make the format / processing logic simpler. The main reasons I put them as system tags are: 1) when I think about these possible system tags, some of them apply to all types of messages (e.g. timestamps), but some of them may be for a specific type of message (compressed, control message), and for those, not all of them are necessarily required all the time; hence making them compact tags may save us some space when not all of them are present; 2) with tags we do not need to bump up the protocol version every time we make a change, which would include keeping the logic to handle all versions on the broker until the old ones are officially discarded; instead, the broker can just ignore a tag if its id is not recognizable (because the client is on a newer version), or use some default value / throw an exception if a required tag is missing (because the client is on an older version). - You don't necessarily need application-level tags explicitly represented in the message format for efficiency. The application can define its own header (e.g. its message could be a size-delimited header followed by a size-delimited body). But actually if you use Avro you don't even need this, I don't think. 
Avro has the ability to deserialize just the header fields in your message. Avro has a notion of reader and writer schemas. The writer schema is whatever the message was written with. If the reader schema is just the header, Avro will skip any fields it doesn't need and deserialize only the fields it does need. This is actually a much more usable and flexible way to define a header since you get all the types Avro allows instead of just bytes. I agree that we can use a reader schema to read out just the header without de-serializing the full message, and probably for compressed messages we can add an Avro / etc. header for the compressed wrapper message also, but that would force these applications (MM, auditor, clients) to be schema-aware, which would usually require the people who manage this data pipeline to also manage the schemas, whereas ideally Kafka itself should just consider bytes-in and bytes-out (and
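Jay's size-delimited alternative can be sketched as a tiny framing format. This is a hypothetical illustration, not Kafka's wire format or any proposed one; the class and method names are made up:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a size-delimited message layout:
// [headerLen:int32][header bytes][bodyLen:int32][body bytes].
// A consumer that only audits can parse the header and never touch the body.
public class SizeDelimitedMessage {
    public static byte[] frame(byte[] header, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(8 + header.length + body.length);
        buf.putInt(header.length).put(header);
        buf.putInt(body.length).put(body);
        return buf.array();
    }

    // Reads only the header, leaving the opaque body unread.
    public static byte[] readHeader(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte[] header = new byte[buf.getInt()];
        buf.get(header);
        return header;
    }

    public static void main(String[] args) {
        byte[] msg = frame("audit-tag".getBytes(StandardCharsets.UTF_8),
                           new byte[1024]); // body stays opaque bytes
        System.out.println(new String(readHeader(msg), StandardCharsets.UTF_8));
    }
}
```

This keeps Kafka itself bytes-in/bytes-out: only the thin header needs an agreed format, while the body can be Avro, JSON, or anything else.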
Re: Review Request 26663: Patch for KAFKA-979
On Oct. 15, 2014, 4:19 a.m., Joel Koshy wrote: core/src/main/scala/kafka/log/Log.scala, line 515 https://reviews.apache.org/r/26663/diff/2/?file=721126#file721126line515 Thinking about this a bit more: do you think it would be safer to interpret jitter as an additive value to segmentMs? i.e., the actual age for rolling will be config.segmentMs + segment.rollJitterMs; (and limit segment.rollJitterMs to an interval of [0, config.segmentMs] which you are already doing.) Otherwise if a user happens to set a high jitter time then nearly empty segments roll often (with high probability). Another way to interpret it is as a jitter window. i.e., the actual age for rolling will be config.segmentMs + segment.rollJitterMs; and limit segment.rollJitterMs to an interval of [-config.segmentMs / 2, config.segmentMs / 2] Thoughts? Ewen Cheslack-Postava wrote: I considered all of these options. The reason I went with the approach in the patch is that it preserves the meaning of segmentJitterMs which says it is a maximum, albeit soft, time. That said, since this needs to be explicitly enabled by setting the new parameter to be non-zero, I don't think it would be unreasonable to expect someone enabling it to understand the implications. I personally think the final option that causes the average time to be config.segmentMs is most intuitive, but as long as the effect is clearly documented they are all effectively equivalent assuming uniform sampling. Neha Narkhede wrote: Thinking about this more in the context of how we initially found this problem. Users want to use time based rolling along side time based retention since it makes it easier to reason about the time window of data in a segment. This is mainly useful when resetting offset based on time since offsets are returned only on segment boundaries. From a user perspective, time based rolling is just supposed to work out of the box and not have performance implications in large clusters, which in fact, today it does. 
This is also very nuanced for most users to understand and work around and almost everyone would just expect Kafka to do the right thing. Essentially, I'm arguing this not be a configurable constant value but a value derived from performance tests done by us. Even if it has to be exposed through a config, it seems better for it to be a function of the segment roll time instead of a constant value. This way, people don't have to worry about it except in rare cases where it needs to be tuned, and even then it is difficult to screw up. It might make sense to run a stress test (possibly using some tweaked version of StressTestLog.scala). Joel Koshy wrote: I think Ewen's evaluation criterion is a useful one, i.e., what is the average age going to be. In the current patch, the age ranges over [segment.ms - randomJitter, segment.ms] where randomJitter ranges over [0, min(jitterMs, segment.ms)]. If jitterMs == segment.ms the average age will be segment.ms / 2. If the age ranges over [segment.ms, segment.ms + randomJitter] the average age will be segment.ms + segment.ms / 2. If the age ranges over [segment.ms - randomJitter / 2, segment.ms + randomJitter / 2) the average age will be segment.ms - which is the most intuitive. @Neha I actually think all this will be an interim solution until we get timestamps into the message metadata and log index. There is a thread discussing that. When that is done I think we can do with only size-based rolling, and time-based retention can be done by using message header metadata. The concerns here are very much opposed -- Joel seems to be interested in good, intuitive control over the exact results of using jitter but Neha wants things to just work. I assume this issue only comes up when you have a large enough deployment that a lot of logs can roll at once, in which case you're probably tweaking a bunch of settings anyway. I'm also not sure we could come up with one good constant since the problem scales with the # of partitions. 
I think the best we could do is try to come up with a conservative maximum # of partitions/logs (per disk?) to support well without tweaking, then measure an average fsync time and choose a default based on that. Then again, for the just works case, the default roll time is 1 week, so even a small jitter (e.g. minutes) would have little impact on the timing and be more than enough jitter. I think the most useful input here would be from an ops person who could say what their ideal is (and whether they think a constant value would be able to reasonably solve the problem). - Ewen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26663/#review56652
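The three jitter interpretations debated above can be sketched directly. The method names and the subtractive/additive/windowed framing below are illustrative, not actual Kafka configs or code; the point is only to show the resulting age ranges:

```java
import java.util.Random;

// Sketch of the three jitter interpretations from the review thread.
// Each returns the effective segment age at which a roll would trigger.
public class RollJitterSketch {
    // Current patch: age in [segmentMs - jitter, segmentMs], mean segmentMs - jitter/2
    static long subtractive(long segmentMs, long jitterMs, Random r) {
        long jitter = (long) (r.nextDouble() * Math.min(jitterMs, segmentMs));
        return segmentMs - jitter;
    }
    // Additive: age in [segmentMs, segmentMs + jitter], mean segmentMs + jitter/2
    static long additive(long segmentMs, long jitterMs, Random r) {
        long jitter = (long) (r.nextDouble() * Math.min(jitterMs, segmentMs));
        return segmentMs + jitter;
    }
    // Window: age in [segmentMs - jitter/2, segmentMs + jitter/2], mean segmentMs
    static long windowed(long segmentMs, long jitterMs, Random r) {
        long half = Math.min(jitterMs, segmentMs) / 2;
        return segmentMs - half + (long) (r.nextDouble() * 2 * half);
    }

    static void check(boolean cond) {
        if (!cond) throw new AssertionError("bound violated");
    }

    public static void main(String[] args) {
        Random r = new Random(42);
        long seg = 604_800_000L; // default segment.ms: 1 week
        long jit = 3_600_000L;   // an hour of jitter, tiny next to a week
        for (int i = 0; i < 10_000; i++) {
            long a = subtractive(seg, jit, r);
            check(a >= seg - jit && a <= seg);
            long b = additive(seg, jit, r);
            check(b >= seg && b <= seg + jit);
            long c = windowed(seg, jit, r);
            check(c >= seg - jit / 2 && c <= seg + jit / 2);
        }
        System.out.println("bounds hold");
    }
}
```

Only the windowed variant keeps the mean roll age at exactly segment.ms, which is why it was called the most intuitive; with the default one-week segment.ms, all three are nearly indistinguishable for small jitter.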
Re: [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
Hi Jun, I will file a JIRA bug for this and attach the YourKit profile snapshot and screenshot. Do you want me to take a thread dump every second? The threads are blocked on the synchronized block like you mentioned, and the YourKit profile snapshot will contain the thread dump. Thanks, Bhavesh On Tue, Oct 14, 2014 at 4:06 PM, Jun Rao jun...@gmail.com wrote: Bhavesh, It seems that all those threads are blocked waiting for the lock on the dq for that partition. There has got to be another thread holding the dq lock at that point. Could you create a jira and attach the full thread dump there? Also, could you attach the yourkit result that shows the breakdown of the time? Thanks, Jun On Tue, Oct 14, 2014 at 10:41 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jay, Yes, it is reproducible quite easily. The problem is the synchronized block in RecordAccumulator. You can reproduce it easily; I have attached the Java code in my original email. Application threads enqueueing messages into a single partition cause thread contention, and an application thread may be blocked on this for more than 2 minutes as shown in the original email. Let me know if you need more information. Last commit I tested with: commit 68b9f7716df1d994a9d43bec6bc42c90e66f1e99 Author: Anton Karamanov atara...@gmail.com Date: Tue Oct 7 18:22:31 2014 -0700 kafka-1644; Inherit FetchResponse from RequestOrResponse; patched by Anton Karamanov; reviewed by Jun Rao Thanks, Bhavesh On Tue, Oct 14, 2014 at 10:16 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Bhavesh, This sounds like a problem. Just to confirm, is this after the fix for KAFKA-1673? https://issues.apache.org/jira/browse/KAFKA-1673 It sounds like you have a reproducible test case? 
-Jay On Mon, Oct 13, 2014 at 10:54 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Dev Team, When I run a test that sends messages to a single partition for 3 minutes or so, I encounter a deadlock (please see the attached screenshot) and thread contention in YourKit profiling. Use Case: 1) Aggregating messages into the same partition for metric counting. 2) Replicating the old producer's behavior of sticking to a partition for 3 minutes. Here is the output: Frozen threads found (potential deadlock) It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung.
pool-1-thread-128 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-159 --- Frozen for at least 2m 1 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-55 --- Frozen for at least 2m 
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
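A minimal sketch of the contention Bhavesh describes, assuming the essential structure is a per-partition deque guarded by a single monitor (this is not the actual RecordAccumulator code, just an illustration of why appends to one partition serialize):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model: every sender thread targeting the same partition must acquire
// the same per-partition lock, so with N threads and one hot partition,
// N-1 threads are always blocked on that monitor while other partitions idle.
public class SinglePartitionContention {
    private final Deque<byte[]> dq = new ArrayDeque<>();

    public void append(byte[] record) {
        synchronized (dq) { // all appends to this partition funnel through here
            dq.addLast(record);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SinglePartitionContention partition = new SinglePartitionContention();
        Thread[] senders = new Thread[8];
        for (int i = 0; i < senders.length; i++) {
            senders[i] = new Thread(() -> {
                for (int n = 0; n < 100_000; n++)
                    partition.append(new byte[0]);
            });
            senders[i].start();
        }
        for (Thread t : senders) t.join();
        synchronized (partition.dq) {
            System.out.println(partition.dq.size());
        }
    }
}
```

The "frozen" threads in the YourKit dump are in exactly this state: not deadlocked, but starved behind one monitor that another thread holds while doing work (e.g. draining or compressing) inside the critical section.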
Review Request 26770: Patch for KAFKA-1108
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26770/ --- Review request for kafka. Bugs: KAFKA-1108 https://issues.apache.org/jira/browse/KAFKA-1108 Repository: kafka Description --- KAFKA-1108 Log IOException messages during controlled shutdown. Diffs - core/src/main/scala/kafka/server/KafkaServer.scala 07c0a078ffa5142441f687da851472da732c3837 Diff: https://reviews.apache.org/r/26770/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1108) when controlled shutdown attempt fails, the reason is not always logged
[ https://issues.apache.org/jira/browse/KAFKA-1108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1108: - Attachment: KAFKA-1108.patch when controlled shutdown attempt fails, the reason is not always logged --- Key: KAFKA-1108 URL: https://issues.apache.org/jira/browse/KAFKA-1108 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1108.patch In KafkaServer.controlledShutdown(), it initiates a controlled shutdown, and then if there's a failure, it will retry the controlledShutdown. Looking at the code, there are 2 ways a retry could fail, one with an error response from the controller, and this messaging code: {code} info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.mkString(","))) info("Error code from controller: %d".format(shutdownResponse.errorCode)) {code} Alternatively, there could be an IOException, with this code executed: {code} catch { case ioe: java.io.IOException => channel.disconnect() channel = null // ignore and try again } {code} And then finally, in either case: {code} if (!shutdownSuceeded) { Thread.sleep(config.controlledShutdownRetryBackoffMs) warn("Retrying controlled shutdown after the previous attempt failed...") } {code} It would be nice if the nature of the IOException were logged in either case (I'd be happy with an ioe.getMessage() instead of a full stack trace, as kafka in general tends to be too willing to dump IOException stack traces!). I suspect, in my case, the actual IOException is a socket timeout (as the time between the initial "Starting controlled shutdown" and the first "Retrying..." message is usually about 35 seconds (the socket timeout + the controlled shutdown retry backoff). So, it would seem that really, the issue in this case is that controlled shutdown is taking too long. 
It would seem sensible instead to have the controller report back to the server (before the socket timeout) that more time is needed, etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
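The logging improvement the ticket asks for can be sketched as follows. The Channel interface and names here are hypothetical stand-ins for the broker channel, not Kafka's actual classes; the point is simply surfacing ioe.getMessage() in the retry path instead of swallowing it:

```java
import java.io.IOException;

// Sketch of a retry loop that records why each controlled-shutdown attempt
// failed, rather than disconnecting silently and retrying.
public class ControlledShutdownRetry {
    interface Channel { void sendShutdownRequest() throws IOException; }

    static String attemptShutdown(Channel channel, int maxRetries) {
        for (int i = 0; i < maxRetries; i++) {
            try {
                channel.sendShutdownRequest();
                return "controlled shutdown succeeded";
            } catch (IOException ioe) {
                // The fix: log the reason (message only, no stack trace spam).
                System.out.println("Controlled shutdown attempt failed: " + ioe.getMessage());
                // ...then back off (config.controlledShutdownRetryBackoffMs) and retry.
            }
        }
        return "controlled shutdown failed after " + maxRetries + " attempts";
    }

    public static void main(String[] args) {
        Channel flaky = () -> { throw new IOException("connection timed out"); };
        System.out.println(attemptShutdown(flaky, 2));
    }
}
```

With this, the ~35-second gap the reporter observed would show up in the log as a socket timeout, making the real problem (shutdown taking longer than the socket timeout) obvious.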
[jira] [Updated] (KAFKA-1108) when controlled shutdown attempt fails, the reason is not always logged
[ https://issues.apache.org/jira/browse/KAFKA-1108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1108: - Assignee: Ewen Cheslack-Postava Status: Patch Available (was: Open) when controlled shutdown attempt fails, the reason is not always logged --- Key: KAFKA-1108 URL: https://issues.apache.org/jira/browse/KAFKA-1108 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Assignee: Ewen Cheslack-Postava Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1108.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1108) when controlled shutdown attempt fails, the reason is not always logged
[ https://issues.apache.org/jira/browse/KAFKA-1108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172775#comment-14172775 ] Ewen Cheslack-Postava commented on KAFKA-1108: -- Created reviewboard https://reviews.apache.org/r/26770/diff/ against branch origin/trunk when controlled shutdown attempt fails, the reason is not always logged --- Key: KAFKA-1108 URL: https://issues.apache.org/jira/browse/KAFKA-1108 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1108.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26663: Patch for KAFKA-979
@Joel, I understand that the motivation behind that use case of time based rolling will go away. The point I was making is, from a user's perspective, what makes most
Re: Review Request 26710: Patch for KAFKA-1637
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26710/#review56802 --- Ship it! Minor naming comment which we can take care of on check-in. core/src/main/scala/kafka/server/KafkaApis.scala https://reviews.apache.org/r/26710/#comment97223 How about val (unknownTopicPartitions, knownTopicPartitions) core/src/main/scala/kafka/server/KafkaApis.scala https://reviews.apache.org/r/26710/#comment97224 Same here. - Joel Koshy On Oct. 15, 2014, 4:08 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26710/ --- (Updated Oct. 15, 2014, 4:08 p.m.) Review request for kafka. Bugs: KAFKA-1637 https://issues.apache.org/jira/browse/KAFKA-1637 Repository: kafka Description --- Use MetadataCache instead of ReplicaManager to check for non-existent topics and partitions. Diffs - core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 1586243d20d6a181a1bd9f07e1c9493596005b32 core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 2d9325045ac1ac2d7531161b32c98c847125cbf0 Diff: https://reviews.apache.org/r/26710/diff/ Testing --- Thanks, Ewen Cheslack-Postava
Re: [DISCUSSION] Message Metadata
Let me add my view on #2 in less delicate terms than Guozhang did :) When you're trying to run Kafka as a service, having to care about the format of the message sucks. I have plenty of users who are just fine using the Avro standard and play nice. Then I have a bunch of users who don't want to use Avro and want to do something else (json, some plain text, whatever). Then I have a bunch of users who use Avro but don't properly register their schemas. Then I have a bunch of users who do whatever they want and don't tell us. What this means is that I can't have standard tooling, like auditing, that works on the entire system. I either have to whitelist or blacklist topics, and then I run into problems when someone adds something new either way. It would be preferable if I could monitor and maintain the health of the system without having to worry about the message format. -Todd On 10/15/14, 10:50 AM, Guozhang Wang wangg...@gmail.com wrote: Thanks Joe, I think we now have a few open questions to discuss around this topic: 1. Shall we make core Kafka properties as first class fields in message header or put them as tags? The pros of the first approach is more compacted format and hence less message header overhead; the cons are that any future message header change needs protocol bump and possible multi-versioned handling on the server side. Vice versa for the second approach. 2. Shall we leave app properties still in message content and enforce schema based topics or make them as extensible tags? The pros of the first approach is again saving message header overhead for apps properties; and the cons are that it enforce schema usage for message content to be partially de-serialized only for app header. At LinkedIn we enforce Avro schemas for auditing purposes, and as a result the Kafka team has to manage the schema registration process / schema repository as well. 3. Which properties should be core KAFKA and which should be app properties? 
For example, shall we make properties that only MM cares about as app properties or Kafka properties? [...]
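The skip-unknown-tags behavior that motivates the tag proposal in this thread can be sketched with a toy encoding. The [id][len][value] layout below is purely illustrative, not a proposed or actual Kafka format:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Toy tagged-header codec: each entry is [id:int16][len:int16][bytes].
// A reader skips ids it does not recognize, so a newer client's extra tags
// do not require a protocol version bump on the broker.
public class TaggedHeader {
    public static byte[] encode(Map<Short, byte[]> tags) {
        int size = 2;
        for (byte[] v : tags.values()) size += 4 + v.length;
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort((short) tags.size());
        for (Map.Entry<Short, byte[]> e : tags.entrySet()) {
            buf.putShort(e.getKey());
            buf.putShort((short) e.getValue().length);
            buf.put(e.getValue());
        }
        return buf.array();
    }

    // Decodes only the tag ids this reader knows; unknown ids are skipped.
    public static Map<Short, byte[]> decode(byte[] data, Set<Short> known) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        Map<Short, byte[]> out = new HashMap<>();
        int n = buf.getShort();
        for (int i = 0; i < n; i++) {
            short id = buf.getShort();
            int len = buf.getShort();
            if (known.contains(id)) {
                byte[] v = new byte[len];
                buf.get(v);
                out.put(id, v);
            } else {
                buf.position(buf.position() + len); // ignore unrecognized tag
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Short, byte[]> tags = new HashMap<>();
        tags.put((short) 1, "ts".getBytes());    // known tag (e.g. a timestamp)
        tags.put((short) 99, "new".getBytes());  // tag only a newer client sets
        Map<Short, byte[]> seen = decode(encode(tags), Set.of((short) 1));
        System.out.println(seen.size());
    }
}
```

This is the trade-off versus first-class fields in a nutshell: a few bytes of per-tag overhead buys forward compatibility without multi-version handling.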
[jira] [Commented] (KAFKA-1514) Update Kafka trunk version number
[ https://issues.apache.org/jira/browse/KAFKA-1514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172942#comment-14172942 ] Glen Mazza commented on KAFKA-1514: --- Trunk is presently 0.8.3-SNAPSHOT: https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blob_plain;f=gradle.properties;hb=trunk. Can this item be closed? Update Kafka trunk version number - Key: KAFKA-1514 URL: https://issues.apache.org/jira/browse/KAFKA-1514 Project: Kafka Issue Type: Bug Reporter: Jakob Homan Right now the 8.1.1 branch has 8.1.1 as its version, while the trunk has 8.1. Trunk should be 9.0, generally, or at the very least 8.2. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
[ https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172968#comment-14172968 ] Neha Narkhede commented on KAFKA-1637:

[~ewencp] Your latest patch looks good. Once you've addressed [~jjkoshy]'s latest comment, I'll push it to trunk and 0.8.2.

SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group

Key: KAFKA-1637 URL: https://issues.apache.org/jira/browse/KAFKA-1637 Project: Kafka Issue Type: Bug Components: consumer, core Affects Versions: 0.8.1, 0.8.1.1 Environment: Linux Reporter: Amir Malekpour Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1637.patch, KAFKA-1637_2014-10-15_09:08:12.patch

This concerns Kafka's Offset Fetch API: according to Kafka's current documentation, if there is no offset associated with a topic-partition under that consumer group, the broker does not set an error code (since it is not really an error), but returns empty metadata and sets the offset field to -1 (link below). However, in Kafka 0.8.1.1 error code '3' is returned, which effectively makes it impossible for the client to decide whether there was an error or there is simply no offset associated with that topic-partition under that consumer group. https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
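Once the broker follows the documented behavior, the client-side distinction is mechanical. A minimal sketch with a hypothetical result holder (not the actual SimpleConsumer response types): "no committed offset" is error code 0 with offset -1 and empty metadata, which is cleanly separable from a real error such as code 3.

```java
// Hypothetical minimal model of an offset-fetch result, showing how a client
// distinguishes "no committed offset" from a real error once the broker
// follows the documented behavior (error=0, offset=-1, metadata="").
public class OffsetFetchResult {
    public final short errorCode; // 0 = NoError, 3 = UnknownTopicOrPartition
    public final long offset;     // -1 when no offset has been committed
    public final String metadata;

    public OffsetFetchResult(short errorCode, long offset, String metadata) {
        this.errorCode = errorCode;
        this.offset = offset;
        this.metadata = metadata;
    }

    public boolean isError() {
        return errorCode != 0;
    }

    public boolean hasCommittedOffset() {
        return errorCode == 0 && offset >= 0;
    }
}
```

Under the buggy 0.8.1.1 behavior both cases arrive with a nonzero error code, so a client cannot implement hasCommittedOffset() reliably at all; that is the substance of this ticket.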
[jira] [Commented] (KAFKA-1506) Cancel kafka-reassign-partitions Job
[ https://issues.apache.org/jira/browse/KAFKA-1506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172977#comment-14172977 ] Glen Mazza commented on KAFKA-1506:

As Guozhang noted, the reassignment taking forever may have been due to exceptions/errors that cancel functionality (which may be very difficult to implement correctly) won't necessarily fix. If reassignment is not normally expected to take an inordinate amount of time to complete, there is probably no need for cancel functionality; instead, fix the problems causing the delay, i.e., check the logs for any issues with Kafka or the user's configuration.

Cancel kafka-reassign-partitions Job

Key: KAFKA-1506 URL: https://issues.apache.org/jira/browse/KAFKA-1506 Project: Kafka Issue Type: New Feature Components: replication, tools Affects Versions: 0.8.1, 0.8.1.1 Reporter: Paul Lung Assignee: Neha Narkhede

I started a reassignment, and for some reason it just takes forever. However, it won't let me start another reassignment job while this one is running. So a tool to cancel a reassignment job is needed.
[jira] [Commented] (KAFKA-1492) Getting error when sending producer request at the broker end with a single broker
[ https://issues.apache.org/jira/browse/KAFKA-1492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172987#comment-14172987 ] Glen Mazza commented on KAFKA-1492:

This seems more appropriate for the Kafka user mailing list or Stack Overflow (apache-kafka tag) rather than JIRA.

Getting error when sending producer request at the broker end with a single broker

Key: KAFKA-1492 URL: https://issues.apache.org/jira/browse/KAFKA-1492 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.1.1 Reporter: sriram Assignee: Jun Rao

Tried to run a simple example by sending a message to a single broker. Getting this error:

[2014-06-13 08:35:45,402] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2014-06-13 08:35:45,440] WARN [KafkaApi-1] Produce request with correlation id 2 from client on partition [samsung,0] failed due to Leader not local for partition [samsung,0] on broker 1 (kafka.server.KafkaApis)
[2014-06-13 08:35:45,440] INFO [KafkaApi-1] Send the close connection response due to error handling produce request [clientId = , correlationId = 2, topicAndPartition = [samsung,0]] with Ack=0 (kafka.server.KafkaApis)

OS: Windows 7, JDK 1.7, Scala 2.10
Re: Review Request 26710: Patch for KAFKA-1637
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26710/ --- (Updated Oct. 15, 2014, 9:47 p.m.) Review request for kafka. Bugs: KAFKA-1637 https://issues.apache.org/jira/browse/KAFKA-1637 Repository: kafka Description (updated) --- Use MetadataCache instead of ReplicaManager to check for non-existent topics and partitions. Updating naming Diffs (updated) - core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 1586243d20d6a181a1bd9f07e1c9493596005b32 core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 2d9325045ac1ac2d7531161b32c98c847125cbf0 Diff: https://reviews.apache.org/r/26710/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Commented] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
[ https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173018#comment-14173018 ] Ewen Cheslack-Postava commented on KAFKA-1637: Updated reviewboard https://reviews.apache.org/r/26710/diff/ against branch origin/trunk
[jira] [Updated] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
[ https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1637: Attachment: KAFKA-1637_2014-10-15_14:47:21.patch
[DISCUSS] Release 0.8.2-beta before 0.8.2?
Hi,

We have accumulated an impressive list of pretty major features in 0.8.2:
- Delete topic
- Automated leader rebalancing
- Controlled shutdown
- Offset management
- Parallel recovery
- min.isr and clean leader election

In the past, what has worked for major feature releases is a beta release prior to a final release. I'm proposing we do the same for 0.8.2. The only blockers for 0.8.2-beta that I know of are:
- https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and requires some thinking about the new dependency. Since it is not fully ready and there are things to think about, I suggest we take it out, think it through end to end, and then include it in 0.8.3.)
- https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner: Guozhang Wang)
- https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is waiting on a review by Joe Stein)

It seems that 1634 and 1671 can get wrapped up in a week. Do people think we should cut 0.8.2-beta by next week?

Thanks,
Neha
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173157#comment-14173157 ] Jun Rao commented on KAFKA-1481:

Thanks for the patch. The IDEA one applies successfully for me. However, it seems to cause a compilation failure, since in a few cases two lines are incorrectly merged into one. Do you think you could try our patch review tool (https://cwiki.apache.org/confluence/display/KAFKA/Patch+submission+and+review)? Some comments.

1. KafkaMetricsGroup:
1.1 We now have 2 new MetricName(kafka.consumer, ConsumerTopicMetrics, MessagesPerSec), and 2 new MetricName(kafka.consumer, ConsumerTopicMetrics, BytesPerSec).
1.2 Could we combine the following two methods
def newGauge[T](name: String, mBeanName: String, metric: Gauge[T])
def newGauge[T](name: String, metric: Gauge[T]) : Gauge[T]
into
def newGauge[T](name: String, metric: Gauge[T], tags: Map[String, String] = Map.empty)?
We can then generate the metric name based on the name and the tags. This will consolidate metric name generation in a single place. In this patch, we have too many places generating the key/value string for the metric name.

2. AbstractFetcherThread:
2.1 name in this class should be used only for the thread name; it shouldn't be included in the metric name.
2.2 The way we build the metric name in classes like ClientIdAndBroker, ClientIdBrokerTopicPartition, and ClientIdAndTopic is not consistent. Sometimes we generate the key/value in toString(); other times we rely on the key/value string being passed in. It's easier to understand if those classes are constructed by just passing in the clientId string, the broker object, the topic string, the partition number, etc. We can then generate the metric name using what's described in 1.2.
2.3 FetcherLagStats just needs to take clientId, instead of ClientIdAndBroker, since the per-partition lag metric is unique per client id.
2.4 The metric name for a Broker can just use the brokerId, instead of its host and port.

3. Throttler: Do we need to add mBeanName to the constructor? It seems that nobody is using it at the moment.

4. KafkaMetricsGroup: We need to change how removeAllMetricsInList() works. We need to do an equality comparison on group, type, and name, and a regex match with clientId on MetricName.mBeanName.
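The underlying separator problem (KAFKA-1481) disappears if each component of the metric name becomes a distinct key property of the JMX ObjectName instead of being joined with dashes or underscores. A rough sketch with the standard javax.management API (an illustrative scheme, not the final Kafka naming):

```java
import javax.management.ObjectName;
import java.util.Hashtable;

// Sketch: build an MBean name whose components are separate key properties,
// so a clientId or topic containing '-' or '_' can still be parsed back out
// by monitoring tools instead of being ambiguous in a joined string.
public class MetricNameBuilder {
    public static ObjectName build(String group, String type, String name,
                                   java.util.Map<String, String> tags) {
        Hashtable<String, String> props = new Hashtable<>();
        props.put("type", type);
        props.put("name", name);
        for (java.util.Map.Entry<String, String> e : tags.entrySet()) {
            // Quote tag values so characters legal in hostnames/topics are safe.
            props.put(e.getKey(), ObjectName.quote(e.getValue()));
        }
        try {
            return new ObjectName(group, props);
        } catch (javax.management.MalformedObjectNameException ex) {
            throw new IllegalArgumentException(ex);
        }
    }
}
```

ObjectName.quote wraps the value in quotes and escapes special characters, so a clientId containing '-' or '_' round-trips intact via getKeyProperty.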
Build failed in Jenkins: Kafka-trunk #304
See https://builds.apache.org/job/Kafka-trunk/304/changes Changes: [neha.narkhede] KAFKA-1637 SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group; reviewed by Neha Narkhede and Joel Koshy -- [...truncated 423 lines...] kafka.utils.SchedulerTest testMockSchedulerNonPeriodicTask PASSED kafka.utils.SchedulerTest testMockSchedulerPeriodicTask PASSED kafka.utils.SchedulerTest testReentrantTaskInMockScheduler PASSED kafka.utils.SchedulerTest testNonPeriodicTask PASSED kafka.utils.SchedulerTest testPeriodicTask PASSED kafka.utils.JsonTest testJsonEncoding PASSED kafka.utils.ReplicationUtilsTest testUpdateLeaderAndIsr PASSED kafka.utils.UtilsTest testSwallow PASSED kafka.utils.UtilsTest testCircularIterator PASSED kafka.utils.UtilsTest testReadBytes PASSED kafka.utils.UtilsTest testAbs PASSED kafka.utils.UtilsTest testReplaceSuffix PASSED kafka.utils.UtilsTest testReadInt PASSED kafka.utils.UtilsTest testCsvList PASSED kafka.utils.UtilsTest testInLock PASSED kafka.utils.IteratorTemplateTest testIterator PASSED kafka.zk.ZKEphemeralTest testEphemeralNodeCleanup PASSED kafka.metrics.KafkaTimerTest testKafkaTimer PASSED kafka.log.LogTest testTimeBasedLogRoll PASSED kafka.log.LogTest testTimeBasedLogRollJitter PASSED kafka.log.LogTest testSizeBasedLogRoll PASSED kafka.log.LogTest testLoadEmptyLog PASSED kafka.log.LogTest testAppendAndReadWithSequentialOffsets PASSED kafka.log.LogTest testAppendAndReadWithNonSequentialOffsets PASSED kafka.log.LogTest testReadAtLogGap PASSED kafka.log.LogTest testReadOutOfRange PASSED kafka.log.LogTest testLogRolls PASSED kafka.log.LogTest testCompressedMessages PASSED kafka.log.LogTest testThatGarbageCollectingSegmentsDoesntChangeOffset PASSED kafka.log.LogTest testMessageSetSizeCheck PASSED kafka.log.LogTest testMessageSizeCheck PASSED kafka.log.LogTest testLogRecoversToCorrectOffset PASSED kafka.log.LogTest testIndexRebuild PASSED kafka.log.LogTest testTruncateTo PASSED kafka.log.LogTest 
testIndexResizingAtTruncation PASSED kafka.log.LogTest testBogusIndexSegmentsAreRemoved PASSED kafka.log.LogTest testReopenThenTruncate PASSED kafka.log.LogTest testAsyncDelete PASSED kafka.log.LogTest testOpenDeletesObsoleteFiles PASSED kafka.log.LogTest testAppendMessageWithNullPayload PASSED kafka.log.LogTest testCorruptLog PASSED kafka.log.LogTest testCleanShutdownFile PASSED kafka.log.LogSegmentTest testReadOnEmptySegment PASSED kafka.log.LogSegmentTest testReadBeforeFirstOffset PASSED kafka.log.LogSegmentTest testMaxOffset PASSED kafka.log.LogSegmentTest testReadAfterLast PASSED kafka.log.LogSegmentTest testReadFromGap PASSED kafka.log.LogSegmentTest testTruncate PASSED kafka.log.LogSegmentTest testTruncateFull PASSED kafka.log.LogSegmentTest testNextOffsetCalculation PASSED kafka.log.LogSegmentTest testChangeFileSuffixes PASSED kafka.log.LogSegmentTest testRecoveryFixesCorruptIndex PASSED kafka.log.LogSegmentTest testRecoveryWithCorruptMessage PASSED kafka.log.CleanerTest testCleanSegments PASSED kafka.log.CleanerTest testCleaningWithDeletes PASSED kafka.log.CleanerTest testCleanSegmentsWithAbort PASSED kafka.log.CleanerTest testSegmentGrouping PASSED kafka.log.CleanerTest testBuildOffsetMap PASSED kafka.log.FileMessageSetTest testWrittenEqualsRead PASSED kafka.log.FileMessageSetTest testIteratorIsConsistent PASSED kafka.log.FileMessageSetTest testSizeInBytes PASSED kafka.log.FileMessageSetTest testWriteTo PASSED kafka.log.FileMessageSetTest testTruncate PASSED kafka.log.FileMessageSetTest testFileSize PASSED kafka.log.FileMessageSetTest testIterationOverPartialAndTruncation PASSED kafka.log.FileMessageSetTest testIterationDoesntChangePosition PASSED kafka.log.FileMessageSetTest testRead PASSED kafka.log.FileMessageSetTest testSearch PASSED kafka.log.FileMessageSetTest testIteratorWithLimits PASSED kafka.log.OffsetMapTest testBasicValidation PASSED kafka.log.OffsetMapTest testClear PASSED kafka.log.OffsetIndexTest truncate PASSED kafka.log.OffsetIndexTest 
randomLookupTest PASSED kafka.log.OffsetIndexTest lookupExtremeCases PASSED kafka.log.OffsetIndexTest appendTooMany PASSED kafka.log.OffsetIndexTest appendOutOfOrder PASSED kafka.log.OffsetIndexTest testReopen PASSED kafka.log.LogManagerTest testCreateLog PASSED kafka.log.LogManagerTest testGetNonExistentLog PASSED kafka.log.LogManagerTest testCleanupExpiredSegments PASSED kafka.log.LogManagerTest testCleanupSegmentsToMaintainSize PASSED kafka.log.LogManagerTest testTimeBasedFlush PASSED kafka.log.LogManagerTest testLeastLoadedAssignment PASSED kafka.log.LogManagerTest testTwoLogManagersUsingSameDirFails PASSED kafka.log.LogManagerTest testCheckpointRecoveryPoints PASSED kafka.log.LogManagerTest testRecoveryDirectoryMappingWithTrailingSlash PASSED kafka.log.LogManagerTest
Re: Review Request 24676: Rebase KAFKA-1583
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/#review56843 --- Thanks for the patch. Looks good to me. I only have some minor comments below. core/src/main/scala/kafka/server/DelayedFetch.scala https://reviews.apache.org/r/24676/#comment97277 typo: is does not core/src/main/scala/kafka/server/DelayedFetch.scala https://reviews.apache.org/r/24676/#comment97280 Typo: the new the fetch operation core/src/main/scala/kafka/server/DelayedFetch.scala https://reviews.apache.org/r/24676/#comment97278 This seems to be case B. core/src/main/scala/kafka/server/DelayedFetch.scala https://reviews.apache.org/r/24676/#comment97279 This seems to be case A. core/src/main/scala/kafka/server/DelayedProduce.scala https://reviews.apache.org/r/24676/#comment97282 This doesn't match the comment. The error code is returned from checkEnoughReplicasReachOffset, not from writing to local log. core/src/main/scala/kafka/server/KafkaApis.scala https://reviews.apache.org/r/24676/#comment97283 Should replica manager be offset manager? core/src/main/scala/kafka/server/ReplicaManager.scala https://reviews.apache.org/r/24676/#comment97287 Perhaps add a comment that flag is for testing purpose only. core/src/main/scala/kafka/server/RequestPurgatory.scala https://reviews.apache.org/r/24676/#comment97288 Could we comment on the return value? - Jun Rao On Oct. 14, 2014, 2:42 a.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ --- (Updated Oct. 14, 2014, 2:42 a.m.) Review request for kafka. 
Bugs: KAFKA-1583 https://issues.apache.org/jira/browse/KAFKA-1583 Repository: kafka Description --- Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + minor changes Diffs - core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73 core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266 core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
209a409cb47eb24f83cee79f4e064dbc5f5e9d62 core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 Diff: https://reviews.apache.org/r/24676/diff/ Testing --- Unit tests Thanks, Guozhang Wang
[jira] [Commented] (KAFKA-1583) Kafka API Refactoring
[ https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173210#comment-14173210 ] Jun Rao commented on KAFKA-1583: Just reviewed it. Joel, Do you want to take another look before committing this? Thanks, Kafka API Refactoring - Key: KAFKA-1583 URL: https://issues.apache.org/jira/browse/KAFKA-1583 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, KAFKA-1583_2014-10-13_19:41:58.patch This is the next step of KAFKA-1430. Details can be found at this page: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1583) Kafka API Refactoring
[ https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173215#comment-14173215 ] Joel Koshy commented on KAFKA-1583: Yes please. I will be able to do this on Friday.
Re: [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
Probably 3 thread dumps will be enough.

Thanks,
Jun

On Wed, Oct 15, 2014 at 11:26 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

Hi Jun, I will file a JIRA bug for this, and I will attach the YourKit profile snapshot and screenshot. Do you want me to take a thread dump every second? The threads are blocked on the SYNC code block like you mentioned, and the YourKit profile snapshot will contain the thread dump.

Thanks,
Bhavesh

On Tue, Oct 14, 2014 at 4:06 PM, Jun Rao jun...@gmail.com wrote:

Bhavesh, it seems that all those threads are blocked waiting for the lock on the dq for that partition. There has got to be another thread holding the dq lock at that point. Could you create a JIRA and attach the full thread dump there? Also, could you attach the YourKit result that shows the breakdown of the time?

Thanks,
Jun

On Tue, Oct 14, 2014 at 10:41 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

Hi Jay, yes, it is reproducible quite easily. The problem is the synchronized block in RecordAccumulator; you can reproduce it easily. I have attached the Java code in my original email. Application threads enqueueing messages into a single partition cause thread contention, and an application thread may be blocked on this for more than 2 minutes, as shown in the original email. Let me know if you need more information.

Last commit I tested with: commit 68b9f7716df1d994a9d43bec6bc42c90e66f1e99 Author: Anton Karamanov atara...@gmail.com Date: Tue Oct 7 18:22:31 2014 -0700 kafka-1644; Inherit FetchResponse from RequestOrResponse; patched by Anton Karamanov; reviewed by Jun Rao

Thanks,
Bhavesh

On Tue, Oct 14, 2014 at 10:16 AM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Bhavesh, this sounds like a problem. Just to confirm, is this after the fix for KAFKA-1673? https://issues.apache.org/jira/browse/KAFKA-1673 It sounds like you have a reproducible test case?

-Jay

On Mon, Oct 13, 2014 at 10:54 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

Hi Kafka Dev Team, when I run the test sending messages to a single partition for 3 minutes or so, I encounter a deadlock (please see the screenshot attached) and thread contention in YourKit profiling.

Use Case:
1) Aggregating messages into the same partition for metric counting.
2) Replicating the old producer's behavior of sticking to a partition for 3 minutes.

Here is the output:

Frozen threads found (potential deadlock). It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung.

pool-1-thread-128 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744

pool-1-thread-159 --- Frozen for at least 2m 1 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744

pool-1-thread-55 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237 org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145 java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615 java.lang.Thread.run() Thread.java:744
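The contention pattern in these dumps, many appender threads serializing on one per-partition queue lock, can be reproduced in miniature with a plain synchronized deque (a simplified stand-in for RecordAccumulator's internal per-partition queue, not the actual implementation):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified stand-in for the accumulator's per-partition queue: every
// appender synchronizes on the same deque, so with a single hot partition
// all producer threads contend for one monitor (correct, but serialized).
public class SinglePartitionAppend {
    private final Deque<byte[]> dq = new ArrayDeque<>();

    public void append(byte[] record) {
        synchronized (dq) { // the lock all the dumped threads are waiting on
            dq.addLast(record);
        }
    }

    public int size() {
        synchronized (dq) {
            return dq.size();
        }
    }

    // Hammer one partition's queue from many threads; returns the final count.
    public static int run(int threads, int recordsPerThread) {
        SinglePartitionAppend acc = new SinglePartitionAppend();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < recordsPerThread; j++) acc.append(new byte[16]);
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return acc.size();
    }
}
```

With one hot partition every thread funnels through the same monitor, so appends are correct but fully serialized; under a profiler most threads show up blocked at the synchronized entry, just as in the dumps above.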
[jira] [Created] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
Bhavesh Mistry created KAFKA-1710: - Summary: [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition Key: KAFKA-1710 URL: https://issues.apache.org/jira/browse/KAFKA-1710 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Assignee: Jun Rao Priority: Critical Hi Kafka Dev Team, When I run the test to send message to single partition for 3 minutes or so on, I have encounter deadlock (please see the screen attached) and thread contention from YourKit profiling. Use Case: 1) Aggregating messages into same partition for metric counting. 2) Replicate Old Producer behavior for sticking to partition for 3 minutes. Here is output: Frozen threads found (potential deadlock) It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung. pool-1-thread-128 --- Frozen for at least 2m org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237 org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145 java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615 java.lang.Thread.run() Thread.java:744 pool-1-thread-159 --- Frozen for at least 2m 1 sec org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237 org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145 java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615 java.lang.Thread.run() Thread.java:744 pool-1-thread-55 --- Frozen for at least 2m org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237 org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145 java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615 java.lang.Thread.run() Thread.java:744 Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1709) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
Bhavesh Mistry created KAFKA-1709: - Summary: [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition Key: KAFKA-1709 URL: https://issues.apache.org/jira/browse/KAFKA-1709 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Assignee: Jun Rao Priority: Critical -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bhavesh Mistry updated KAFKA-1710: -- Attachment: TestNetworkDownProducer.java Java test program to reproduce this issue. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bhavesh Mistry updated KAFKA-1710: -- Attachment: Screen Shot 2014-10-15 at 9.09.06 PM.png Screen Shot 2014-10-13 at 10.19.04 AM.png YourKit thread view showing thread contention. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172118#comment-14172118 ] Bhavesh Mistry commented on KAFKA-1710: --- Here is the output of YourKit: {code}
Frozen threads found (potential deadlock)
It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung.
kafka-producer-network-thread --- Frozen for at least 14 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.ready(Cluster, long) RecordAccumulator.java:214
org.apache.kafka.clients.producer.internals.Sender.run(long) Sender.java:147
org.apache.kafka.clients.producer.internals.Sender.run() Sender.java:115
java.lang.Thread.run() Thread.java:744
pool-1-thread-106 --- Frozen for at least 20 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:238
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:85
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-15 --- Frozen for at least 13 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:238
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:85
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-161 --- Frozen for at least 13 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:238
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:85
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-165 --- Frozen for at least 17 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:238
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:85
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-172 --- Frozen for at least 20 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:238
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:85
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-184 --- Frozen for at least 11 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:238
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:85
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-26 --- Frozen for at least 11 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:238
org.kafka.test.TestNetworkDownProducer$MyProducer.run()
[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173332#comment-14173332 ] Bhavesh Mistry commented on KAFKA-1710: --- More output: {code}
Frozen threads found (potential deadlock)
It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung.
pool-1-thread-108 --- Frozen for at least 12 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:238
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:85
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-113 --- Frozen for at least 13 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:238
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:85
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-118 --- Frozen for at least 16 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:238
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:85
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-138 --- Frozen for at least 12 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:238
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:85
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-151 --- Frozen for at least 22 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:238
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:85
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-155 --- Frozen for at least 13 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:238
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:85
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-160 --- Frozen for at least 13 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:238
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:85
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-163 --- Frozen for at least 12 sec
[jira] [Updated] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bhavesh Mistry updated KAFKA-1710: -- Attachment: Screen Shot 2014-10-15 at 9.14.15 PM.png YourKit monitor screenshot. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bhavesh Mistry updated KAFKA-1710: -- Attachment: th15.dump th14.dump th13.dump th12.dump th11.dump th10.dump th9.dump th8.dump th7.dump th6.dump th5.dump th4.dump th3.dump th2.dump th1.dump JStack thread dumps. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
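The th1.dump..th15.dump attachments above were produced with jstack. The same kind of dump can also be captured in-process, which is handy when reproducing the contention in a test harness; a minimal sketch using the standard java.lang.management API (class name and formatting are illustrative):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// In-process alternative to running jstack repeatedly: dump the name,
// state, and stack of every live thread in the current JVM.
public class StackDumper {
    public static String dumpAll() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo ti : mx.dumpAllThreads(true, true)) {
            sb.append('"').append(ti.getThreadName()).append("\" ")
              .append(ti.getThreadState()).append('\n');
            for (StackTraceElement e : ti.getStackTrace()) {
                sb.append("\tat ").append(e).append('\n');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Calling this on a timer (e.g. once per second) yields the series
        // of dumps Jun asked for in the thread below.
        System.out.println(dumpAll());
    }
}
```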
[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173344#comment-14173344 ] Bhavesh Mistry commented on KAFKA-1710: --- I am not able to attach the YourKit profiler snapshot. I get the following error: TestNetworkDownProducer-2014-10-15-2.snapshot is too large to attach. Attachment is 28.19 MB but the largest allowed attachment is 10.00 MB. Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
Hi Jun, I have entered all the details into https://issues.apache.org/jira/browse/KAFKA-1710. Let me know if you need more details. Thanks, Bhavesh
On Wed, Oct 15, 2014 at 8:35 PM, Jun Rao jun...@gmail.com wrote: Probably 3 thread dumps will be enough. Thanks, Jun
On Wed, Oct 15, 2014 at 11:26 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jun, I will file a JIRA bug for this and attach the YourKit profiler snapshot and screenshot. Do you want me to take a thread dump every second? The threads are blocked on a synchronized block like you mentioned, and the YourKit snapshot contains the thread dump. Thanks, Bhavesh
On Tue, Oct 14, 2014 at 4:06 PM, Jun Rao jun...@gmail.com wrote: Bhavesh, It seems that all those threads are blocked waiting for the lock on the dq for that partition. There has got to be another thread holding the dq lock at that point. Could you create a jira and attach the full thread dump there? Also, could you attach the YourKit result that shows the breakdown of the time? Thanks, Jun
On Tue, Oct 14, 2014 at 10:41 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jay, Yes, it is reproducible quite easily. The problem is the synchronized block in RecordAccumulator; you can easily reproduce it with the Java code attached to my original email. Application threads enqueueing messages into a single partition cause thread contention, and an application thread may be blocked for more than 2 minutes, as shown in the original email. Let me know if you need more information. Last commit I tested with: commit 68b9f7716df1d994a9d43bec6bc42c90e66f1e99 Author: Anton Karamanov atara...@gmail.com Date: Tue Oct 7 18:22:31 2014 -0700 kafka-1644; Inherit FetchResponse from RequestOrResponse; patched by Anton Karamanov; reviewed by Jun Rao Thanks, Bhavesh
On Tue, Oct 14, 2014 at 10:16 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Bhavesh, This sounds like a problem. Just to confirm this is after the fix for KAFKA-1673?
https://issues.apache.org/jira/browse/KAFKA-1673

It sounds like you have a reproducible test case? -Jay

On Mon, Oct 13, 2014 at 10:54 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

Hi Kafka Dev Team,

When I run a test that sends messages to a single partition for about 3 minutes, I encounter a deadlock (please see the attached screen shot) and thread contention in YourKit profiling.

Use Case:
1) Aggregating messages into the same partition for metric counting.
2) Replicating the old producer's behavior of sticking to a partition for 3 minutes.

Here is the output:

Frozen threads found (potential deadlock)

It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung.

pool-1-thread-128 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744

pool-1-thread-159 --- Frozen for at least 2m 1 sec
[same stack as pool-1-thread-128]

pool-1-thread-55 --- Frozen for at least 2m
[same stack as pool-1-thread-128]
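The contention described above — every sender thread parked on the same per-partition deque lock inside RecordAccumulator.append() — can be illustrated with a stdlib-only sketch. The class, method, and field names below are illustrative stand-ins, not Kafka's actual internals; the point is that a single hot partition serializes all producer threads on one monitor, so no thread makes progress faster than the lock allows:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the pattern in the report: many pool threads append to one
// "partition" deque guarded by a single monitor. With a hot partition,
// every append serializes on that one lock.
public class SinglePartitionContention {
    private final Deque<byte[]> dq = new ArrayDeque<>(); // one partition's deque
    private final AtomicLong appended = new AtomicLong();

    void append(byte[] record) {
        synchronized (dq) {            // the lock all sender threads contend on
            dq.addLast(record);
            appended.incrementAndGet();
        }
    }

    public static void main(String[] args) throws Exception {
        SinglePartitionContention acc = new SinglePartitionContention();
        int threads = 32, perThread = 10_000;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) acc.append(new byte[16]);
            });
        }
        pool.shutdown();
        pool.awaitTermination(60, TimeUnit.SECONDS);
        // All appends complete, but only ever one at a time for this partition.
        System.out.println(acc.appended.get()); // 320000
    }
}
```

Spreading the load over many partitions gives each its own deque and lock, which is why the problem only shows up when all messages target a single partition.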
[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173344#comment-14173344 ] Bhavesh Mistry edited comment on KAFKA-1710 at 10/16/14 4:40 AM:

[~jkreps] and [~junrao], I am not able to attach the YourKit profiler snapshot, so I have uploaded it to GitHub: https://github.com/bmistry13/kafka-trunk-producer/blob/master/TestNetworkDownProducer-2014-10-15-3.snapshot Let me know if you need more details. Thanks, Bhavesh

was (Author: bmis13): I am not able to attach the YourKit profiler snapshot, so I have uploaded it to GitHub: https://github.com/bmistry13/kafka-trunk-producer/blob/master/TestNetworkDownProducer-2014-10-15-3.snapshot Thanks, Bhavesh

[New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

Key: KAFKA-1710 URL: https://issues.apache.org/jira/browse/KAFKA-1710 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Assignee: Jun Rao Priority: Critical Labels: performance Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, th6.dump, th7.dump, th8.dump, th9.dump

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
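YourKit's "Frozen threads found (potential deadlock)" warning is, as the report itself notes, not proof of a deadlock: the pool threads above are BLOCKED on a contended monitor, not in a lock cycle. A stdlib-only sketch of that distinction, using java.lang.management.ThreadMXBean (one thread holds a lock, a second blocks on it; the blocked thread shows up as BLOCKED in a dump, yet findDeadlockedThreads() reports nothing because there is no cycle):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

// Contention vs. deadlock: "holder" keeps a monitor, "waiter" blocks on it.
// The waiter is frozen (BLOCKED) but no true deadlock cycle exists.
public class FrozenVsDeadlocked {
    public static void main(String[] args) throws Exception {
        final Object lock = new Object();
        CountDownLatch holding = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
            synchronized (lock) {
                holding.countDown();
                try { Thread.sleep(60_000); } catch (InterruptedException ignored) {}
            }
        }, "holder");
        holder.setDaemon(true);
        holder.start();
        holding.await();                  // make sure holder owns the lock first

        Thread waiter = new Thread(() -> {
            synchronized (lock) { }       // blocks until holder releases
        }, "waiter");
        waiter.setDaemon(true);
        waiter.start();

        // Wait until the waiter is actually parked on the monitor.
        while (waiter.getState() != Thread.State.BLOCKED) Thread.sleep(10);

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        ThreadInfo info = mx.getThreadInfo(waiter.getId());
        System.out.println(info.getThreadState());              // BLOCKED
        System.out.println(mx.findDeadlockedThreads() == null); // true: no lock cycle
    }
}
```

This matches what the thread dumps on KAFKA-1710 eventually showed: heavy lock contention on one partition's deque rather than a cyclic deadlock.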
[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173368#comment-14173368 ] Bhavesh Mistry edited comment on KAFKA-1710 at 10/16/14 4:54 AM:

Here is the property file used for testing:

{code}
# THIS IS FOR NEW PRODUCERS API TRUNK. Please see the configuration at https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers=[list here]
# Data Acks
acks=0
# Buffer for log lines (including all messages).
buffer.memory=134217728
compression.type=snappy
retries=3
# batch size = ((buffer.memory) / (number of partitions)), so an in-progress batch can be created for each partition.
batch.size=1048576
max.request.size=1048576
# 2MiB
send.buffer.bytes=2097152
# We do not want to block when the buffer is full, so application threads will not be blocked, but log lines will be dropped...
block.on.buffer.full=false
linger.ms=360
{code}

was (Author: bmis13): [the same property file, with bootstrap.servers=dare-msgq00.sv.walmartlabs.com:9092,dare-msgq01.sv.walmartlabs.com:9092,dare-msgq02.sv.walmartlabs.com:9092]
[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173368#comment-14173368 ] Bhavesh Mistry commented on KAFKA-1710:

Here is the property file used for testing:

{code}
# THIS IS FOR NEW PRODUCERS API TRUNK. Please see the configuration at https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers=dare-msgq00.sv.walmartlabs.com:9092,dare-msgq01.sv.walmartlabs.com:9092,dare-msgq02.sv.walmartlabs.com:9092
# Data Acks
acks=0
# Buffer for log lines (including all messages).
buffer.memory=134217728
compression.type=snappy
retries=3
# batch size = ((buffer.memory) / (number of partitions)), so an in-progress batch can be created for each partition.
batch.size=1048576
max.request.size=1048576
# 2MiB
send.buffer.bytes=2097152
# We do not want to block when the buffer is full, so application threads will not be blocked, but log lines will be dropped...
block.on.buffer.full=false
linger.ms=360
{code}

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
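As a sanity check on the configuration quoted above, the sketch below loads the same key=value format with java.util.Properties (the format that new KafkaProducer(Properties) consumes) and verifies the sizing arithmetic implied by the "batch size = buffer.memory / number of partitions" comment. The broker names here are placeholders, and the 128-partition figure is an inference from 134217728 / 1048576, not something stated in the ticket:

```java
import java.io.StringReader;
import java.util.Properties;

// Load the producer config quoted in the comment above (placeholder brokers)
// and check the batch-sizing relationship its comments describe.
public class LoadProducerConfig {
    public static void main(String[] args) throws Exception {
        String cfg = String.join("\n",
            "bootstrap.servers=broker1:9092,broker2:9092",  // placeholder broker list
            "acks=0",
            "buffer.memory=134217728",      // total accumulator buffer, bytes
            "compression.type=snappy",
            "retries=3",
            "batch.size=1048576",           // per-partition batch, bytes
            "max.request.size=1048576",
            "send.buffer.bytes=2097152",
            "block.on.buffer.full=false",   // drop instead of blocking callers
            "linger.ms=360");
        Properties props = new Properties();
        props.load(new StringReader(cfg));

        // buffer.memory / batch.size = number of partitions the sizing assumes.
        long bufferMemory = Long.parseLong(props.getProperty("buffer.memory"));
        long batchSize = Long.parseLong(props.getProperty("batch.size"));
        System.out.println(bufferMemory / batchSize); // 128
    }
}
```

Note that with block.on.buffer.full=false the accumulator is not supposed to block callers when memory is exhausted, which is why the 2-minute blocked threads in the report point at lock contention in append() rather than buffer exhaustion.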