Re: Review Request 26755: Patch for KAFKA-1706
Joel, thanks a lot for the review! Some comments on your comments.

On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 18
https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line18
I'm wondering if this is specific and nuanced enough to make it entirely private to MirrorMaker.scala, OR, if you think it is useful as a generic utility, consider putting it in org.apache.kafka.clients.common.utils.

My first thought is that this could help provide better control over memory management in cases beyond mirror maker, such as the consumer-side data chunk queue, and maybe also the controller message queue.

On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 36
https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line36
This can be paraphrased to be simpler: an element can be enqueued provided the current size (in number of elements) is within the configured capacity and the current size in bytes of the queue is within the configured byte capacity. I.e., the element may be enqueued even if adding it causes the queue's size in bytes to exceed the byte capacity. Ok, so while I was thinking through the above: is there any benefit to having a count-based capacity when you have a byte-based capacity? I.e., why not have a byte capacity only?

I think there are three cases where a queue size in number of messages could help:
1. Because the size function is provided by the user, if the messages have small payloads but a large overhead elsewhere, the size in bytes might not work well.
2. The queue size in number of messages can be used to bound how many messages are buffered in the middle, i.e., the failure boundary. For instance, if one of the mirror makers is bounced and we have buffered too many messages in it, we need to re-consume all of them again.
3. Cases where the byte limit is only used to protect against running out of memory, but users don't expect the queue to consume that much memory all the time. (I'm not sure if it is a valid use case though...)

On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 82
https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line82
One significant caveat to this approach (and in the timed variant above) is that if a single large element needs to be enqueued it could potentially block a number of smaller elements from being enqueued. This may be okay in the case of mirror maker, though, but would make it less useful as a generic utility.

I'm not sure why a big put would block small ones. It is possible that a very large item is put into the queue and pushes the queue far past the byte limit; in that case, all puts will be blocked until enough small messages are taken out of the queue. But that seems to be the purpose of having a byte limit on the queue.

On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 100
https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line100
Can you clarify what this means?

I was trying to say that the poll method does not contend for the lock with offer. I saw a similar description in some queue's javadoc; it is probably better to remove it.

On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 109
https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line109
getAndDecrement(sizeFunction.get(e))

It seems getAndDecrement() does not take an argument and will always decrement by 1.

- Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26755/#review58497 --- On Oct. 15, 2014, 4:28 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail.
To reply, visit: https://reviews.apache.org/r/26755/ --- (Updated Oct. 15, 2014, 4:28 p.m.) Review request for kafka. Bugs: KAFKA-1706 https://issues.apache.org/jira/browse/KAFKA-1706 Repository: kafka Description --- Changed argument names; corrected a typo. Diffs - core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION Diff: https://reviews.apache.org/r/26755/diff/ Testing --- Thanks, Jiangjie Qin
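The enqueue semantics discussed in this thread (an element is admitted as long as the queue's current size in bytes is still under the byte capacity, so the byte capacity can be overshot by at most one element) can be sketched in Java. This is an illustrative sketch, not the actual patch; the class name, the non-blocking offer/poll shape, and the ToIntFunction size callback are assumptions:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.ToIntFunction;

// Illustrative sketch of the semantics discussed above -- not the actual patch.
class ByteBoundedQueue<E> {
    private final Queue<E> queue = new ArrayDeque<>();
    private final int capacity;                  // hard limit on the number of elements
    private final long byteCapacity;             // soft limit on the total bytes
    private final ToIntFunction<E> sizeFunction; // user-supplied size estimate per element
    private long currentBytes = 0;

    ByteBoundedQueue(int capacity, long byteCapacity, ToIntFunction<E> sizeFunction) {
        this.capacity = capacity;
        this.byteCapacity = byteCapacity;
        this.sizeFunction = sizeFunction;
    }

    // An element is admitted while the queue is *currently* under both limits,
    // so the byte size may overshoot byteCapacity by one element's size.
    synchronized boolean offer(E e) {
        if (queue.size() >= capacity || currentBytes >= byteCapacity)
            return false;
        queue.add(e);
        currentBytes += sizeFunction.applyAsInt(e);
        return true;
    }

    synchronized E poll() {
        E e = queue.poll();
        if (e != null)
            currentBytes -= sizeFunction.applyAsInt(e); // subtract the element's size, not just 1
        return e;
    }

    synchronized long byteSize() { return currentBytes; }
}
```

A real blocking variant would add put/take with condition waits; the sketch only shows the limit bookkeeping, including why the decrement must subtract the element's size rather than use getAndDecrement().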
[jira] [Updated] (KAFKA-1706) Adding a byte bounded blocking queue to util.
[ https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-1706: Attachment: KAFKA-1706_2014-10-26_23:47:31.patch Adding a byte bounded blocking queue to util. - Key: KAFKA-1706 URL: https://issues.apache.org/jira/browse/KAFKA-1706 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1706.patch, KAFKA-1706_2014-10-15_09:26:26.patch, KAFKA-1706_2014-10-15_09:28:01.patch, KAFKA-1706_2014-10-26_23:47:31.patch We saw many out of memory issues in Mirror Maker. To enhance memory management we want to introduce a ByteBoundedBlockingQueue that has limits on both the number of messages and the number of bytes in it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26755: Patch for KAFKA-1706
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26755/ --- (Updated Oct. 27, 2014, 6:47 a.m.) Review request for kafka. Bugs: KAFKA-1706 https://issues.apache.org/jira/browse/KAFKA-1706 Repository: kafka Description --- Changed argument names; corrected a typo. Diffs (updated) - core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION Diff: https://reviews.apache.org/r/26755/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Commented] (KAFKA-1706) Adding a byte bounded blocking queue to util.
[ https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14184891#comment-14184891 ] Jiangjie Qin commented on KAFKA-1706: - Updated reviewboard https://reviews.apache.org/r/26755/diff/ against branch origin/trunk Adding a byte bounded blocking queue to util. - Key: KAFKA-1706 URL: https://issues.apache.org/jira/browse/KAFKA-1706 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1706.patch, KAFKA-1706_2014-10-15_09:26:26.patch, KAFKA-1706_2014-10-15_09:28:01.patch, KAFKA-1706_2014-10-26_23:47:31.patch We saw many out of memory issues in Mirror Maker. To enhance memory management we want to introduce a ByteBoundedBlockingQueue that has limits on both the number of messages and the number of bytes in it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1706) Adding a byte bounded blocking queue to util.
[ https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14184893#comment-14184893 ] Jiangjie Qin commented on KAFKA-1706: - Updated reviewboard https://reviews.apache.org/r/26755/diff/ against branch origin/trunk Adding a byte bounded blocking queue to util. - Key: KAFKA-1706 URL: https://issues.apache.org/jira/browse/KAFKA-1706 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1706.patch, KAFKA-1706_2014-10-15_09:26:26.patch, KAFKA-1706_2014-10-15_09:28:01.patch, KAFKA-1706_2014-10-26_23:47:31.patch, KAFKA-1706_2014-10-26_23:50:07.patch We saw many out of memory issues in Mirror Maker. To enhance memory management we want to introduce a ByteBoundedBlockingQueue that has limits on both the number of messages and the number of bytes in it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1706) Adding a byte bounded blocking queue to util.
[ https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-1706: Attachment: KAFKA-1706_2014-10-26_23:50:07.patch Adding a byte bounded blocking queue to util. - Key: KAFKA-1706 URL: https://issues.apache.org/jira/browse/KAFKA-1706 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1706.patch, KAFKA-1706_2014-10-15_09:26:26.patch, KAFKA-1706_2014-10-15_09:28:01.patch, KAFKA-1706_2014-10-26_23:47:31.patch, KAFKA-1706_2014-10-26_23:50:07.patch We saw many out of memory issues in Mirror Maker. To enhance memory management we want to introduce a ByteBoundedBlockingQueue that has limits on both the number of messages and the number of bytes in it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-260) Add audit trail to kafka
[ https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14185220#comment-14185220 ] Stas Levin commented on KAFKA-260: -- Hi guys, We've adopted the data model above in Aletheia (https://github.com/outbrain/Aletheia), an open source data delivery framework we've been working on here at Outbrain. In Aletheia we call these audit trails Breadcrumbs, and have them generated by both the producer and consumer sides. We're working towards integrating the above-mentioned patch in order to provide a client-side dashboard. Aletheia is by no means meant to replace Kafka; rather, it is an abstraction layer on top of Kafka and other messaging systems, as we point out in the wiki. Having audit capabilities built into Kafka would be really great; meanwhile, you're most welcome to check out Aletheia -- perhaps you'll find it useful, as it provides the Breadcrumb generation out of the box. -Stas Add audit trail to kafka Key: KAFKA-260 URL: https://issues.apache.org/jira/browse/KAFKA-260 Project: Kafka Issue Type: New Feature Affects Versions: 0.8.0 Reporter: Jay Kreps Assignee: Jay Kreps Attachments: Picture 18.png, kafka-audit-trail-draft.patch LinkedIn has a system that does monitoring on top of our data flow to ensure all data is delivered to all consumers of data. This works by having each logical tier through which data passes produce messages to a central audit-trail topic; these messages give a time period and the number of messages that passed through that tier in that time period. Examples of tiers for data might be producer, broker, hadoop-etl, etc. This makes it possible to compare the total events for a given time period to ensure that all events that are produced are consumed by all consumers. This turns out to be extremely useful. We also have an application that balances the books and checks that all data is consumed in a timely fashion.
This gives graphs for each topic and shows any data loss and the lag at which the data is consumed (if any). This would be an optional feature that would allow you to do this kind of reconciliation automatically for all the topics kafka hosts against all the tiers of applications that interact with the data. Some details: the proposed format of the data is JSON, using the following format for messages:

{
  "time": 1301727060032,        // the timestamp at which this audit message is sent
  "topic": "my_topic_name",     // the topic this audit data is for
  "tier": "producer",           // a user-defined tier name
  "bucket_start": 130172640,    // the beginning of the time bucket this data applies to
  "bucket_end": 130172700,      // the end of the time bucket this data applies to
  "host": "my_host_name.datacenter.linkedin.com",  // the server that this was sent from
  "datacenter": "hlx32",        // the datacenter this occurred in
  "application": "newsfeed_service",               // a user-defined application name
  "guid": "51656274-a86a-4dff-b824-8e8e20a6348f",  // a unique identifier for this message
  "count": 43634
}

DISCUSSION

Time is complex:
1. The audit data must be based on a timestamp in the events, not the time on the machine processing the event. Using this timestamp means that all downstream consumers will report audit data on the right time bucket. This means that there must be a timestamp in the event, which we don't currently require. Arguably we should just add a timestamp to the events, but I think it is sufficient for now just to allow the user to provide a function to extract the time from their events.
2. For counts to reconcile exactly we can only do analysis at a granularity based on the least common multiple of the bucket sizes used by all tiers. The simplest is just to configure them all to use the same bucket size. We currently use a bucket size of 10 mins, but anything from 1-60 mins is probably reasonable. For analysis purposes one tier is designated as the source tier and we do reconciliation against this count (e.g. if another tier has less, that is treated as loss; if another tier has more, that is duplication).

Note that this system makes false positives possible, since you can lose an audit message. It also makes false negatives possible, since if you lose both normal messages and the associated audit messages it will appear that everything adds up. The latter problem is astronomically unlikely to happen exactly, though.

This would integrate into the client (producer and consumer both) in the following way:
1. The user provides a way to get timestamps from messages (required).
2. The user configures the tier name, host name, datacenter name, and application name as part of the consumer and producer config. We can provide reasonable defaults if not supplied (e.g. if it is a Producer then
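The time-bucketing rule in point 1 of the discussion above (assign each event to a bucket based on the event's own timestamp, so every tier counts the same event in the same bucket) reduces to simple integer arithmetic. A hypothetical helper, not part of the proposal itself:

```java
// Hypothetical helper illustrating the time-bucketing described above:
// every tier buckets by the *event's* timestamp, not the processing time,
// so all tiers count a given event in the same bucket.
class AuditBuckets {
    static long bucketStart(long eventTimestampMs, long bucketSizeMs) {
        // Integer division floors to the start of the enclosing bucket.
        return (eventTimestampMs / bucketSizeMs) * bucketSizeMs;
    }
}
```

With a common bucket size (e.g. 10 minutes), bucket starts line up exactly across tiers, which is what makes count reconciliation possible.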
Re: Review Request 26885: Patch for KAFKA-1642
On Oct. 27, 2014, 12:13 a.m., Guozhang Wang wrote:
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 122
https://reviews.apache.org/r/26885/diff/3/?file=731636#file731636line122
The comment "When connecting or connected, this handles slow/stalled connections" here is a bit misleading: after checking the code I realize connectionDelay is only triggered to determine the delay in millis after which we can re-check connectivity for a node that is not connected, and hence if the node is connected again while we are determining its delay, we just set it to MAX. Instead of making it general to the KafkaClient interface, shall we just add this to the code block of line 155?

It gets triggered any time NetworkClient.ready returns false for a node. The obvious case is that it will return not ready when disconnected, but it also does so when connecting, or when connected but inFlightRequests.canSendMore() returns false (thus the mention of slow/stalled connections). The important thing is that the value returned *is* MAX_VALUE in those latter cases, because neither one will be resolved by polling -- they both require an external event (connection established/failed, or an outstanding request receiving a response) which should wake up the event loop when there's something to do. That keeps us from polling unnecessarily. Previously there were conditions in which connections in these states could trigger busy-waiting in the poll loop. I don't think we can get the same effect just by inlining the code, because it uses state that's only available through ClusterConnectionStates, which is private to NetworkClient. KafkaClient only exposes the higher-level concept of ready. - Ewen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/#review58575 --- On Oct. 23, 2014, 11:19 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/ --- (Updated Oct.
23, 2014, 11:19 p.m.) Review request for kafka. Bugs: KAFKA-1642 https://issues.apache.org/jira/browse/KAFKA-1642 Repository: kafka Description --- Fixes two issues with the computation of ready nodes and poll timeouts in Sender/RecordAccumulator: 1. The timeout was computed incorrectly because it took into account all nodes, even if they had data to send such that their timeout would be 0. However, nodes were then filtered based on whether it was possible to send (i.e. their connection was still good) which could result in nothing to send and a 0 timeout, resulting in busy looping. Instead, the timeout needs to be computed only using data that cannot be immediately sent, i.e. where the timeout will be greater than 0. This timeout is only used if, after filtering by whether connections are ready for sending, there is no data to be sent. Other events can wake the thread up earlier, e.g. a client reconnects and becomes ready again. 2. One of the conditions indicating whether data is sendable is whether a timeout has expired -- either the linger time or the retry backoff. This condition wasn't accounting for both cases properly, always using the linger time. This means the retry backoff was probably not being respected. KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but none can send data because they are in a connection backoff period. Addressing Jun's comments. 
Diffs - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java d304660f29246e9600efe3ddb28cfcc2b074bed3 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14 Diff: https://reviews.apache.org/r/26885/diff/ Testing --- Thanks, Ewen Cheslack-Postava
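The first fix described in this review (compute the poll timeout only from batches that are not yet sendable, and pick the retry backoff or the linger time per batch) can be sketched roughly as follows. The names and structure here are illustrative, not the actual RecordAccumulator code:

```java
import java.util.List;

// Rough sketch of the poll-timeout logic described above; names are illustrative.
class ReadyCheck {
    static class Batch {
        final long waitedMs;    // time since the batch was created / last attempted
        final boolean inRetry;  // whether this batch is being retried
        Batch(long waitedMs, boolean inRetry) { this.waitedMs = waitedMs; this.inRetry = inRetry; }
    }

    // Returns how long poll() may block: the minimum remaining wait across
    // batches that cannot be sent yet. Batches that are already sendable
    // contribute nothing -- their remaining wait is <= 0, and including them
    // would produce a 0 timeout and busy-looping when their node isn't ready.
    static long pollTimeout(List<Batch> batches, long lingerMs, long retryBackoffMs) {
        long timeout = Long.MAX_VALUE;
        for (Batch b : batches) {
            // Respect the retry backoff for batches in retry, the linger time otherwise.
            long timeToWait = b.inRetry ? retryBackoffMs : lingerMs;
            long remaining = timeToWait - b.waitedMs;
            if (remaining > 0)  // only not-yet-sendable batches bound the timeout
                timeout = Math.min(timeout, remaining);
        }
        return timeout;
    }
}
```

When every batch is already sendable the sketch returns MAX_VALUE, matching the description: other events (a reconnect, a response) wake the sender thread rather than a timer.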
Review Request 27232: Patch for KAFKA-559
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27232/ --- Review request for kafka. Bugs: KAFKA-559 https://issues.apache.org/jira/browse/KAFKA-559 Repository: kafka Description --- Addressing Joel's comments. Fix naming: entires -> entries. Only remove partitions from a group if all partitions were last modified before the threshold date. Diffs - core/src/main/scala/kafka/tools/CleanupObsoleteZkEntries.scala PRE-CREATION Diff: https://reviews.apache.org/r/27232/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-559) Garbage collect old consumer metadata entries
[ https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-559: Attachment: KAFKA-559.patch Garbage collect old consumer metadata entries - Key: KAFKA-559 URL: https://issues.apache.org/jira/browse/KAFKA-559 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Tejas Patil Labels: newbie, project Attachments: KAFKA-559.patch, KAFKA-559.v1.patch, KAFKA-559.v2.patch Many use cases involve transient consumers. These consumers create entries under their consumer group in zk and maintain offsets there as well. There is currently no way to delete these entries. It would be good to have a tool that did something like bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] --zookeeper [zk_connect] This would scan through consumer group entries and delete any that had no offset update since the given date. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-559) Garbage collect old consumer metadata entries
[ https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-559: Assignee: Ewen Cheslack-Postava (was: Tejas Patil) Status: Patch Available (was: Open) Garbage collect old consumer metadata entries - Key: KAFKA-559 URL: https://issues.apache.org/jira/browse/KAFKA-559 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Ewen Cheslack-Postava Labels: newbie, project Attachments: KAFKA-559.patch, KAFKA-559.v1.patch, KAFKA-559.v2.patch Many use cases involve transient consumers. These consumers create entries under their consumer group in zk and maintain offsets there as well. There is currently no way to delete these entries. It would be good to have a tool that did something like bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] --zookeeper [zk_connect] This would scan through consumer group entries and delete any that had no offset update since the given date. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-559) Garbage collect old consumer metadata entries
[ https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14185377#comment-14185377 ] Ewen Cheslack-Postava commented on KAFKA-559: - Created reviewboard https://reviews.apache.org/r/27232/diff/ against branch origin/trunk Garbage collect old consumer metadata entries - Key: KAFKA-559 URL: https://issues.apache.org/jira/browse/KAFKA-559 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Tejas Patil Labels: newbie, project Attachments: KAFKA-559.patch, KAFKA-559.v1.patch, KAFKA-559.v2.patch Many use cases involve transient consumers. These consumers create entries under their consumer group in zk and maintain offsets there as well. There is currently no way to delete these entries. It would be good to have a tool that did something like bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] --zookeeper [zk_connect] This would scan through consumer group entries and delete any that had no offset update since the given date. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-559) Garbage collect old consumer metadata entries
[ https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14185380#comment-14185380 ] Ewen Cheslack-Postava commented on KAFKA-559: - This is an updated version of the patch by [~tejas.patil]. I'm pretty sure I've addressed all the issues [~jjkoshy] brought up. Garbage collect old consumer metadata entries - Key: KAFKA-559 URL: https://issues.apache.org/jira/browse/KAFKA-559 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Ewen Cheslack-Postava Labels: newbie, project Attachments: KAFKA-559.patch, KAFKA-559.v1.patch, KAFKA-559.v2.patch Many use cases involve transient consumers. These consumers create entries under their consumer group in zk and maintain offsets there as well. There is currently no way to delete these entries. It would be good to have a tool that did something like bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] --zookeeper [zk_connect] This would scan through consumer group entries and delete any that had no offset update since the given date. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'
Ewen Cheslack-Postava created KAFKA-1732: Summary: DumpLogSegments tool fails when path has a '.' Key: KAFKA-1732 URL: https://issues.apache.org/jira/browse/KAFKA-1732 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Priority: Minor Using DumpLogSegments in a directory that has a '.' that isn't part of the file extension causes an exception: {code} 16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --file /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index --verify-index-only Dumping /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index Exception in thread "main" java.io.FileNotFoundException: /Users/ewencp/kafka.log (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.<init>(FileInputStream.java:146) at kafka.utils.Utils$.openChannel(Utils.scala:162) at kafka.log.FileMessageSet.<init>(FileMessageSet.scala:74) at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109) at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80) at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105) at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73) at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
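The /Users/ewencp/kafka.log path in the stack trace suggests the tool derives the companion .log file by truncating the path at its first '.', so a '.' in a directory name (here, kafka.git) breaks it. A hypothetical helper (not the actual fix) showing the robust variant, which splits on the last '.' of the file name only:

```java
// Hypothetical helper: derive the companion ".log" file from an ".index" path
// without being confused by '.' characters in directory names.
class LogPath {
    static String logFileFor(String indexPath) {
        int lastDot = indexPath.lastIndexOf('.');  // the '.' of the extension itself
        int lastSep = indexPath.lastIndexOf('/');
        if (lastDot <= lastSep)                    // no extension in the file name portion
            throw new IllegalArgumentException("not an index file: " + indexPath);
        return indexPath.substring(0, lastDot) + ".log";
    }
}
```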
[jira] [Created] (KAFKA-1733) Producer.send will block indeterminately when broker is unavailable.
Marc Chung created KAFKA-1733: - Summary: Producer.send will block indeterminately when broker is unavailable. Key: KAFKA-1733 URL: https://issues.apache.org/jira/browse/KAFKA-1733 Project: Kafka Issue Type: Bug Components: core, producer Reporter: Marc Chung Assignee: Jun Rao This is a follow up to the conversation here: https://mail-archives.apache.org/mod_mbox/kafka-dev/201409.mbox/%3ccaog_4qymoejhkbo0n31+a-ujx0z5unsisd5wbrmn-xtx7gi...@mail.gmail.com%3E During ClientUtils.fetchTopicMetadata, if the broker is unavailable, socket.connect will block indeterminately. Any retry policy (message.send.max.retries) further increases the time spent waiting for the socket to connect. The root fix is to add a connection timeout value to the BlockingChannel's socket configuration, like so: {noformat} -channel.socket.connect(new InetSocketAddress(host, port)) +channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs) {noformat} The simplest thing to do here would be to have a constant, default value that would be applied to every socket configuration. Is that acceptable? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
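For reference, the proposed fix relies on the two-argument java.net.Socket.connect(SocketAddress, int) overload, which bounds the wait and throws SocketTimeoutException when the timeout expires. A minimal standalone sketch of that behavior (not the BlockingChannel code itself; the class and method names are illustrative):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Minimal demonstration of the proposed fix: the two-argument
// Socket.connect(SocketAddress, int) overload fails after the given
// timeout instead of blocking indeterminately.
class ConnectWithTimeout {
    static boolean tryConnect(String host, int port, int connectTimeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), connectTimeoutMs);
            return true;
        } catch (SocketTimeoutException e) {
            return false;  // gave up after connectTimeoutMs
        } catch (IOException e) {
            return false;  // refused/unreachable -- also returns promptly
        }
    }
}
```

Each retry under message.send.max.retries would then wait at most connectTimeoutMs rather than the platform's default connect timeout.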
[jira] [Commented] (KAFKA-1733) Producer.send will block indeterminately when broker is unavailable.
[ https://issues.apache.org/jira/browse/KAFKA-1733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14185539#comment-14185539 ] Marc Chung commented on KAFKA-1733: --- I have a patch (work in progress) here: https://github.com/mchung/kafka/commit/87b8ddbfe23dc887f56fa6f9ea3669733933c49b Producer.send will block indeterminately when broker is unavailable. Key: KAFKA-1733 URL: https://issues.apache.org/jira/browse/KAFKA-1733 Project: Kafka Issue Type: Bug Components: core, producer Reporter: Marc Chung Assignee: Jun Rao This is a follow up to the conversation here: https://mail-archives.apache.org/mod_mbox/kafka-dev/201409.mbox/%3ccaog_4qymoejhkbo0n31+a-ujx0z5unsisd5wbrmn-xtx7gi...@mail.gmail.com%3E During ClientUtils.fetchTopicMetadata, if the broker is unavailable, socket.connect will block indeterminately. Any retry policy (message.send.max.retries) further increases the time spent waiting for the socket to connect. The root fix is to add a connection timeout value to the BlockingChannel's socket configuration, like so: {noformat} -channel.socket.connect(new InetSocketAddress(host, port)) +channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs) {noformat} The simplest thing to do here would be to have a constant, default value that would be applied to every socket configuration. Is that acceptable? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 27238: Patch for KAFKA-1732
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27238/ --- Review request for kafka. Bugs: KAFKA-1732 https://issues.apache.org/jira/browse/KAFKA-1732 Repository: kafka Description --- KAFKA-1732 Handle paths with '.' properly in DumpLogSegments. Diffs - core/src/main/scala/kafka/tools/DumpLogSegments.scala 8e9d47b8d4adc5754ed8861aa04ddd3c6b629e3d Diff: https://reviews.apache.org/r/27238/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Commented] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'
[ https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14185586#comment-14185586 ] Ewen Cheslack-Postava commented on KAFKA-1732: -- Created reviewboard https://reviews.apache.org/r/27238/diff/ against branch origin/trunk DumpLogSegments tool fails when path has a '.' -- Key: KAFKA-1732 URL: https://issues.apache.org/jira/browse/KAFKA-1732 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Priority: Minor Attachments: KAFKA-1732.patch Using DumpLogSegments in a directory that has a '.' that isn't part of the file extension causes an exception: {code} 16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --file /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index --verify-index-only Dumping /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index Exception in thread main java.io.FileNotFoundException: /Users/ewencp/kafka.log (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at kafka.utils.Utils$.openChannel(Utils.scala:162) at kafka.log.FileMessageSet.init(FileMessageSet.scala:74) at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109) at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80) at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105) at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73) at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'
[ https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1732: - Assignee: Ewen Cheslack-Postava Status: Patch Available (was: Open) DumpLogSegments tool fails when path has a '.' -- Key: KAFKA-1732 URL: https://issues.apache.org/jira/browse/KAFKA-1732 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: KAFKA-1732.patch Using DumpLogSegments in a directory that has a '.' that isn't part of the file extension causes an exception: {code} 16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --file /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index --verify-index-only Dumping /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index Exception in thread main java.io.FileNotFoundException: /Users/ewencp/kafka.log (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at kafka.utils.Utils$.openChannel(Utils.scala:162) at kafka.log.FileMessageSet.init(FileMessageSet.scala:74) at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109) at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80) at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105) at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73) at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'
[ https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1732: - Attachment: KAFKA-1732.patch DumpLogSegments tool fails when path has a '.' -- Key: KAFKA-1732 URL: https://issues.apache.org/jira/browse/KAFKA-1732 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Priority: Minor Attachments: KAFKA-1732.patch Using DumpLogSegments in a directory that has a '.' that isn't part of the file extension causes an exception: {code} 16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --file /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index --verify-index-only Dumping /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index Exception in thread main java.io.FileNotFoundException: /Users/ewencp/kafka.log (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at kafka.utils.Utils$.openChannel(Utils.scala:162) at kafka.log.FileMessageSet.init(FileMessageSet.scala:74) at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109) at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80) at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105) at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73) at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Kafka Security?
List Users, Does anyone know if or when Kafka security features are planned? I haven't seen much on the net outside of the following proposal: https://cwiki.apache.org/confluence/display/KAFKA/Security. Thanks! john
[jira] [Created] (KAFKA-1734) System test metric plotting nonexistent file warnings
Andrew Olson created KAFKA-1734: --- Summary: System test metric plotting nonexistent file warnings Key: KAFKA-1734 URL: https://issues.apache.org/jira/browse/KAFKA-1734 Project: Kafka Issue Type: Bug Reporter: Andrew Olson Priority: Minor Running the system tests (trunk code), there are many "The file ... does not exist for plotting (metrics)" warning messages, for example: {noformat} 2014-10-27 14:47:58,478 - WARNING - The file /opt/kafka/system_test/replication_testsuite/testcase_0007/logs/broker-3/metrics/kafka.network.RequestMetrics.Produce-RemoteTimeMs.csv does not exist for plotting (metrics) {noformat} It looks like the generated metric file names only include the last part of the metric name, e.g. Produce-RemoteTimeMs.csv rather than kafka.network.RequestMetrics.Produce-RemoteTimeMs.csv: {noformat} $ ls /opt/kafka/system_test/replication_testsuite/testcase_0007/logs/broker-3/metrics/*Produce* /opt/kafka/system_test/replication_testsuite/testcase_0007/logs/broker-3/metrics/Produce-RemoteTimeMs.csv {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Kafka Security?
This is very much work in progress. You can follow the Jira here to see how it goes: https://issues.apache.org/jira/browse/KAFKA-1682 On Mon, Oct 27, 2014 at 11:49 AM, Stephenson, John L john.l.stephen...@lmco.com wrote: List Users, Does anyone know when/if Kafka security features are being planned? I haven't seen much on the net outside of the following proposal: https://cwiki.apache.org/confluence/display/KAFKA/Security. Thanks! john
[jira] [Updated] (KAFKA-1731) add config/jmx changes in 0.8.2 doc
[ https://issues.apache.org/jira/browse/KAFKA-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1731: --- Fix Version/s: 0.8.2 Assignee: Jun Rao I made a pass on the site doc to add the new broker side configs (offset management related configs will be added in kafka-1729) and the important jmxs. This is already committed to svn. I will leave this ticket open for a few more days for comments. add config/jmx changes in 0.8.2 doc --- Key: KAFKA-1731 URL: https://issues.apache.org/jira/browse/KAFKA-1731 Project: Kafka Issue Type: Sub-task Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8.2 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1731) add config/jmx changes in 0.8.2 doc
[ https://issues.apache.org/jira/browse/KAFKA-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14185818#comment-14185818 ] Gwen Shapira commented on KAFKA-1731: - Any chance you can upload a patch so we can see what changed? add config/jmx changes in 0.8.2 doc --- Key: KAFKA-1731 URL: https://issues.apache.org/jira/browse/KAFKA-1731 Project: Kafka Issue Type: Sub-task Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8.2 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1731) add config/jmx changes in 0.8.2 doc
[ https://issues.apache.org/jira/browse/KAFKA-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1731: --- Attachment: config-jmx_082.patch Attached please find the patch. add config/jmx changes in 0.8.2 doc --- Key: KAFKA-1731 URL: https://issues.apache.org/jira/browse/KAFKA-1731 Project: Kafka Issue Type: Sub-task Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8.2 Attachments: config-jmx_082.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1733) Producer.send will block indefinitely when broker is unavailable.
[ https://issues.apache.org/jira/browse/KAFKA-1733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1733: - Reviewer: Jun Rao Assignee: (was: Jun Rao) Producer.send will block indefinitely when broker is unavailable. Key: KAFKA-1733 URL: https://issues.apache.org/jira/browse/KAFKA-1733 Project: Kafka Issue Type: Bug Components: core, producer Reporter: Marc Chung This is a follow-up to the conversation here: https://mail-archives.apache.org/mod_mbox/kafka-dev/201409.mbox/%3ccaog_4qymoejhkbo0n31+a-ujx0z5unsisd5wbrmn-xtx7gi...@mail.gmail.com%3E During ClientUtils.fetchTopicMetadata, if the broker is unavailable, socket.connect will block indefinitely. Any retry policy (message.send.max.retries) further increases the time spent waiting for the socket to connect. The root fix is to add a connection timeout value to the BlockingChannel's socket configuration, like so: {noformat}
-channel.socket.connect(new InetSocketAddress(host, port))
+channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
{noformat} The simplest thing to do here would be to have a constant, default value that would be applied to every socket configuration. Is that acceptable? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
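The {noformat} diff above is a one-line change; here is a self-contained sketch of the same idea in plain Java (connectTimeoutMs and the wrapper class are illustrative, not the actual BlockingChannel code):

```java
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class ConnectWithTimeout {
    // Without the timeout argument, connect() can block until the OS-level
    // TCP timeout fires (often minutes); with it, the wait is bounded and a
    // SocketTimeoutException is thrown instead.
    public static Socket connect(String host, int port, int connectTimeoutMs) throws Exception {
        Socket socket = new Socket();
        socket.connect(new InetSocketAddress(host, port), connectTimeoutMs);
        return socket;
    }

    public static void main(String[] args) throws Exception {
        // Demonstrate against a local listener; a reachable endpoint
        // connects well within the timeout.
        ServerSocket server = new ServerSocket(0);
        Socket s = connect("localhost", server.getLocalPort(), 1000);
        System.out.println(s.isConnected()); // true
        s.close();
        server.close();
    }
}
```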
Re: Review Request 27238: Patch for KAFKA-1732
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27238/#review58723 --- Ship it! Ship It! - Neha Narkhede On Oct. 27, 2014, 6:41 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27238/ --- (Updated Oct. 27, 2014, 6:41 p.m.) Review request for kafka. Bugs: KAFKA-1732 https://issues.apache.org/jira/browse/KAFKA-1732 Repository: kafka Description --- KAFKA-1732 Handle paths with '.' properly in DumpLogSegments. Diffs - core/src/main/scala/kafka/tools/DumpLogSegments.scala 8e9d47b8d4adc5754ed8861aa04ddd3c6b629e3d Diff: https://reviews.apache.org/r/27238/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'
[ https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1732: - Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the patch. Pushed to trunk and 0.8.2 DumpLogSegments tool fails when path has a '.' -- Key: KAFKA-1732 URL: https://issues.apache.org/jira/browse/KAFKA-1732 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: KAFKA-1732.patch Using DumpLogSegments in a directory that has a '.' that isn't part of the file extension causes an exception: {code} 16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --file /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index --verify-index-only Dumping /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index Exception in thread main java.io.FileNotFoundException: /Users/ewencp/kafka.log (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at kafka.utils.Utils$.openChannel(Utils.scala:162) at kafka.log.FileMessageSet.init(FileMessageSet.scala:74) at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109) at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80) at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105) at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73) at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26755: Patch for KAFKA-1706
On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote: core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 109 https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line109 getAndDecrement(sizeFunction.get(e)) Jiangjie Qin wrote: It seems getAndDecrement() does not take an argument and will always decrement by 1. Ah yes, you are right. On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote: core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 82 https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line82 One significant caveat to this approach (and in the timed variant above) is that if a single large element needs to be enqueued it could potentially block a number of smaller elements from being enqueued. This may be okay in the case of mirror maker but would make it less useful as a generic utility. Jiangjie Qin wrote: I'm not sure why the big put could block small ones... It is possible that a super big item put into the queue makes the queue exceed the byte limit by a lot. In that case, all puts will be blocked until enough small messages are taken out of the queue. But that seems to be the purpose of having a byte limit for the queue. I looked again. Yes, you are right; it should not block smaller puts. Now I'm going to ask the question from the other side of the table: since you are just notifying waiting threads, it is possible for a large put to get starved if there are a lot of smaller puts that get notified earlier. To the best of my knowledge the JVM does not guarantee fairness in unblocking multiple contending threads. Ideally there should be some notion of a maximum wait before a put attempt takes priority over others. I.e., these are nuances that may be a compelling reason to make it a specialized utility within MirrorMaker itself, since it is not general enough (yet). - Joel --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26755/#review58497 --- On Oct.
27, 2014, 6:50 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26755/ --- (Updated Oct. 27, 2014, 6:50 a.m.) Review request for kafka. Bugs: KAFKA-1706 https://issues.apache.org/jira/browse/KAFKA-1706 Repository: kafka Description --- changed arguments name correct typo. Incorporated Joel's comments. Also fixed negative queue size problem. Diffs - core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION Diff: https://reviews.apache.org/r/26755/diff/ Testing --- Thanks, Jiangjie Qin
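The getAndDecrement() point in the exchange above is easy to demonstrate with java.util.concurrent.atomic.AtomicInteger: the method takes no argument and always subtracts exactly 1, so decrementing a byte counter by an element's size needs addAndGet with a negated delta (illustrative snippet, not the patch code):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class QueueSizeCounter {
    public static void main(String[] args) {
        AtomicInteger currentByteSize = new AtomicInteger(100);

        // getAndDecrement() takes no argument: it always subtracts 1.
        currentByteSize.getAndDecrement();
        System.out.println(currentByteSize.get()); // 99

        // To subtract the size of a dequeued element, negate it and use
        // addAndGet (or getAndAdd) instead.
        int elementSize = 25;
        currentByteSize.addAndGet(-elementSize);
        System.out.println(currentByteSize.get()); // 74
    }
}
```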
[DISCUSSION] Nested compression in Kafka?
Hello folks, I came across this testComplexCompressDecompress in kafka.message.MessageCompressionTest while I was working on some consumer decompression optimizations. This test checks whether nested compression is supported. I vaguely remember that some time ago we decided not to support nested compression in Kafka, and in the new producer's MemoryRecords I also make this assumption in the iterator implementation. Is that still the case? If so, shall we remove this test case? -- Guozhang
[jira] [Created] (KAFKA-1735) MemoryRecords.Iterator needs to handle partial reads from compressed stream
Guozhang Wang created KAFKA-1735: Summary: MemoryRecords.Iterator needs to handle partial reads from compressed stream Key: KAFKA-1735 URL: https://issues.apache.org/jira/browse/KAFKA-1735 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Found a bug in the MemoryRecords.Iterator implementation, where {code} stream.read(recordBuffer, 0, size) {code} can read fewer than size bytes, leaving the rest of recordBuffer set to \0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
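Background for the bug above: InputStream.read(byte[], int, int) is only guaranteed to return at least one byte (or -1 at end of stream), and decompressing streams routinely return short reads at block boundaries. The standard remedy is a fill loop; this is a sketch of the general technique, not the actual MemoryRecords patch:

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class StreamUtils {
    // Keep calling read() until the requested range is filled: a single
    // read() call may legally return fewer bytes than asked for, which is
    // exactly what happens at compressed-block boundaries.
    public static void readFully(InputStream in, byte[] buf, int offset, int size) throws IOException {
        int totalRead = 0;
        while (totalRead < size) {
            int n = in.read(buf, offset + totalRead, size - totalRead);
            if (n < 0)
                throw new EOFException("Stream ended after " + totalRead + " of " + size + " bytes");
            totalRead += n;
        }
    }

    // A stream that returns at most 2 bytes per read(), simulating the
    // short reads a decompressing stream can produce.
    static class TrickleInputStream extends FilterInputStream {
        TrickleInputStream(InputStream in) { super(in); }
        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return super.read(b, off, Math.min(len, 2));
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {1, 2, 3, 4, 5};
        byte[] buf = new byte[5];
        readFully(new TrickleInputStream(new ByteArrayInputStream(data)), buf, 0, 5);
        System.out.println(java.util.Arrays.toString(buf)); // [1, 2, 3, 4, 5]
    }
}
```

java.io.DataInputStream.readFully provides the same guarantee out of the box for the common case.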
Review Request 27256: Fix KAFKA-1735
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27256/ --- Review request for kafka. Bugs: KAFKA-1735 https://issues.apache.org/jira/browse/KAFKA-1735 Repository: kafka Description --- Handle partial reads from compressed stream Diffs - clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca Diff: https://reviews.apache.org/r/27256/diff/ Testing --- Thanks, Guozhang Wang
[jira] [Updated] (KAFKA-1735) MemoryRecords.Iterator needs to handle partial reads from compressed stream
[ https://issues.apache.org/jira/browse/KAFKA-1735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1735: - Status: Patch Available (was: Open) MemoryRecords.Iterator needs to handle partial reads from compressed stream --- Key: KAFKA-1735 URL: https://issues.apache.org/jira/browse/KAFKA-1735 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Attachments: KAFKA-1735.patch Found a bug in the MemoryRecords.Iterator implementation, where {code} stream.read(recordBuffer, 0, size) {code} can read less than size'ed bytes, and rest of the recordBuffer would set to \0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1735) MemoryRecords.Iterator needs to handle partial reads from compressed stream
[ https://issues.apache.org/jira/browse/KAFKA-1735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1735: - Attachment: KAFKA-1735.patch MemoryRecords.Iterator needs to handle partial reads from compressed stream --- Key: KAFKA-1735 URL: https://issues.apache.org/jira/browse/KAFKA-1735 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Attachments: KAFKA-1735.patch Found a bug in the MemoryRecords.Iterator implementation, where {code} stream.read(recordBuffer, 0, size) {code} can read less than size'ed bytes, and rest of the recordBuffer would set to \0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1735) MemoryRecords.Iterator needs to handle partial reads from compressed stream
[ https://issues.apache.org/jira/browse/KAFKA-1735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14186072#comment-14186072 ] Guozhang Wang commented on KAFKA-1735: -- Created reviewboard https://reviews.apache.org/r/27256/diff/ against branch origin/trunk MemoryRecords.Iterator needs to handle partial reads from compressed stream --- Key: KAFKA-1735 URL: https://issues.apache.org/jira/browse/KAFKA-1735 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Attachments: KAFKA-1735.patch Found a bug in the MemoryRecords.Iterator implementation, where {code} stream.read(recordBuffer, 0, size) {code} can read less than size'ed bytes, and rest of the recordBuffer would set to \0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts
[ https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14186104#comment-14186104 ] Jiangjie Qin commented on KAFKA-1647: - Updated reviewboard https://reviews.apache.org/r/26373/diff/ against branch origin/trunk Replication offset checkpoints (high water marks) can be lost on hard kills and restarts Key: KAFKA-1647 URL: https://issues.apache.org/jira/browse/KAFKA-1647 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Joel Koshy Assignee: Jiangjie Qin Priority: Critical Labels: newbie++ Fix For: 0.8.2 Attachments: KAFKA-1647.patch, KAFKA-1647_2014-10-13_16:38:39.patch, KAFKA-1647_2014-10-18_00:26:51.patch, KAFKA-1647_2014-10-21_23:08:43.patch, KAFKA-1647_2014-10-27_17:19:07.patch We ran into this scenario recently in a production environment. This can happen when enough brokers in a cluster are taken down; i.e., a rolling bounce done properly should not cause this issue. It can occur if all replicas for any partition are taken down. Here is a sample scenario:
* Cluster of three brokers: b0, b1, b2
* Two partitions (of some topic) with replication factor two: p0, p1
* Initial state: p0: leader = b0, ISR = {b0, b1} p1: leader = b1, ISR = {b0, b1}
* Do a parallel hard-kill of all brokers
* Bring up b2, so it is the new controller
* b2 initializes its controller context and populates its leader/ISR cache (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last known leaders are b0 (for p0) and b1 (for p1)
* Bring up b1
* The controller's onBrokerStartup procedure initiates a replica state change for all replicas on b1 to become online. As part of this replica state change it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 (for p0 and p1). This LeaderAndIsr request contains: {{p0: leader=b0; p1: leader=b1; leaders=b1}}. b0 is indicated as the leader of p0 but it is not included in the leaders field because b0 is down.
* On receiving the LeaderAndIsrRequest, b1's replica manager will successfully make itself (b1) the leader for p1 (and create the local replica object corresponding to p1). It will however abort the become follower transition for p0 because the designated leader b0 is offline. So it will not create the local replica object for p0. * It will then start the high water mark checkpoint thread. Since only p1 has a local replica object, only p1's high water mark will be checkpointed to disk. p0's previously written checkpoint if any will be lost. So in summary it seems we should always create the local replica object even if the online transition does not happen. Possible symptoms of the above bug could be one or more of the following (we saw 2 and 3): # Data loss; yes on a hard-kill data loss is expected, but this can actually cause loss of nearly all data if the broker becomes follower, truncates, and soon after happens to become leader. # High IO on brokers that lose their high water mark then subsequently (on a successful become follower transition) truncate their log to zero and start catching up from the beginning. # If the offsets topic is affected, then offsets can get reset. This is because during an offset load we don't read past the high water mark. So if a water mark is missing then we don't load anything (even if the offsets are there in the log). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts
[ https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-1647: Attachment: KAFKA-1647_2014-10-27_17:19:07.patch Replication offset checkpoints (high water marks) can be lost on hard kills and restarts Key: KAFKA-1647 URL: https://issues.apache.org/jira/browse/KAFKA-1647 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Joel Koshy Assignee: Jiangjie Qin Priority: Critical Labels: newbie++ Fix For: 0.8.2 Attachments: KAFKA-1647.patch, KAFKA-1647_2014-10-13_16:38:39.patch, KAFKA-1647_2014-10-18_00:26:51.patch, KAFKA-1647_2014-10-21_23:08:43.patch, KAFKA-1647_2014-10-27_17:19:07.patch We ran into this scenario recently in a production environment. This can happen when enough brokers in a cluster are taken down. i.e., a rolling bounce done properly should not cause this issue. It can occur if all replicas for any partition are taken down. Here is a sample scenario: * Cluster of three brokers: b0, b1, b2 * Two partitions (of some topic) with replication factor two: p0, p1 * Initial state: p0: leader = b0, ISR = {b0, b1} p1: leader = b1, ISR = {b0, b1} * Do a parallel hard-kill of all brokers * Bring up b2, so it is the new controller * b2 initializes its controller context and populates its leader/ISR cache (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last known leaders are b0 (for p0) and b1 (for p2) * Bring up b1 * The controller's onBrokerStartup procedure initiates a replica state change for all replicas on b1 to become online. As part of this replica state change it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 (for p1 and p2). This LeaderAndIsr request contains: {{p0: leader=b0; p1: leader=b1;} leaders=b1}. b0 is indicated as the leader of p0 but it is not included in the leaders field because b0 is down. 
* On receiving the LeaderAndIsrRequest, b1's replica manager will successfully make itself (b1) the leader for p1 (and create the local replica object corresponding to p1). It will however abort the become follower transition for p0 because the designated leader b0 is offline. So it will not create the local replica object for p0. * It will then start the high water mark checkpoint thread. Since only p1 has a local replica object, only p1's high water mark will be checkpointed to disk. p0's previously written checkpoint if any will be lost. So in summary it seems we should always create the local replica object even if the online transition does not happen. Possible symptoms of the above bug could be one or more of the following (we saw 2 and 3): # Data loss; yes on a hard-kill data loss is expected, but this can actually cause loss of nearly all data if the broker becomes follower, truncates, and soon after happens to become leader. # High IO on brokers that lose their high water mark then subsequently (on a successful become follower transition) truncate their log to zero and start catching up from the beginning. # If the offsets topic is affected, then offsets can get reset. This is because during an offset load we don't read past the high water mark. So if a water mark is missing then we don't load anything (even if the offsets are there in the log). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26373: Patch for KAFKA-1647
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/ --- (Updated Oct. 28, 2014, 12:20 a.m.) Review request for kafka. Bugs: KAFKA-1647 https://issues.apache.org/jira/browse/KAFKA-1647 Repository: kafka Description --- Addressed Joel's comments. The version 2 code seems to have been submitted by mistake... This should be the code for review that addresses Joel's comments. Addressed Jun's comments. Will run tests to verify that it works. Addressed Joel's comments: we do not need to check whether the leader exists when adding the fetcher. Diffs - core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 Diff: https://reviews.apache.org/r/26373/diff/ Testing (updated) --- Followed Joel's testing steps. I was able to reproduce the problem without the patch, and the WARN message goes away after applying the patch. Thanks, Jiangjie Qin
Jenkins build is back to normal : Kafka-trunk #319
See https://builds.apache.org/job/Kafka-trunk/319/changes
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14186124#comment-14186124 ] Jun Rao commented on KAFKA-1481: Vladimir, Thanks for the patch. Really appreciate your help. I realized that this is one of the biggest pieces of technical debt that we have accumulated over time, so it may take some time to sort this out; bear with me. Some more comments.
30. About Taggable, I still have mixed feelings. I can see why you created it. However, my reasoning is that for a lot of the case classes that we create (ClientIdTopic, ClientIdAndBroker), it's weird that some of them are taggable and some of them are not, depending on whether they are used for tagging metric names. Those classes have no direct relationship with the metrics; we only need to be aware of tags when creating metrics. Also, because of this, we changed the constructor of SimpleConsumer. Since this is an API change, we should really try to avoid it. My feeling is that it's probably simpler if we just create regular case classes as before and generate the metric tags explicitly when we create the metric. For example, in AbstractFetcherThread we can do:
class FetcherStats(clientIdAndBroker: ClientIdAndBroker) extends KafkaMetricsGroup {
  val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS,
    Map("clientId" -> clientIdAndBroker.clientId,
        "brokerHost" -> clientIdAndBroker.host,
        "brokerPort" -> clientIdAndBroker.port))
and just have ClientIdAndBroker be the following case class:
case class ClientIdAndBroker(clientId: String, host: String, port: Int)
This way the code is a bit cleaner, since all the metric-tag-related stuff is isolated to the places where the metrics are created. So I'd suggest that we remove Taggable.
31. AbstractFetcherThread:
31.1 You changed the meaning of clientId. clientId is used in the fetch request and we want to leave it as just the clientId string. Since the clientId should uniquely represent a particular consumer client, we just need to include the clientId in the metric name. We don't need to include the consumer id in either the fetch request or the metric name since it's too long and has redundant info.
31.2 FetcherLagStats: this is an existing problem. FetcherLagMetrics shouldn't be keyed off ClientIdBrokerTopicPartition; it should be keyed off ClientIdTopicPartition. This way the metric name remains the same independent of the current leader of those partitions.
32. ZookeeperConsumerConnector:
32.1 FetchQueueSize: I agree that the metric name just needs to be tagged with clientId, topic and threadId. We don't need to include the consumerId since it's too long (note that topicThread._2 includes both the consumerId and the threadId).
33. KafkaMetricsGroup: duplicate entries.
// kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo}
explicitMetricName("kafka.consumer", "ConsumerTopicMetrics", "MessagesPerSec"),
explicitMetricName("kafka.consumer", "ConsumerTopicMetrics", "MessagesPerSec"),
// kafka.consumer.ConsumerTopicStats
explicitMetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"),
explicitMetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"),
// kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer
explicitMetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchResponseSize"),
explicitMetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestRateAndTimeMs"),
explicitMetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchResponseSize"),
explicitMetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestRateAndTimeMs"),
/**
 * ProducerRequestStats <-- SyncProducer
 * metric for SyncProducer in fetchTopicMetaData() needs to be removed when consumer is closed.
 */
explicitMetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
explicitMetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize"),
explicitMetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
explicitMetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
34. AbstractFetcherManager: could you put the following on two separate lines? Similar things happen in a few other files; perhaps you need to change the formatting in your IDE?
}, metricPrefix.toTags
private def getFetcherId(topic: String, partitionId: Int): Int = {
  Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
Stop using dashes AND underscores as separators in MBean names -- Key:
[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use
[ https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14186130#comment-14186130 ] Jay Kreps commented on KAFKA-1501: -- Nice, so statistically it is 93% likely to be fixed, then! So since this changes the socket server default is this the right thing to do? Could this have any negative side effects in production? I actually don't really understand the effect of this option or why lack of it was causing the failure. Could you explain? transient unit tests failures due to port already in use Key: KAFKA-1501 URL: https://issues.apache.org/jira/browse/KAFKA-1501 Project: Kafka Issue Type: Improvement Components: core Reporter: Jun Rao Assignee: Guozhang Wang Labels: newbie Attachments: KAFKA-1501.patch, KAFKA-1501.patch Saw the following transient failures. kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckOne FAILED kafka.common.KafkaException: Socket server failed to bind to localhost:59909: Address already in use. at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195) at kafka.network.Acceptor.init(SocketServer.scala:141) at kafka.network.SocketServer.startup(SocketServer.scala:68) at kafka.server.KafkaServer.startup(KafkaServer.scala:95) at kafka.utils.TestUtils$.createServer(TestUtils.scala:123) at kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
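For context on the fix being discussed (an assumption on my part: the socket option in question is presumably SO_REUSEADDR), bind() fails with "Address already in use" when a socket from a previous test is still lingering in TIME_WAIT; setting reuse before binding sidesteps that. A minimal sketch:

```java
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class ReusableServerSocket {
    public static ServerSocket bind(int port) throws Exception {
        // Create the socket unbound, set SO_REUSEADDR, then bind: the
        // option must be set before bind() for it to take effect.
        ServerSocket server = new ServerSocket();
        server.setReuseAddress(true);
        server.bind(new InetSocketAddress(port));
        return server;
    }

    public static void main(String[] args) throws Exception {
        ServerSocket s = bind(0); // port 0 = pick any free port
        System.out.println(s.getReuseAddress() && s.isBound()); // true
        s.close();
    }
}
```

On most platforms SO_REUSEADDR only relaxes the TIME_WAIT check rather than allowing two live listeners on one TCP port, which is why it is commonly considered a safe server-side default.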
[ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
The Apache Kafka community is pleased to announce the beta release for Apache Kafka 0.8.2. The 0.8.2-beta release introduces many new features, improvements and fixes, including:
- A new Java producer for ease of implementation and enhanced performance.
- Delete topic support.
- Per-topic configuration of preference for consistency over availability.
- Scala 2.11 support and dropping support for Scala 2.8.
- LZ4 compression.
All of the changes in this release can be found at: https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html
Apache Kafka is a high-throughput, publish-subscribe messaging system rethought as a distributed commit log.
** Fast = A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients.
** Scalable = Kafka is designed to allow a single cluster to serve as the central data backbone for a large organization. It can be elastically and transparently expanded without downtime. Data streams are partitioned and spread over a cluster of machines to allow data streams larger than the capability of any single machine and to allow clusters of coordinated consumers.
** Durable = Messages are persisted on disk and replicated within the cluster to prevent data loss. Each broker can handle terabytes of messages without performance impact.
** Distributed by Design = Kafka has a modern cluster-centric design that offers strong durability and fault-tolerance guarantees.
You can download the release from: http://kafka.apache.org/downloads.html
We welcome your help and feedback. For more information on how to report problems, and to get involved, visit the project website at http://kafka.apache.org/
Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
I actually don't see the beta release on that download page: http://kafka.apache.org/downloads.html -Jay
Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
Strange. I'm seeing it. Browser cache? On Mon, Oct 27, 2014 at 5:59 PM, Jay Kreps jay.kr...@gmail.com wrote: I actually don't see the beta release on that download page: http://kafka.apache.org/downloads.html -Jay
Re: Review Request 26755: Patch for KAFKA-1706
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26755/#review58725 --- Another thing I forgot to mention in the earlier review: we definitely should have a unit test for this. You will need to allow passing in the Time interface and use MockTime in the test. core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala https://reviews.apache.org/r/26755/#comment99850 Unused core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala https://reviews.apache.org/r/26755/#comment99860 if core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala https://reviews.apache.org/r/26755/#comment99864 No need for the return; you can add on line 63: else { false } (and remove the false at the very end). Equivalent, but a little cleaner to look at. core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala https://reviews.apache.org/r/26755/#comment99865 Again, this is obviously stylistic, but in small methods like this there is little need to return from the middle. Can you restructure it to something like: if (...) false else { ... success } core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala https://reviews.apache.org/r/26755/#comment99866 Same here - Joel Koshy On Oct. 27, 2014, 6:50 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26755/ --- (Updated Oct. 27, 2014, 6:50 a.m.) Review request for kafka. Bugs: KAFKA-1706 https://issues.apache.org/jira/browse/KAFKA-1706 Repository: kafka Description --- Changed argument names; corrected typo. Incorporated Joel's comments. Also fixed negative queue size problem. Diffs - core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION Diff: https://reviews.apache.org/r/26755/diff/ Testing --- Thanks, Jiangjie Qin
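For reference, the enqueue semantics under review (a count capacity plus a soft byte capacity that the last element may overshoot) can be sketched as a hypothetical, simplified, non-thread-safe Java class; the real ByteBoundedBlockingQueue would add the blocking and synchronization:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.ToIntFunction;

public class ByteBoundedQueueDemo {
    static class ByteBoundedQueue<E> {
        private final Queue<E> queue = new ArrayDeque<>();
        private final int capacity;            // max number of elements
        private final int byteCapacity;        // soft bound on total bytes
        private final ToIntFunction<E> sizeOf; // user-supplied size function
        private int currentBytes = 0;

        ByteBoundedQueue(int capacity, int byteCapacity, ToIntFunction<E> sizeOf) {
            this.capacity = capacity;
            this.byteCapacity = byteCapacity;
            this.sizeOf = sizeOf;
        }

        boolean offer(E e) {
            // Enqueue is allowed while the element count is below capacity
            // AND current bytes are below byteCapacity, so the byte bound
            // may be exceeded by at most one element.
            if (queue.size() < capacity && currentBytes < byteCapacity) {
                queue.add(e);
                currentBytes += sizeOf.applyAsInt(e);
                return true;
            } else {
                return false;
            }
        }

        E poll() {
            E e = queue.poll();
            if (e != null) currentBytes -= sizeOf.applyAsInt(e);
            return e;
        }
    }

    public static void main(String[] args) {
        ByteBoundedQueue<String> q = new ByteBoundedQueue<>(100, 10, String::length);
        // 6 + 6 bytes overshoots the 10-byte bound, so the third offer fails.
        System.out.println(q.offer("aaaaaa") + " " + q.offer("bbbbbb") + " " + q.offer("cc"));
    }
}
```

This also shows why the count bound is independent of the byte bound: with tiny elements the byte bound alone would admit an unbounded number of entries.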
Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
Yeah it must be a caching thing because others in the same office do see it (but not all). And ctrl-shift-r doesn't seem to help. Nevermind :-) -Jay On Mon, Oct 27, 2014 at 6:00 PM, Gwen Shapira gshap...@cloudera.com wrote: Strange. I'm seeing it. Browser cache?
Re: Review Request 27256: Fix KAFKA-1735
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27256/#review58744 --- clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java https://reviews.apache.org/r/27256/#comment99882 Would it be possible to add a unit test for this? - Neha Narkhede On Oct. 27, 2014, 11:59 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27256/ --- (Updated Oct. 27, 2014, 11:59 p.m.) Review request for kafka. Bugs: KAFKA-1735 https://issues.apache.org/jira/browse/KAFKA-1735 Repository: kafka Description --- Handle partial reads from compressed stream Diffs - clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca Diff: https://reviews.apache.org/r/27256/diff/ Testing --- Thanks, Guozhang Wang
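The general pitfall behind this fix (a generic illustration, not the actual MemoryRecords patch): InputStream.read(byte[], off, len) may return fewer bytes than requested even when more data is coming, and compressed streams such as GZIPInputStream do this routinely, so reads must loop until the buffer is filled:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class PartialReadDemo {
    // A single read() call may fill only part of the buffer; loop until
    // it is full, and treat a premature -1 as a truncated stream.
    static void readFully(InputStream in, byte[] dest) throws IOException {
        int off = 0;
        while (off < dest.length) {
            int n = in.read(dest, off, dest.length - off);
            if (n < 0) throw new EOFException("stream ended after " + off + " bytes");
            off += n;
        }
    }

    public static void main(String[] args) throws IOException {
        // Round-trip a payload large enough that GZIPInputStream will
        // return it in several partial reads.
        byte[] payload = new byte[64 * 1024];
        for (int i = 0; i < payload.length; i++) payload[i] = (byte) i;

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) { gz.write(payload); }

        byte[] dest = new byte[payload.length];
        readFully(new GZIPInputStream(new ByteArrayInputStream(bos.toByteArray())), dest);
        System.out.println("match " + java.util.Arrays.equals(payload, dest));
    }
}
```

A unit test along these lines (as Neha requests) would catch any code path that assumes one read() call fills the buffer.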
[jira] [Commented] (KAFKA-1731) add config/jmx changes in 0.8.2 doc
[ https://issues.apache.org/jira/browse/KAFKA-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14186179#comment-14186179 ] Gwen Shapira commented on KAFKA-1731: - Thanks :) No comments, it looks good. add config/jmx changes in 0.8.2 doc --- Key: KAFKA-1731 URL: https://issues.apache.org/jira/browse/KAFKA-1731 Project: Kafka Issue Type: Sub-task Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8.2 Attachments: config-jmx_082.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
Joe, Thanks for driving the release. Jun
Re: Review Request 26373: Patch for KAFKA-1647
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/#review58748 --- Ship it! Looks good to me. Can you make these final edits and upload another RB? core/src/main/scala/kafka/server/ReplicaManager.scala https://reviews.apache.org/r/26373/#comment99891 The .format needs to be on this line. Can you fix it and upload a new patch? core/src/main/scala/kafka/server/ReplicaManager.scala https://reviews.apache.org/r/26373/#comment99892 Small edits: // Create the local replica even if the leader is unavailable. This is required to ensure that we include the partition's high watermark in the checkpoint file (see KAFKA-1647) Also, I'm not sure if we need to explicitly reference the jira in comments since people can just git annotate. - Joel Koshy On Oct. 28, 2014, 12:20 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/ --- (Updated Oct. 28, 2014, 12:20 a.m.) Review request for kafka. Bugs: KAFKA-1647 https://issues.apache.org/jira/browse/KAFKA-1647 Repository: kafka Description --- Addressed Joel's comments. The version 2 code seems to have been submitted by mistake... This should be the code for review that addresses Joel's comments. Addressed Jun's comments. Will do tests to verify if it works. Addressed Joel's comments: we do not need to check whether the leader exists when adding the fetcher. Diffs - core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 Diff: https://reviews.apache.org/r/26373/diff/ Testing --- Followed Joel's testing steps. I was able to reproduce the problem without the patch, and the WARN message goes away after applying the patch. Thanks, Jiangjie Qin
Re: Review Request 26755: Patch for KAFKA-1706
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26755/ --- (Updated Oct. 28, 2014, 1:34 a.m.) Review request for kafka. Bugs: KAFKA-1706 https://issues.apache.org/jira/browse/KAFKA-1706 Repository: kafka Description (updated) --- changed arguments name correct typo. Incorporated Joel's comments. Also fixed negative queue size problem. Incorporated Joel's comments. Diffs (updated) - core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION Diff: https://reviews.apache.org/r/26755/diff/ Testing --- Thanks, Jiangjie Qin
RE: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
Congrats! When do you think the final 0.8.2 will be released? To: annou...@apache.org; us...@kafka.apache.org; dev@kafka.apache.org Subject: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released Date: Tue, 28 Oct 2014 00:50:35 + From: joest...@apache.org
[jira] [Updated] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'
[ https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1732: - Fix Version/s: 0.8.2 DumpLogSegments tool fails when path has a '.' -- Key: KAFKA-1732 URL: https://issues.apache.org/jira/browse/KAFKA-1732 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Priority: Minor Fix For: 0.8.2 Attachments: KAFKA-1732.patch Using DumpLogSegments in a directory that has a '.' that isn't part of the file extension causes an exception: {code} 16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --file /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index --verify-index-only Dumping /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index Exception in thread main java.io.FileNotFoundException: /Users/ewencp/kafka.log (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at kafka.utils.Utils$.openChannel(Utils.scala:162) at kafka.log.FileMessageSet.init(FileMessageSet.scala:74) at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109) at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80) at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105) at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73) at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
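A hypothetical reconstruction of the failure mode and its fix (the actual DumpLogSegments code may differ): truncating at the first '.' anywhere in the path turns an index path under a directory like kafka.git into /Users/ewencp/kafka.log; only the extension of the file name itself should be replaced. Sketched with an illustrative path:

```java
import java.io.File;

public class LogNameDemo {
    // Buggy: cuts at the first '.' in the whole path, so a '.' in a
    // directory name (e.g. "kafka.git") mangles the result.
    static String buggyLogFile(String indexPath) {
        return indexPath.substring(0, indexPath.indexOf('.')) + ".log";
    }

    // Fixed: replace only the extension of the final path component.
    static String fixedLogFile(String indexPath) {
        File f = new File(indexPath);
        String name = f.getName();
        String base = name.substring(0, name.lastIndexOf('.'));
        return new File(f.getParent(), base + ".log").getPath();
    }

    public static void main(String[] args) {
        // Hypothetical path echoing the shape of the one in the JIRA report.
        String idx = "/home/user/kafka.git/logs/test_1-1/00016895.index";
        System.out.println(buggyLogFile(idx)); // mangled, like the reported error
        System.out.println(fixedLogFile(idx)); // sibling .log file, as intended
    }
}
```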
[jira] [Commented] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'
[ https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14186334#comment-14186334 ] Neha Narkhede commented on KAFKA-1732: -- Thanks [~charmalloc]. Missed updating the version myself.
[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14186350#comment-14186350 ] Bhavesh Mistry commented on KAFKA-1710: --- [~jkreps], I understand the current code base adds bytes to shared memory and does compression on the application thread. The older producer seems to do all of this in a background thread, so what changed to move this into the foreground? Also, if you had to re-engineer this code, how would you restructure it to remove the synchronization and move everything into the background, so the application thread spends more time runnable and the cost of enqueue is much lower? I am really interested in solving this problem for my application, so I just wanted to know your suggestions and ideas. Thanks for all your help so far! Thanks, Bhavesh [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition Key: KAFKA-1710 URL: https://issues.apache.org/jira/browse/KAFKA-1710 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Priority: Critical Labels: performance Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, th6.dump, th7.dump, th8.dump, th9.dump Hi Kafka Dev Team, When I run the test that sends messages to a single partition for 3 minutes or so, I encounter a deadlock (please see the attached screenshot) and thread contention in YourKit profiling. Use Case: 1) Aggregating messages into the same partition for metric counting. 2) Replicating the old producer's behavior of sticking to a partition for 3 minutes.
Here is output: Frozen threads found (potential deadlock) It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung. pool-1-thread-128 --- Frozen for at least 2m org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237 org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145 java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615 java.lang.Thread.run() Thread.java:744 pool-1-thread-159 --- Frozen for at least 2m 1 sec org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237 org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145 java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615 java.lang.Thread.run() Thread.java:744 pool-1-thread-55 --- Frozen for at least 2m org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237 org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145 
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615 java.lang.Thread.run() Thread.java:744 Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
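A generic illustration of the lock-scoping question raised in this thread (not the actual RecordAccumulator design or fix): doing expensive work such as compression while holding a shared lock serializes every appending thread, whereas doing it before taking the lock keeps the critical section down to a cheap enqueue:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class LockScopeDemo {
    private final Deque<byte[]> batch = new ArrayDeque<>();

    // Contended pattern: expensive work done inside the lock, so every
    // appending thread waits on it.
    synchronized void appendContended(byte[] record) {
        byte[] processed = expensiveTransform(record); // held under lock
        batch.add(processed);
    }

    // Friendlier pattern: expensive work first, lock only for the
    // cheap enqueue.
    void appendScoped(byte[] record) {
        byte[] processed = expensiveTransform(record); // outside the lock
        synchronized (this) {
            batch.add(processed);
        }
    }

    // Stand-in for per-record work such as compression.
    static byte[] expensiveTransform(byte[] in) {
        byte[] out = in.clone();
        for (int i = 0; i < out.length; i++) out[i] ^= 0x5A;
        return out;
    }

    public static void main(String[] args) {
        LockScopeDemo d = new LockScopeDemo();
        d.appendScoped(new byte[]{1, 2, 3});
        d.appendContended(new byte[]{4, 5, 6});
        System.out.println("batched " + d.batch.size());
    }
}
```

The tradeoff the thread discusses (buffering first and processing in the background) trades memory and context switches for application-thread time, along the same lines.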
[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14186350#comment-14186350 ] Bhavesh Mistry edited comment on KAFKA-1710 at 10/28/14 4:40 AM: - [~jkreps], I understand the current code base adds bytes to shared memory and does compression on the application thread. The older producer seems to do all of this in a background thread, so what changed to move this into the foreground? Also, if you had to re-engineer this code, how would you restructure it to remove the synchronization and move everything into the background, so the application thread spends more time runnable and the cost of enqueue is much lower (at the cost of memory, of course)? I am really interested in solving this problem for my application, so I just wanted to know your suggestions and ideas. Thanks for all your help so far! Thanks, Bhavesh
[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14186350#comment-14186350 ] Bhavesh Mistry edited comment on KAFKA-1710 at 10/28/14 4:58 AM: - [~jkreps], I understand the current code base adds bytes to shared memory and does compression on the application thread. The older producer seems to do all of this in a background thread, so what changed to move this into the foreground? Also, if you had to re-engineer this code, how would you restructure it to remove the synchronization and move everything into the background, so the application thread spends more time runnable and the cost of enqueue is much lower (at the cost of memory, of course)? I am really interested in solving this problem for my application, so I just wanted to know your suggestions and ideas. Thanks for all your help so far! The only thing I can think of is an *AsynKafkaProducer*, as mentioned in previous comments, where [~ewencp] noted the problem then falls on the threads enqueuing messages, at the cost of memory, thread context switching, etc. Thanks, Bhavesh