Re: [VOTE] 0.10.2.1 RC1
+1 (non-binding) Built sources, ran all unit and integration tests, checked new documentation, especially with an eye on the streams library. Thanks Gwen.

Eno

> On 12 Apr 2017, at 17:25, Gwen Shapira wrote:
>
> Hello Kafka users, developers, client-developers, friends, romans, citizens, etc,
>
> This is the second candidate for release of Apache Kafka 0.10.2.1.
>
> This is a bug fix release and it includes fixes and improvements from 24 JIRAs (including a few critical bugs).
>
> Release notes for the 0.10.2.1 release:
> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/RELEASE_NOTES.html
>
> *** Please download, test and vote by Monday, April 17, 5:30 pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> Your help in validating this bugfix release is super valuable, so please take the time to test and vote!
>
> Suggested tests:
> * Grab the source archive and make sure it compiles
> * Grab one of the binary distros and run the quickstarts against them
> * Extract and verify one of the site docs jars
> * Build a sample against jars in the staging repo
> * Validate GPG signatures on at least one file
> * Validate the javadocs look ok
> * The 0.10.2 documentation was updated for this bugfix release (especially upgrade, streams and connect portions) - please make sure it looks ok: http://kafka.apache.org/documentation.html
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/
>
> * Javadoc:
> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/javadoc/
>
> * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.1 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=e133f2ca57670e77f8114cc72dbc2f91a48e3a3b
>
> * Documentation:
> http://kafka.apache.org/0102/documentation.html
>
> * Protocol:
> http://kafka.apache.org/0102/protocol.html
> Thanks,
> Gwen Shapira
[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory
[ https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968209#comment-15968209 ] Jun Rao commented on KAFKA-5062: One far-fetched scenario is the following. A bad application sent the broker some data that happened to match a produce request. The data records in the produce request may appear to be compressed. The broker will try to decompress the data records, which could lead to the creation of an arbitrary byte array. One improvement that we could make is to tighten up the parsing of a request on the server side. Currently, Schema.read() could succeed if a struct can be completely constructed without using all bytes in the input byte buffer. It's probably safer to throw an exception when the struct is constructed but there are remaining bytes in the buffer. > Kafka brokers can accept malformed requests which allocate gigabytes of memory > -- > > Key: KAFKA-5062 > URL: https://issues.apache.org/jira/browse/KAFKA-5062 > Project: Kafka > Issue Type: Bug > Reporter: Apurva Mehta > > In some circumstances, it is possible to cause a Kafka broker to allocate massive amounts of memory by writing malformed bytes to the broker's port. > In investigating an issue, we saw byte arrays on the Kafka heap up to 1.8 gigabytes, the first 360 bytes of which were non-Kafka requests -- an application was writing the wrong data to Kafka, causing the broker to interpret the request size as 1.8GB and then allocate that amount. Apart from the first 360 bytes, the rest of the 1.8GB byte array was null. > We have socket.request.max.bytes set at 100MB to protect against this kind of thing, but somehow that limit is not always respected. We need to investigate why and fix it. > cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
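Jun's suggested tightening can be sketched in a few lines. This is a hypothetical stand-in for Schema.read() — the real Kafka structs are Java objects with many typed fields, whereas here the "schema" is just one int32 length prefix plus a payload — but the check is the same: reject any buffer with bytes left over after the struct is fully constructed.

```python
import struct

def read_strict(buf: bytes) -> bytes:
    # Hypothetical stand-in for Schema.read(): parse a toy schema of one
    # int32 length prefix followed by a payload of that length.
    if len(buf) < 4:
        raise ValueError("truncated: missing length prefix")
    (n,) = struct.unpack_from(">i", buf, 0)
    if n < 0 or len(buf) < 4 + n:
        raise ValueError("truncated or invalid payload")
    payload = buf[4:4 + n]
    # The suggested tightening: a well-formed request must consume the
    # whole buffer. Leftover bytes mean the data merely *looked* parseable.
    if len(buf) > 4 + n:
        raise ValueError("%d trailing bytes after struct" % (len(buf) - 4 - n))
    return payload
```

A buffer that parses cleanly but has trailing garbage — the case that currently slips through — would now raise instead of silently succeeding.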
[DISCUSS] ACL operations
Hi all, KIP-4 described some RPCs for implementing centralized administrative operations for Kafka. Now that the adminclient work is going forward, I'd like to re-open the discussion about the ACL-related RPCs. This is a continuation of the email thread Grant Henke started a while back. (See http://search-hadoop.com/m/Kafka/uyzND18EGG22cFMXg?subj=+DISCUSS+KIP+4+ACL+Admin+Schema ) I think the idea of sending a batch of ACL-related operations all at once is good for efficiency. However, I wonder if it is simpler to separate the add and remove ACLs operations, or if we really ought to combine them into one RPC. It seems that when both add and remove operations are combined into one RPC, there are some thorny questions about ordering (does a delete ACL operation on a topic happen first, or an add ACL operation?) best, Colin
[jira] [Commented] (KAFKA-4346) Add foreachValue method to KStream
[ https://issues.apache.org/jira/browse/KAFKA-4346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968177#comment-15968177 ] ASF GitHub Bot commented on KAFKA-4346: --- Github user xvrl closed the pull request at: https://github.com/apache/kafka/pull/2063 > Add foreachValue method to KStream > -- > > Key: KAFKA-4346 > URL: https://issues.apache.org/jira/browse/KAFKA-4346 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Xavier Léauté > Assignee: Xavier Léauté > Priority: Minor > Labels: needs-kip, newbie > > This would be the value-only counterpart to foreach, similar to mapValues. Adding this method would enhance readability and allow for Java 8 syntactic sugar using method references without having to wrap existing methods that only operate on the value type.
[GitHub] kafka pull request #2063: KAFKA-4346: Add foreachValue method to KStream
Github user xvrl closed the pull request at: https://github.com/apache/kafka/pull/2063 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] kafka pull request #2849: KAFKA-5059: Implement Transactional Coordinator
GitHub user dguy opened a pull request:

    https://github.com/apache/kafka/pull/2849

KAFKA-5059: Implement Transactional Coordinator

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/confluentinc/kafka exactly-once-tc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2849.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2849

commit 4d17b7c96293ca8f9735049070512be9707aba27
Author: Guozhang Wang
Date: 2017-03-02T01:42:49Z

    Transaction log message format (#134)
    * add transaction log message format
    * add transaction timeout to initPid request
    * collapse to one message type

commit af926510d2fd455a0ea4e82da83e10cde65db4e9
Author: Apurva Mehta
Date: 2017-03-15T20:47:25Z

    Fix build and test errors due to reabse onto idempotent-producer branch

commit fc3544bf6b55c48d487ef2b7877280d3ac90debb
Author: Guozhang Wang
Date: 2017-03-17T05:40:49Z

    Transaction log partition Immigration and Emigration (#142)
    * sub-package transaction and group classes within coordinator
    * add loading and cleaning up logic
    * add transaction configs

commit fc5fe9226dd4374018f6b5fe3c182158530af193
Author: Guozhang Wang
Date: 2017-03-21T04:38:35Z

    Add transactions broker configs (#146)
    * add all broker-side configs
    * check for transaction timeout value
    * added one more exception type

commit ef390df0eacc8d1f32f96b2db792326a053a5db1
Author: Guozhang Wang
Date: 2017-03-31T22:20:05Z

    Handle addPartitions and addOffsets on TC (#147)
    * handling add offsets to txn
    * add a pending state with prepareTransition / completeTransaction / abortTransition of state
    * refactor handling logic for multiple in-flight requests

commit 2a6526a861546eb4102b900d1da703fd2914bd43
Author: Apurva Mehta
Date: 2017-04-07T19:49:19Z

    Fix build errors after rebase onto trunk and dropping out the request stubs and client changes.

commit 4d18bb178cd48364bf610e615b176ad8f0d8385f
Author: Apurva Mehta
Date: 2017-04-03T21:17:25Z

    Fix test errors after rebase:
    1. Notable conflicts are with the small API changes to DelayedOperation and the newly introduced purgeDataBefore PR.
    2. Jason's update to support streaming decompression required a bit of an overhaul to the way we handle aborted transactions on the consumer.

commit f639b962e8ba618baaef47611e21e2b85b5e5725
Author: Guozhang Wang
Date: 2017-03-24T22:42:53Z

    fix unit tests

commit 853c5e8abffdb723c6f6b818fdeeab94da8667ed
Author: Guozhang Wang
Date: 2017-03-24T22:52:37Z

    add sender thread

commit 879c01c3b5b305485cfd26cb8ceedf453b984067
Author: Guozhang Wang
Date: 2017-03-28T01:04:53Z

    rename TC Send Thread to general inter-broker send thread

commit 239e7f733f8b814ca2d966a80359d8d0de5dee50
Author: Guozhang Wang
Date: 2017-03-29T21:58:45Z

    add tc channel manager

commit b1561da6e2893fad7bcfacba76db4e4df6414577
Author: Guozhang Wang
Date: 2017-03-29T21:59:26Z

    missing files

commit 62685c7269fc648a2401fc7a71f31b9536d7c08a
Author: Guozhang Wang
Date: 2017-03-31T22:15:37Z

    add the txn marker channel manager

commit 298790154c9bfe46f8e4a6b2e0372297fb19896a
Author: Damian Guy
Date: 2017-04-05T16:09:27Z

    fix compilation errors

commit 4f5c23d051453d27f3179a442fe3d822b77d4e12
Author: Damian Guy
Date: 2017-04-10T10:58:43Z

    integrate EndTxnRequest

commit e5f25f31e85fd8104c3df8f8195ccb60694610bc
Author: Damian Guy
Date: 2017-04-10T13:43:40Z

    add test fo InterBrokerSendThread. Refactor to use delegation rather than inheritance

commit 8bbd7a07be28585cd329a1fc769fcc340f866af2
Author: Damian Guy
Date: 2017-04-10T16:24:24Z

    refactor TransactionMarkerChannelManager. Add some test

commit 195bccf8c3945696e6e15cc093072ba83e706eec
Author: Damian Guy
Date: 2017-04-10T18:25:57Z

    more tests

commit c28eb5a0b339cce023e278d7eafcf3e8a98fa8e2
Author: Damian Guy
Date: 2017-04-11T09:23:36Z

    remove some answered TODOs

commit 4346c4d36f242e2480e4a808bed0ef19df6a2335
Author: Damian Guy
Date: 2017-04-11T15:46:37Z

    update to WriteTxnMarkersRequest/Response from Trunk

commit 46880d78eae7d2e7853c404bd1d9b19b8ec4e569
Author: Damian Guy
Date: 2017-04-11T16:19:01Z

    add missing @Test annotation

commit cbcd55e0d046d8c6d88ddfa5bbdfbc230b171e13
Author: Damian
[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory
[ https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967368#comment-15967368 ] Ismael Juma commented on KAFKA-5062: [~rsivaram], yes, it was on PLAINTEXT. The weird thing is that `NetworkReceive` checks that the request size is within socket.request.max.bytes before allocating the buffer. I don't know if [~apurva] has the data, so I'll leave it to him to answer that.
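The guard Ismael describes amounts to validating the 4-byte size field before allocating a buffer of that size. A minimal Python sketch follows; the real check lives in the Java class `NetworkReceive`, and the 100MB cap is the socket.request.max.bytes value mentioned in the report, not a claim about the broker default.

```python
import struct

# socket.request.max.bytes value from the report (not the broker default)
MAX_BYTES = 100 * 1024 * 1024

def receive_size_header(header: bytes, max_bytes: int = MAX_BYTES) -> int:
    # Validate the 4-byte big-endian size field *before* allocating a
    # buffer of that size -- the check NetworkReceive is expected to make.
    (size,) = struct.unpack(">i", header[:4])
    if size < 0:
        raise ValueError("invalid request size %d" % size)
    if size > max_bytes:
        raise ValueError("request size %d exceeds cap %d" % (size, max_bytes))
    return size  # only now would the broker allocate `size` bytes
```

The open question in the ticket is why an allocation got past this kind of check in practice.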
Re: Invoking KafkaConsumer#seek for the same partition
Sounds to me the comment is imprecisely phrased but was meant to indicate the behaviour you are describing. Perhaps instead of "the latest offset", it should say, "the offset used in the latest seek" to make it super-clear. Cheers, Michal On 13/04/17 08:28, Hu Xi wrote: Hi guys, The comments for KafkaConsumer#seek says “If this API is invoked for the same partition more than once, the latest offset will be used on the next poll()”. However, I tried a couple of times, and it turned out that the next poll could always read records from the offset which was specified in the last call of KafkaConsumer#seek instead of the latest offset. Seems the comment is not correct. What do you say? Any comments are welcomed.
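The contract Michal describes — the position is simply overwritten, so the seek issued last wins, regardless of whether its offset is numerically larger — can be illustrated with a toy model (plain Python, no Kafka client involved; all names here are hypothetical):

```python
class SeekTracker:
    # Toy model of the consumer's per-partition fetch position. seek()
    # overwrites the stored position, so the *last* seek wins -- not the
    # numerically largest offset.
    def __init__(self):
        self.positions = {}

    def seek(self, partition: int, offset: int) -> None:
        self.positions[partition] = offset

    def next_poll_offset(self, partition: int) -> int:
        return self.positions[partition]

tracker = SeekTracker()
tracker.seek(0, 100)  # first seek: offset 100
tracker.seek(0, 5)    # second seek, to a *smaller* offset
assert tracker.next_poll_offset(0) == 5  # the later seek wins
```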
Re: Invoking KafkaConsumer#seek for the same partition
Oh my! Yes, you are right. I had been thinking of it the wrong way. Thank you.

From: Michal Borowiecki
Sent: 13 April 2017 17:02
To: dev@kafka.apache.org
Subject: Re: Invoking KafkaConsumer#seek for the same partition

Sounds to me the comment is imprecisely phrased but was meant to indicate the behaviour you are describing. Perhaps instead of "the latest offset", it should say, "the offset used in the latest seek" to make it super-clear. Cheers, Michal On 13/04/17 08:28, Hu Xi wrote: > Hi guys, > > The comments for KafkaConsumer#seek says “If this API is invoked for the same partition more than once, the latest offset will be used on the next poll()”. > However, I tried a couple of times, and it turned out that the next poll could always read records from the offset which was specified in the last call of KafkaConsumer#seek instead of the latest offset. Seems the comment is not correct. What do you say? Any comments are welcomed.
[jira] [Created] (KAFKA-5064) Kafka service crashes with SIGSEGV
Jothikanth created KAFKA-5064: - Summary: Kafka service crashes with SIGSEGV Key: KAFKA-5064 URL: https://issues.apache.org/jira/browse/KAFKA-5064 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.9.0.1 Environment: Production Reporter: Jothikanth Attachments: hs_error.log Hi, I am seeing Kafka crash with SIGSEGV at times. This happens only on one node in a 6-node cluster. I have attached the hs_error.log which was generated during a SIGSEGV. Please let me know if you require more details. Thanks,
Re: Invoking KafkaConsumer#seek for the same partition
But I totally agree, the comment is ambiguous. The way it's phrased now, "latest offset" can easily be taken for "the highest (=latest) of the offsets" rather than "the offset last used". Cheers, Michal On 13/04/17 10:07, Hu Xi wrote: Oh my! Yes, you are right. I had been thinking of it the wrong way. Thank you. From: Michal Borowiecki Sent: 13 April 2017 17:02 To: dev@kafka.apache.org Subject: Re: Invoking KafkaConsumer#seek for the same partition Sounds to me the comment is imprecisely phrased but was meant to indicate the behaviour you are describing. Perhaps instead of "the latest offset", it should say, "the offset used in the latest seek" to make it super-clear. Cheers, Michal On 13/04/17 08:28, Hu Xi wrote: Hi guys, The comments for KafkaConsumer#seek says “If this API is invoked for the same partition more than once, the latest offset will be used on the next poll()”. However, I tried a couple of times, and it turned out that the next poll could always read records from the offset which was specified in the last call of KafkaConsumer#seek instead of the latest offset. Seems the comment is not correct. What do you say? Any comments are welcomed.
[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory
[ https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967357#comment-15967357 ] Rajini Sivaram commented on KAFKA-5062: --- [~apurva] Is this on a PLAINTEXT port? Also, do you have the data from the first 360 bytes?
Invoking KafkaConsumer#seek for the same partition
Hi guys, The comments for KafkaConsumer#seek says “If this API is invoked for the same partition more than once, the latest offset will be used on the next poll()”. However, I tried a couple of times, and it turned out that the next poll could always read records from the offset which was specified in the last call of KafkaConsumer#seek instead of the latest offset. Seems the comment is not correct. What do you say? Any comments are welcomed.
[jira] [Commented] (KAFKA-5059) Implement Transactional Coordinator
[ https://issues.apache.org/jira/browse/KAFKA-5059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967233#comment-15967233 ] ASF GitHub Bot commented on KAFKA-5059: --- GitHub user dguy opened a pull request: https://github.com/apache/kafka/pull/2849 KAFKA-5059: Implement Transactional Coordinator
[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory
[ https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968257#comment-15968257 ] James Cheng commented on KAFKA-5062: I agree with Jun about how to try to reproduce it. According to http://kafka.apache.org/protocol.html#protocol_common, a RequestOrResponse is Size (int32) followed by the rest of the bytes of the Request|Response. If you take a valid request, change the first 4 bytes to something huge, and send it in, what would happen in that case? That's the scenario that I mentioned in the link in my first comment: some system parsed the first 4 bytes, "H T T P", as the size field, which turns into a decimal value of 1213486160, causing the application to attempt to allocate 1.2GB of memory for a single request.
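The arithmetic James describes is easy to check: interpreting the ASCII bytes of "HTTP" as a big-endian int32 size field gives exactly the 1213486160 figure above.

```python
import struct

# The broker reads the first 4 bytes of anything hitting its port as a
# big-endian int32 size field. For an HTTP request those bytes are "HTTP":
# 0x48, 0x54, 0x54, 0x50 -> 0x48545450.
(bogus_size,) = struct.unpack(">i", b"HTTP")
assert bogus_size == 1213486160  # ~1.2GB, matching the value in the comment
```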
[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory
[ https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968268#comment-15968268 ] James Cheng commented on KAFKA-5062: Ah, gotcha. I missed that part. > Kafka brokers can accept malformed requests which allocate gigabytes of memory > -- > > Key: KAFKA-5062 > URL: https://issues.apache.org/jira/browse/KAFKA-5062 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta > > In some circumstances, it is possible to cause a Kafka broker to allocate > massive amounts of memory by writing malformed bytes to the brokers port. > In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 > gigabytes, the first 360 bytes of which were non kafka requests -- an > application was writing the wrong data to kafka, causing the broker to > interpret the request size as 1.8GB and then allocate that amount. Apart from > the first 360 bytes, the rest of the 1.8GB byte array was null. > We have a socket.request.max.bytes set at 100MB to protect against this kind > of thing, but somehow that limit is not always respected. We need to > investigate why and fix it. > cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
Re: Kafka-Streams: Cogroup
Hi Kyle, (cc-ing user list as well) This could be an interesting scenario. Two things to help us think through it some more: 1) it seems you attached a figure, but I cannot seem to open it. 2) what about using the low level processor API instead of the DSL as approach 3? Do you have any thoughts on that? Thanks Eno > On 13 Apr 2017, at 11:26, Winkelman, Kyle Gwrote: > > Hello, > > I am wondering if there is any way to aggregate together many streams at once > to build a larger object. Example (Healthcare Domain): > I have streams of Medical, Pharmacy, and Lab claims. Key is PatientId, Value > is a different Avro Record for each stream. > I was hoping there was a way to supply a single Initializer, () -> new > Patient(), and 3 aggregators, (key, value, patient) -> > patient.add**Claim(value). > > Currently the only way that I see to do the above use case is by aggregating > each individual stream then joining them. This doesn’t scale well with a > large number of input streams because for each stream I would be creating > another state store. > > I was hoping to get thoughts on a KCogroupedStream api. I have spent a little > time conceptualizing it. > > Approach 1: > In KGroupedStream add a cogroup method that takes the single initializer, a > list of other kgroupedstreams, and a list of other aggregators. > This would then all flow through a single processor and a have a single > backing state store. > The aggregator that the object will get sent to is determined by the > context().topic() which we should be able to trace back to one of the > kgroupedstreams in the list. > > The problem I am having with this approach is that because everything is > going through the single processors and java doesn’t do the best with generic > types. I have to either pass in a list of Type objects for casting the object > before sending it to the aggregator or I must create aggregators that accept > an object and cast them to the appropriate type. 
> > Approach 2: > Create one processor for each aggregator and have a single state store. Then > have a single KStreamPassThrough that just passes on the new aggregate value. > The positive for this is you know which stream it will be coming from and > won’t need to do the context().topic() trick. > > The problem I am having with this approach is understanding if there is a > race condition. Obviously the source topics would be copartitioned. But would > it be multithreaded and possibly cause one of the processors to grab patient > 1 at the same time a different processor has grabbed patient 1? > My understanding is that for each partition there would be a single complete > set of processors and a new incoming record would go completely through the > processor topology from a source node to a sink node before the next one is > sent through. Is this correct? > > > > If anyone has any additional ideas about this let me know. I don’t know if I > have the time to actually create this api so if someone likes the idea and > wants to develop it feel free.
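To make Approach 1 concrete, here is a toy, non-Kafka sketch of the proposed cogroup shape — one initializer, one aggregator per input stream, and a single shared store. All names are hypothetical; a real implementation would interleave records by arrival order per partition rather than draining one stream at a time as this sketch does.

```python
def cogroup(initializer, streams):
    # streams: list of (records, aggregator) pairs, where records is an
    # iterable of (key, value) and aggregator is (key, value, agg) -> agg.
    # A single dict plays the role of the one shared state store.
    store = {}
    for records, aggregate in streams:
        for key, value in records:
            agg = store.get(key)
            if agg is None:
                agg = initializer()  # the single shared Initializer
            store[key] = aggregate(key, value, agg)
    return store

# Healthcare example from the thread, with hypothetical record shapes.
def new_patient():
    return {"medical": [], "pharmacy": []}

medical = [("p1", "claim-m1"), ("p2", "claim-m2")]
pharmacy = [("p1", "claim-r1")]

patients = cogroup(new_patient, [
    (medical, lambda k, v, p: {**p, "medical": p["medical"] + [v]}),
    (pharmacy, lambda k, v, p: {**p, "pharmacy": p["pharmacy"] + [v]}),
])
```

The point of the sketch is the single backing store: however many input streams there are, each key maps to exactly one aggregate that all per-stream aggregators update.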
Re: [DISCUSS] ACL operations
Based on the initial discussion here, and the draft KIP-133, it sounds like the plan is to have AdminClient APIs like: addAcls, removeAcls, listAcls, listConfig, changeConfig (roughly speaking). However, just to play devil's advocate here a bit, wouldn't AdminClient users find it more natural to view all of those things as topic modifications or descriptions? For example, why can't I find the configuration or ACLs applied to topics when calling describeTopics? Why can't I have an alterTopics API that can alter both ACLs and configuration? And if we decide to have APIs like that, shouldn't we have AlterTopicsRequest and DescribeTopicsRequest instead of ListAclsRequest, ListConfigurationRequest, AlterAclsRequest, AlterConfigurationRequest? I'm curious which approach seems better. cheers, Colin On Thu, Apr 13, 2017, at 14:38, Ismael Juma wrote: > Hi Colin, > > Thanks for coordinating with Grant and reviving this. I agree that having a > separate delete request makes sense. This also came up in the original > discussion thread and I think people were in favour. > > Ismael > > On 13 Apr 2017 10:21 pm, "Colin McCabe" wrote: > > Hi all, > > > > KIP-4 described some RPCs for implementing centralized administrative > > operations for Kafka. Now that the adminclient work is going forward, > > I'd like to re-open the discussion about the ACL-related RPCs. This is > > a continuation of the email thread Grant Henke started a while back. (See > > http://search-hadoop.com/m/Kafka/uyzND18EGG22cFMXg?subj=+ > > DISCUSS+KIP+4+ACL+Admin+Schema > > ) > > > > I think the idea of sending a batch of ACL-related operations all at > > once is good for efficiency.
However, I wonder if it is simpler to > > separate the add and remove ACLs operations, or if we really ought to > > combine them into one RPC. It seems that when both add and remove > > operations are combined into one RPC, there are some thorny questions > > about ordering (does a delete ACL operation on a topic happen first, or > > an add ACL operation?) > > > > best, > > Colin > >
[jira] [Created] (KAFKA-5069) add controller integration tests
Onur Karaman created KAFKA-5069: --- Summary: add controller integration tests Key: KAFKA-5069 URL: https://issues.apache.org/jira/browse/KAFKA-5069 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Assignee: Onur Karaman Test the various controller protocols by observing zookeeper and broker state.
[GitHub] kafka pull request #2851: MINOR: Clarify wording
GitHub user jeffwidman opened a pull request: https://github.com/apache/kafka/pull/2851 MINOR: Clarify wording You can merge this pull request into a Git repository by running: $ git pull https://github.com/jeffwidman/kafka patch-3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/2851.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2851 commit a8629762e0eff7c7feab4ef7e9072a80d959033f Author: Jeff Widman Date: 2017-04-13T21:36:37Z MINOR: Clarify wording
[GitHub] kafka pull request #2852: MINOR: Fix some re-raising of exceptions in system...
GitHub user ewencp opened a pull request: https://github.com/apache/kafka/pull/2852 MINOR: Fix some re-raising of exceptions in system tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/ewencp/kafka minor-re-raise-exceptions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/2852.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2852 commit c56377ca83f36c6ffa0ab718833918ca82bd7211 Author: Ewen Cheslack-Postava Date: 2017-04-13T22:49:12Z MINOR: Fix some re-raising of exceptions in system tests
Re: [DISCUSS] ACL operations
Hi Colin, Thanks for coordinating with Grant and reviving this. I agree that having a separate delete request makes sense. This also came up in the original discussion thread and I think people were in favour. Ismael On 13 Apr 2017 10:21 pm, "Colin McCabe" wrote: > Hi all, > > KIP-4 described some RPCs for implementing centralized administrative > operations for Kafka. Now that the adminclient work is going forward, > I'd like to re-open the discussion about the ACL-related RPCs. This is > a continuation of the email thread Grant Henke started a while back. (See > http://search-hadoop.com/m/Kafka/uyzND18EGG22cFMXg?subj=+ > DISCUSS+KIP+4+ACL+Admin+Schema > ) > > I think the idea of sending a batch of ACL-related operations all at > once is good for efficiency. However, I wonder if it is simpler to > separate the add and remove ACL operations, or if we really ought to > combine them into one RPC. It seems that when both add and remove > operations are combined into one RPC, there are some thorny questions > about ordering (does a delete ACL operation on a topic happen first, or > an add ACL operation?) > > best, > Colin >
[jira] [Comment Edited] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory
[ https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968265#comment-15968265 ] Ismael Juma edited comment on KAFKA-5062 at 4/13/17 9:48 PM: - James, as stated previously, we check if the size is higher than the config value (100 MB by default), and disconnect if it is. was (Author: ijuma): James, as stated previously, we check if the size is above the config value (100 MB by default), and disconnect if it is. > Kafka brokers can accept malformed requests which allocate gigabytes of memory > -- > > Key: KAFKA-5062 > URL: https://issues.apache.org/jira/browse/KAFKA-5062 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta > > In some circumstances, it is possible to cause a Kafka broker to allocate > massive amounts of memory by writing malformed bytes to the brokers port. > In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 > gigabytes, the first 360 bytes of which were non kafka requests -- an > application was writing the wrong data to kafka, causing the broker to > interpret the request size as 1.8GB and then allocate that amount. Apart from > the first 360 bytes, the rest of the 1.8GB byte array was null. > We have a socket.request.max.bytes set at 100MB to protect against this kind > of thing, but somehow that limit is not always respected. We need to > investigate why and fix it. > cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory
[ https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968265#comment-15968265 ] Ismael Juma commented on KAFKA-5062: James, as stated previously, we check if the size is above the config value (100 MB by default), and disconnect if it is. > Kafka brokers can accept malformed requests which allocate gigabytes of memory > -- > > Key: KAFKA-5062 > URL: https://issues.apache.org/jira/browse/KAFKA-5062 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta > > In some circumstances, it is possible to cause a Kafka broker to allocate > massive amounts of memory by writing malformed bytes to the brokers port. > In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 > gigabytes, the first 360 bytes of which were non kafka requests -- an > application was writing the wrong data to kafka, causing the broker to > interpret the request size as 1.8GB and then allocate that amount. Apart from > the first 360 bytes, the rest of the 1.8GB byte array was null. > We have a socket.request.max.bytes set at 100MB to protect against this kind > of thing, but somehow that limit is not always respected. We need to > investigate why and fix it. > cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
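The guard Ismael describes can be sketched in plain Java: read the 4-byte size prefix of an incoming request and reject it before allocating anything. The constant mirrors the 100 MB socket.request.max.bytes default mentioned above; this is an illustration of the check, not the broker's actual code path.

```java
import java.nio.ByteBuffer;

public class RequestSizeGuard {
    static final int MAX_REQUEST_SIZE = 100 * 1024 * 1024; // 100 MB default

    // Returns the claimed request size if acceptable, or throws before any
    // large allocation happens (a broker would disconnect the client instead).
    static int readRequestSize(ByteBuffer sizePrefix) {
        int size = sizePrefix.getInt(); // first 4 bytes of every request
        if (size < 0 || size > MAX_REQUEST_SIZE) {
            throw new IllegalArgumentException(
                "Invalid request size " + size + " (max " + MAX_REQUEST_SIZE + ")");
        }
        return size; // only now is it safe to allocate a buffer of this size
    }

    public static void main(String[] args) {
        // Non-Kafka bytes interpreted as a size prefix can claim ~1.8 GB:
        ByteBuffer garbage = ByteBuffer.allocate(4).putInt(1_800_000_000);
        garbage.flip();
        try {
            readRequestSize(garbage);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The bug described in the ticket is that some code path allocated the buffer before (or without) this check; the fix is to ensure every path validates the size first.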
[GitHub] kafka pull request #2853: KAFKA-5069: add controller integration tests
GitHub user onurkaraman opened a pull request: https://github.com/apache/kafka/pull/2853 KAFKA-5069: add controller integration tests Test the various controller protocols by observing zookeeper and broker state. You can merge this pull request into a Git repository by running: $ git pull https://github.com/onurkaraman/kafka KAFKA-5069 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/2853.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2853 commit 55544d2375fa267762bc5ecc233f7a296202922d Author: Onur Karaman Date: 2017-04-14T01:54:43Z KAFKA-5069: add controller integration tests Test the various controller protocols by observing zookeeper and broker state.
Kafka-Streams: Cogroup
Hello, I am wondering if there is any way to aggregate together many streams at once to build a larger object. Example (Healthcare Domain): I have streams of Medical, Pharmacy, and Lab claims. Key is PatientId, Value is a different Avro Record for each stream. I was hoping there was a way to supply a single Initializer, () -> new Patient(), and 3 aggregators, (key, value, patient) -> patient.add**Claim(value). Currently the only way that I see to do the above use case is by aggregating each individual stream and then joining them. This doesn't scale well with a large number of input streams because for each stream I would be creating another state store. I was hoping to get thoughts on a KCogroupedStream api. I have spent a little time conceptualizing it. Approach 1: In KGroupedStream add a cogroup method that takes a single initializer, a list of other kgroupedstreams, and a list of other aggregators. This would then all flow through a single processor and have a single backing state store. The aggregator that the object will get sent to is determined by the context().topic(), which we should be able to trace back to one of the kgroupedstreams in the list. The problem I am having with this approach is that because everything is going through a single processor and Java doesn't do the best with generic types, I have to either pass in a list of Type objects for casting the object before sending it to the aggregator, or I must create aggregators that accept an Object and cast it to the appropriate type. Approach 2: Create one processor for each aggregator and have a single state store. Then have a single KStreamPassThrough that just passes on the new aggregate value. The positive for this is you know which stream it will be coming from and won't need to do the context().topic() trick. The problem I am having with this approach is understanding if there is a race condition. Obviously the source topics would be copartitioned. 
But would it be multithreaded and possibly cause one of the processors to grab patient 1 at the same time a different processor has grabbed patient 1? My understanding is that for each partition there would be a single complete set of processors and a new incoming record would go completely through the processor topology from a source node to a sink node before the next one is sent through. Is this correct? If anyone has any additional ideas about this let me know. I don't know if I have the time to actually create this api so if someone likes the idea and wants to develop it feel free.
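Approach 1's dispatch idea can be sketched without any Streams machinery: one shared state store, with the aggregator selected by the topic a record arrived on, mimicking the context().topic() trick. Everything here is hypothetical plain Java (the proposed KCogroupedStream API does not exist); strings stand in for the Avro records and the Patient aggregate.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class CogroupSketch {
    // One backing "state store": patientId -> accumulated aggregate.
    final Map<String, String> store = new HashMap<>();
    // One aggregator per source topic, all writing into the same store.
    final Map<String, BiFunction<String, String, String>> aggregators = new HashMap<>();

    void register(String topic, BiFunction<String, String, String> agg) {
        aggregators.put(topic, agg);
    }

    // Mimics the single processor using context().topic() to pick the aggregator.
    void process(String topic, String patientId, String value) {
        String current = store.getOrDefault(patientId, "Patient()");
        store.put(patientId, aggregators.get(topic).apply(current, value));
    }

    public static void main(String[] args) {
        CogroupSketch cogroup = new CogroupSketch();
        cogroup.register("medical",  (patient, v) -> patient + "+med:" + v);
        cogroup.register("pharmacy", (patient, v) -> patient + "+rx:" + v);
        cogroup.register("lab",      (patient, v) -> patient + "+lab:" + v);

        cogroup.process("medical",  "patient-1", "claimA");
        cogroup.process("pharmacy", "patient-1", "claimB");
        System.out.println(cogroup.store.get("patient-1"));
        // Patient()+med:claimA+rx:claimB
    }
}
```

The generics pain the email describes shows up here as the untyped value parameter: in a real typed API each source stream would carry its own value type, which a single dispatching processor cannot express cleanly in Java.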
[jira] [Commented] (KAFKA-5028) convert kafka controller to a single-threaded event queue model
[ https://issues.apache.org/jira/browse/KAFKA-5028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967653#comment-15967653 ] Balint Molnar commented on KAFKA-5028: -- [~onurkaraman] I am so exited about this. Is there anything I can help? :) > convert kafka controller to a single-threaded event queue model > --- > > Key: KAFKA-5028 > URL: https://issues.apache.org/jira/browse/KAFKA-5028 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > The goal of this ticket is to improve controller maintainability by > simplifying the controller's concurrency semantics. The controller code has a > lot of shared state between several threads using several concurrency > primitives. This makes the code hard to reason about. > This ticket proposes we convert the controller to a single-threaded event > queue model. We add a new controller thread which processes events held in an > event queue. Note that this does not mean we get rid of all threads used by > the controller. We merely delegate all work that interacts with controller > local state to this single thread. With only a single thread accessing and > modifying the controller local state, we no longer need to worry about > concurrent access, which means we can get rid of the various concurrency > primitives used throughout the controller. > Performance is expected to match existing behavior since the bulk of the > existing controller work today already happens sequentially in the ZkClient’s > single ZkEventThread. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-5066) KafkaMetricsConfig properties and description notably missing from documentation
Ryan P created KAFKA-5066: - Summary: KafkaMetricsConfig properties and description notably missing from documentation Key: KAFKA-5066 URL: https://issues.apache.org/jira/browse/KAFKA-5066 Project: Kafka Issue Type: Bug Components: documentation Reporter: Ryan P `KafkaMetrics` implementations do not appear to be exposed to all the Yammer metrics exposed to implementations of the `KafkaMetricsReporter` Currently the docs only cover the `metric.reporters` which allows clients to configure a `MetricsReporter` plugin. Clients are then disappointed to learn that this affords them access to only a small subset of metrics. Proper monitoring of the broker requires access to the Yammer metrics which clients can gain access to with a `KafkaMetricsReporter` plugin. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
Re: [DISCUSS] KIP-138: Change punctuate semantics
Hi, I would like to push this discussion further. Thanks Michael for your email (it was longer than expected, even after your "sorry for the long email" warning... :P). But I'll go ahead and follow your example: it's a long read. :) I think it's really important to put all this in a larger scope, and I think Michael's comments are super useful here. The Kafka Streams DSL does not have triggers (for a good reason, IMHO) and there are no plans to add them at the moment. IMHO, the whole Dataflow/Beam style processing approach mingles together two different things into one big and hard-to-use API. A clean DSL should describe what the result should be but not more -- everything else "pollutes" the code. I understand the desire to have fine-grained control of all kinds of things -- but I also believe that fine-grained control belongs to the Processor API -- not to the DSL (at least not at this level). Therefore, from my point of view, whatever punctuation mechanism we introduce, it should not be part of the DSL (note, you can of course mix-and-match the high-level DSL and the low-level PAPI). If there is a need to give better control at the DSL level, we can of course design something -- but it should not be punctuations or changes to the DSL itself. To me, this is more at the "configuration level". I want to pick up the example of KIP-63: it added a record cache to reduce the downstream load. This is a pure "performance optimization". Note that the output of an aggregation is a changelog stream -- thus, "deduplication" does not change the result. It only holds back certain updates, and if new updates are coming in while holding back, you don't see some "intermediate results" -- but you don't need to see those, as they are not "correct" anyway. I point this out to contrast it with triggers, which have a completely different purpose. 
Furthermore, I personally believe that __change__ is the next step in the stream processing world -- and Kafka Streams embraces change ("Yes, we can" -- couldn't resist...). Thus, (IMHO) applications should be designed with change in mind -- in the stream processing world, there is nothing like a final result -- that is the "lie" the Dataflow/Beam model tells you. Of course, if you need to make a decision in real life based on some data, you very often cannot undo a decision and also cannot wait forever to decide. But you need to make this decision based on the "current data" (I don't call it a "final result" on purpose, as there is no such thing -- or to put it differently: it might exist, but you can never know if it is final, as there might always be another late arriving record). In this regard, wall-clock time punctuations are very useful (and also IQ, which allows you to look up the __current__ state). Having said this, it's of course impossible to preserve the whole history -- and also not required. At some point, late arriving data is not interesting anymore -- maybe somebody made a decision and the world moved on (late data might only show you that the decision was wrong, but it would be too late to correct it). For this reason, Streams has the notion of "retention time". In this regard, you can argue that you get a "final result" after the retention time has passed. But again, it's not part of the "logical description" of the result that is specified by the DSL -- it's an operational concern. I strongly believe that this overall design of the Streams DSL is important to take into account. I know, this is more of a "meta" email, and does not give a detailed answer about the punctuation discussion (or not much -- maybe: "not part of the DSL"). But it should help to put the use cases we collect into the right buckets, and maybe also help to identify what we need to improve on the DSL to improve its usability, as a fallback to the PAPI is always cumbersome. 
Looking forward to your feedback. -Matthias On 4/11/17 12:52 PM, Thomas Becker wrote: > Here's an example that we currently have. We have a streams processor > that does a transform from one topic into another. One of the fields in > the source topic record is an expiration time, and one of the functions > of the processor is to ensure that expired records get deleted promptly > after that time passes (typically days or weeks after the message was > originally produced). To do that, the processor keeps a state store of > keys and expiration times, iterates that store in punctuate(), and > emits delete (null) records for expired items. This needs to happen at > some minimum interval regardless of the incoming message rate of the > source topic. > > In this scenario, the expiration of records is the primary function of > punctuate, and therefore the key requirement is that the wall-clock > measured time between punctuate calls have some upper-bound. So a pure > wall-clock based schedule would be fine for our needs. But the proposed > "hybrid" system would also be acceptable if that satisfies a broader > range of use-cases. > > On Tue,
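Thomas's expiration use case boils down to: keep a store of key -> expiration time, scan it on a wall-clock schedule, and emit null (delete) records for expired keys. A plain-Java sketch of what such a punctuate() pass would do (this is a simplification, not real Processor API code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class ExpirationPunctuator {
    // "State store": record key -> expiration timestamp (epoch millis).
    final Map<String, Long> expirations = new HashMap<>();

    // What punctuate() would do: scan the store, collect every expired key
    // (downstream, the processor would forward(key, null) as a tombstone),
    // and drop it from the store.
    List<String> punctuate(long nowMillis) {
        List<String> tombstoneKeys = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = expirations.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= nowMillis) {
                tombstoneKeys.add(e.getKey());
                it.remove();
            }
        }
        return tombstoneKeys;
    }

    public static void main(String[] args) {
        ExpirationPunctuator p = new ExpirationPunctuator();
        p.expirations.put("record-1", 1000L);
        p.expirations.put("record-2", 5000L);
        System.out.println(p.punctuate(2000L));      // [record-1]
        System.out.println(p.expirations.keySet());  // [record-2]
    }
}
```

The key requirement in the email is that this scan runs at some minimum wall-clock interval regardless of incoming traffic -- which is exactly what a stream-time-driven punctuate cannot guarantee on a quiet topic.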
[jira] [Commented] (KAFKA-5049) Chroot check should be done for each ZkUtils instance
[ https://issues.apache.org/jira/browse/KAFKA-5049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968624#comment-15968624 ] Umesh Chaudhary commented on KAFKA-5049: Thanks [~junrao] for the pointers. While I was looking at the current implementation, it seems difficult to instantiate ZkPath trivially. Shouldn't we need another class definition (as a companion of existing ZkPath object) which enables the instantiation of ZkPath? Please correct me if I am wrong. > Chroot check should be done for each ZkUtils instance > - > > Key: KAFKA-5049 > URL: https://issues.apache.org/jira/browse/KAFKA-5049 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma > Labels: newbie > Fix For: 0.11.0.0 > > > In KAFKA-1994, the check for ZK chroot was moved to ZkPath. However, ZkPath > is a JVM singleton and we may use multiple ZkClient instances with multiple > ZooKeeper ensembles in the same JVM (for cluster info, authorizer and > pluggable code provided by users). > The right way to do this is to make ZkPath an instance variable in ZkUtils so > that we do the check once per ZkUtils instance. > cc [~gwenshap] [~junrao], who reviewed KAFKA-1994, in case I am missing > something. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
Re: [DISCUSS] KIP 130: Expose states of active tasks to KafkaStreams public API
Florian, >>> What about KafkaStreams#toString() method? >>> >>> I think we want to deprecate it, as with KIP-120 and the changes of this >>> KIP it gets obsolete. Any thoughts about this? For me, this is the last open point to discuss (or what should be reflected in the KIP in case you agree) before I can put my vote on the VOTE thread you did start already. -Matthias On 4/11/17 12:18 AM, Damian Guy wrote: > Hi Florian, > > Thanks for the updates. The KIP is looking good. > > Cheers, > Damian > > On Fri, 7 Apr 2017 at 22:41 Matthias J. Sax wrote: > >> What about KafkaStreams#toString() method? >> >> I think we want to deprecate it, as with KIP-120 and the changes of this >> KIP it gets obsolete. >> >> If we do so, please update the KIP accordingly. >> >> >> -Matthias >> >> On 3/28/17 7:00 PM, Matthias J. Sax wrote: >>> Thanks for updating the KIP! >>> >>> I think it's good as is -- I would not add anything more to TaskMetadata. >>> >>> About subtopologies and tasks. We do have the concept of subtopologies >>> already in KIP-120. It's only missing an ID that allows linking a >>> subtopology to a task. >>> >>> IMHO, adding a simple variable to `Subtopology` that provides the id >>> should be sufficient. We can simply document in the JavaDocs how >>> Subtopology and TaskMetadata can be linked to each other. >>> >>> I did update KIP-120 accordingly. >>> >>> >>> -Matthias >>> >>> On 3/28/17 3:45 PM, Florian Hussonnois wrote: Hi all, I've updated the KIP and the PR to reflect your suggestions. >> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API https://github.com/apache/kafka/pull/2612 Also, I've exposed the property StreamThread#state as a string through the new class ThreadMetadata. Thanks, 2017-03-27 23:40 GMT+02:00 Florian Hussonnois >: Hi Guozhang, Matthias, It's a great idea to add descriptions of sub-topologies. This would help developers to better understand the topology concept. 
I agree that it is not really user-friendly to check if `StreamsMetadata#streamThreads` is not returning null. The method name localThreadsMetadata looks good. In addition, it's simpler to build ThreadMetadata instances from the `StreamTask` class than from the `StreamPartitionAssignor` class. I will work on the modifications. As I understand, I have to add a subTopologyId property to the TaskMetadata class - Am I >> right? Thanks, 2017-03-26 0:25 GMT+01:00 Guozhang Wang >: Re 1): this is a good point. Maybe we can move `StreamsMetadata#streamThreads` to `KafkaStreams#localThreadsMetadata`? 3): this is a minor suggestion about the function name: rename `assignedPartitions` to `topicPartitions` to be consistent with `StreamsMetadata`? Guozhang On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax > wrote: Thanks for the progress on this KIP. I think we are on the right path! Couple of comments/questions: (1) Why do we not consider the "rejected alternative" to add the method to KafkaStreams? The comment on #streamThreads() says: "Note this method will return null if called on {@link StreamsMetadata} which represent a remote application." Thus, if we cannot get any remote metadata, it seems not straightforward to not add it to KafkaStreams directly -- this would avoid invalid calls and `null` return values in the first place. I like the idea about exposing sub-topologies: (2a) I would recommend to rename `topicsGroupId` to `subTopologyId` :) (2b) We could add this to KIP-120 already. However, I would not just link both via name, but leverage KIP-120 directly, and add a "Subtopology" member to the TaskMetadata class. Overall, I like the distinction of KIP-120 only exposing "static" information that can be determined before the topology gets started, while this KIP allows access to runtime information. -Matthias On 3/22/17 12:42 PM, Guozhang Wang wrote: > Thanks for the updated KIP, and
[jira] [Commented] (KAFKA-5069) add controller integration tests
[ https://issues.apache.org/jira/browse/KAFKA-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968497#comment-15968497 ] ASF GitHub Bot commented on KAFKA-5069: --- GitHub user onurkaraman opened a pull request: https://github.com/apache/kafka/pull/2853 KAFKA-5069: add controller integration tests Test the various controller protocols by observing zookeeper and broker state. You can merge this pull request into a Git repository by running: $ git pull https://github.com/onurkaraman/kafka KAFKA-5069 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/2853.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2853 commit 55544d2375fa267762bc5ecc233f7a296202922d Author: Onur KaramanDate: 2017-04-14T01:54:43Z KAFKA-5069: add controller integration tests Test the various controller protocols by observing zookeeper and broker state. > add controller integration tests > > > Key: KAFKA-5069 > URL: https://issues.apache.org/jira/browse/KAFKA-5069 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > Test the various controller protocols by observing zookeeper and broker state. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Work started] (KAFKA-5069) add controller integration tests
[ https://issues.apache.org/jira/browse/KAFKA-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-5069 started by Onur Karaman. --- > add controller integration tests > > > Key: KAFKA-5069 > URL: https://issues.apache.org/jira/browse/KAFKA-5069 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > Test the various controller protocols by observing zookeeper and broker state. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5005) JoinIntegrationTest.testLeftKStreamKStream() fails occasionally
[ https://issues.apache.org/jira/browse/KAFKA-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5005: --- Description: testLeftKStreamKStream: {noformat} java.lang.AssertionError: Condition not met within timeout 3. Expecting 1 records from topic outputTopic while only received 0: [] at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:265) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:247) at org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:170) at org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:192) at org.apache.kafka.streams.integration.JoinIntegrationTest.testLeftKStreamKStream(JoinIntegrationTest.java:250) {noformat} testInnerKStreamKTable: {noformat} java.lang.AssertionError: Condition not met within timeout 3. Expecting 1 records from topic outputTopic while only received 0: [] at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:265) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:248) at org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:171) at org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:193) at org.apache.kafka.streams.integration.JoinIntegrationTest.testInnerKStreamKTable(JoinIntegrationTest.java:305) {noformat} was: {noformat} java.lang.AssertionError: Condition not met within timeout 3. 
Expecting 1 records from topic outputTopic while only received 0: [] at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:265) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:247) at org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:170) at org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:192) at org.apache.kafka.streams.integration.JoinIntegrationTest.testLeftKStreamKStream(JoinIntegrationTest.java:250) {noformat} > JoinIntegrationTest.testLeftKStreamKStream() fails occasionally > --- > > Key: KAFKA-5005 > URL: https://issues.apache.org/jira/browse/KAFKA-5005 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Eno Thereska > > testLeftKStreamKStream: > {noformat} > java.lang.AssertionError: Condition not met within timeout 3. Expecting 1 > records from topic outputTopic while only received 0: [] > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:265) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:247) > at > org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:170) > at > org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:192) > at > org.apache.kafka.streams.integration.JoinIntegrationTest.testLeftKStreamKStream(JoinIntegrationTest.java:250) > {noformat} > testInnerKStreamKTable: > {noformat} > java.lang.AssertionError: Condition not met within timeout 3. 
Expecting 1 > records from topic outputTopic while only received 0: [] > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:265) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:248) > at > org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:171) > at > org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:193) > at > org.apache.kafka.streams.integration.JoinIntegrationTest.testInnerKStreamKTable(JoinIntegrationTest.java:305) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5005) JoinIntegrationTest fails occasionally
[ https://issues.apache.org/jira/browse/KAFKA-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5005: --- Summary: JoinIntegrationTest fails occasionally (was: JoinIntegrationTest.testLeftKStreamKStream() fails occasionally) > JoinIntegrationTest fails occasionally > -- > > Key: KAFKA-5005 > URL: https://issues.apache.org/jira/browse/KAFKA-5005 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Eno Thereska > > testLeftKStreamKStream: > {noformat} > java.lang.AssertionError: Condition not met within timeout 3. Expecting 1 > records from topic outputTopic while only received 0: [] > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:265) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:247) > at > org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:170) > at > org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:192) > at > org.apache.kafka.streams.integration.JoinIntegrationTest.testLeftKStreamKStream(JoinIntegrationTest.java:250) > {noformat} > testInnerKStreamKTable: > {noformat} > java.lang.AssertionError: Condition not met within timeout 3. 
Expecting 1 > records from topic outputTopic while only received 0: [] > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:265) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:248) > at > org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:171) > at > org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:193) > at > org.apache.kafka.streams.integration.JoinIntegrationTest.testInnerKStreamKTable(JoinIntegrationTest.java:305) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5057) "Big Message Log"
[ https://issues.apache.org/jira/browse/KAFKA-5057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968580#comment-15968580 ] Umesh Chaudhary commented on KAFKA-5057: Understood and yes this is a good idea to capture the frequency of "Big Messages" on broker. That new broker config would set the threshold and the produced messages which exceed that threshold, broker would log their details. Also, I can start preparing KIP for this feature. > "Big Message Log" > - > > Key: KAFKA-5057 > URL: https://issues.apache.org/jira/browse/KAFKA-5057 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira > > Really large requests can cause significant GC pauses which can cause quite a > few other symptoms on a broker. Will be nice to be able to catch them. > Lets add the option to log details (client id, topic, partition) for every > produce request that is larger than a configurable threshold. > /cc [~apurva] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5065) AbstractCoordinator.ensureCoordinatorReady() stuck in loop if absent any bootstrap servers
[ https://issues.apache.org/jira/browse/KAFKA-5065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967548#comment-15967548 ] ASF GitHub Bot commented on KAFKA-5065: --- GitHub user porshkevich opened a pull request: https://github.com/apache/kafka/pull/2850 KAFKA-5065; AbstractCoordinator.ensureCoordinatorReady() stuck in loop if absent any bootstrap servers Adds a consumer config, "max.block.ms", defaulting to 6 ms; when specified, the ensureCoordinatorReady() call will be limited by "max.block.ms". You can merge this pull request into a Git repository by running: $ git pull https://github.com/porshkevich/kafka KAFKA-5065 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/2850.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2850 commit 99004de30a5400b2d8554b4a4469039498e033d4 Author: Vladimir Porshkevich Date: 2017-04-13T12:41:31Z Add max.block.ms to allow timing out ensureCoordinatorReady check. 
> AbstractCoordinator.ensureCoordinatorReady() stuck in loop if absent any > bootstrap servers > --- > > Key: KAFKA-5065 > URL: https://issues.apache.org/jira/browse/KAFKA-5065 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0 >Reporter: Vladimir Porshkevich > Labels: newbie > Original Estimate: 4m > Remaining Estimate: 4m > > If Consumer started with wrong bootstrap servers or absent any valid servers, > and Thread call Consumer.poll(timeout) with any timeout Thread stuck in loop > with debug logs like > {noformat} > org.apache.kafka.common.network.Selector - Connection with /172.31.1.100 > disconnected > java.net.ConnectException: Connection timed out: no further information > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > at > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) > at > org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51) > at > org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:81) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:335) > at org.apache.kafka.common.network.Selector.poll(Selector.java:303) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193) > at > 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > com.example.SccSpringCloudDemoApplication.main(SccSpringCloudDemoApplication.java:46) > {noformat} > Problem with AbstractCoordinator.ensureCoordinatorReady() method > It uses Long.MAX_VALUE as timeout. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
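The fix direction the PR describes, bounding the coordinator-discovery loop by a max.block.ms-style timeout instead of Long.MAX_VALUE, can be sketched as follows. CoordinatorLookup is a stand-in for the real client internals; this is an illustration of the deadline pattern, not Kafka's actual AbstractCoordinator code.

```java
// Sketch: bound the coordinator-ready wait by a deadline rather than
// looping forever when no bootstrap server is reachable.
public class BoundedCoordinatorWait {
    interface CoordinatorLookup { boolean ready(); }

    // Polls until the coordinator is ready or the deadline passes.
    static boolean ensureCoordinatorReady(CoordinatorLookup lookup, long maxBlockMs) {
        long deadline = System.currentTimeMillis() + maxBlockMs;
        while (System.currentTimeMillis() < deadline) {
            if (lookup.ready()) {
                return true;
            }
            try {
                Thread.sleep(10); // back off briefly between lookups
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false; // timed out instead of hanging indefinitely
    }

    public static void main(String[] args) {
        // With unreachable bootstrap servers, ready() never becomes true;
        // the call now returns within roughly maxBlockMs.
        if (ensureCoordinatorReady(() -> false, 50)) {
            throw new AssertionError("should have timed out");
        }
        System.out.println("timed out as expected");
    }
}
```

The caller can then surface the timeout (e.g. as an empty poll result or an exception) instead of appearing hung.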
[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory
[ https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967479#comment-15967479 ] Ismael Juma commented on KAFKA-5062: Kafka version was 0.10.1.1, so I don't think BlockingChannel was used. Yes, it was an application that had been misconfigured to connect to Kafka. > Kafka brokers can accept malformed requests which allocate gigabytes of memory > -- > > Key: KAFKA-5062 > URL: https://issues.apache.org/jira/browse/KAFKA-5062 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta > > In some circumstances, it is possible to cause a Kafka broker to allocate > massive amounts of memory by writing malformed bytes to the brokers port. > In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 > gigabytes, the first 360 bytes of which were non kafka requests -- an > application was writing the wrong data to kafka, causing the broker to > interpret the request size as 1.8GB and then allocate that amount. Apart from > the first 360 bytes, the rest of the 1.8GB byte array was null. > We have a socket.request.max.bytes set at 100MB to protect against this kind > of thing, but somehow that limit is not always respected. We need to > investigate why and fix it. > cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
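The failure mode in the report, a 4-byte size prefix read off the socket and trusted, can be illustrated with a simplified sketch. This is not Kafka's actual NetworkReceive code; socket.request.max.bytes is used only as the cap the report says should have applied.

```java
import java.nio.ByteBuffer;

// Sketch of why a malformed size prefix is dangerous: arbitrary bytes
// written to the broker port can make the prefix look like ~1.8 GB,
// and blindly allocating a buffer of that size exhausts the heap.
public class SizePrefixSketch {
    static final int SOCKET_REQUEST_MAX_BYTES = 100 * 1024 * 1024; // 100 MB, as in the report

    // Validates the 4-byte size prefix instead of allocating blindly.
    static int readRequestSize(ByteBuffer prefix) {
        int size = prefix.getInt();
        if (size < 0 || size > SOCKET_REQUEST_MAX_BYTES) {
            throw new IllegalArgumentException("Invalid request size: " + size);
        }
        return size;
    }

    public static void main(String[] args) {
        // Non-Kafka bytes interpreted as a size of 1.8 GB must be rejected.
        ByteBuffer malformed = ByteBuffer.allocate(4).putInt(1_800_000_000);
        malformed.flip();
        try {
            readRequestSize(malformed);
            throw new AssertionError("should have rejected 1.8 GB");
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```

The bug under investigation is precisely that some code path (possibly an unlimited-receive channel) skipped a check of this shape.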
[jira] [Created] (KAFKA-5065) AbstractCoordinator.ensureCoordinatorReady() stuck in loop if absent any bootstrap servers
Vladimir Porshkevich created KAFKA-5065: --- Summary: AbstractCoordinator.ensureCoordinatorReady() stuck in loop if absent any bootstrap servers Key: KAFKA-5065 URL: https://issues.apache.org/jira/browse/KAFKA-5065 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.10.2.0, 0.10.1.1, 0.10.1.0, 0.10.0.1, 0.10.0.0 Reporter: Vladimir Porshkevich If Consumer started with wrong bootstrap servers or absent any valid servers, and Thread call Consumer.poll(timeout) with any timeout Thread stuck in loop with debug logs like {noformat} org.apache.kafka.common.network.Selector - Connection with /172.31.1.100 disconnected java.net.ConnectException: Connection timed out: no further information at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51) at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:81) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:335) at org.apache.kafka.common.network.Selector.poll(Selector.java:303) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:275) at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at com.example.SccSpringCloudDemoApplication.main(SccSpringCloudDemoApplication.java:46) {noformat} Problem with AbstractCoordinator.ensureCoordinatorReady() method It uses Long.MAX_VALUE as timeout. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] kafka pull request #2850: KAFKA-5065; AbstractCoordinator.ensureCoordinatorR...
GitHub user porshkevich opened a pull request: https://github.com/apache/kafka/pull/2850 KAFKA-5065; AbstractCoordinator.ensureCoordinatorReady() stuck in loop if absent any bootstrap servers Adds a consumer config, "max.block.ms", defaulting to 6 ms; when specified, the ensureCoordinatorReady() call will be limited by "max.block.ms". You can merge this pull request into a Git repository by running: $ git pull https://github.com/porshkevich/kafka KAFKA-5065 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/2850.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2850 commit 99004de30a5400b2d8554b4a4469039498e033d4 Author: Vladimir Porshkevich Date: 2017-04-13T12:41:31Z Add max.block.ms to allow timing out ensureCoordinatorReady check. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory
[ https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967450#comment-15967450 ] Rajini Sivaram commented on KAFKA-5062: --- [~ijuma] Was it an application writing bad data - not a test injecting bad data on the network? Controller connections use `BlockingChannel` with unlimited receive size. But as you say, can't see how a badly behaved application could trigger that size allocation. > Kafka brokers can accept malformed requests which allocate gigabytes of memory > -- > > Key: KAFKA-5062 > URL: https://issues.apache.org/jira/browse/KAFKA-5062 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta > > In some circumstances, it is possible to cause a Kafka broker to allocate > massive amounts of memory by writing malformed bytes to the brokers port. > In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 > gigabytes, the first 360 bytes of which were non kafka requests -- an > application was writing the wrong data to kafka, causing the broker to > interpret the request size as 1.8GB and then allocate that amount. Apart from > the first 360 bytes, the rest of the 1.8GB byte array was null. > We have a socket.request.max.bytes set at 100MB to protect against this kind > of thing, but somehow that limit is not always respected. We need to > investigate why and fix it. > cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] kafka pull request #2237: support scala 2.12 build
Github user pjfanning closed the pull request at: https://github.com/apache/kafka/pull/2237 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Comment Edited] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory
[ https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967450#comment-15967450 ] Rajini Sivaram edited comment on KAFKA-5062 at 4/13/17 11:37 AM: - [~ijuma] Was it an application writing bad data - not a test injecting bad data on the network? Controller connections use `BlockingChannel` with unlimited receive size with older versions of the broker. But as you say, can't see how a badly behaved application could trigger that size allocation. was (Author: rsivaram): [~ijuma] Was it an application writing bad data - not a test injecting bad data on the network? Controller connections use `BlockingChannel` with unlimited receive size. But as you say, can't see how a badly behaved application could trigger that size allocation. > Kafka brokers can accept malformed requests which allocate gigabytes of memory > -- > > Key: KAFKA-5062 > URL: https://issues.apache.org/jira/browse/KAFKA-5062 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta > > In some circumstances, it is possible to cause a Kafka broker to allocate > massive amounts of memory by writing malformed bytes to the brokers port. > In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 > gigabytes, the first 360 bytes of which were non kafka requests -- an > application was writing the wrong data to kafka, causing the broker to > interpret the request size as 1.8GB and then allocate that amount. Apart from > the first 360 bytes, the rest of the 1.8GB byte array was null. > We have a socket.request.max.bytes set at 100MB to protect against this kind > of thing, but somehow that limit is not always respected. We need to > investigate why and fix it. > cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.
[ https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967705#comment-15967705 ] Jun Rao commented on KAFKA-2729: Thanks for the additional info. In both [~Ronghua Lin]'s and [~allenzhuyi]'s cases, it seems ZK session expiration had happened. As I mentioned earlier in the jira, there is a known issue reported in KAFKA-3083 that when the controller's ZK session expires and it loses its controllership, it's possible for this zombie controller to continue updating ZK and/or sending LeaderAndIsrRequests to the brokers for a short period of time. When this happens, the broker may not have the most up-to-date information about leader and isr, which can lead to subsequent ZK failures when the isr needs to be updated. It may take some time to have this issue fixed. In the interim, the workaround for this issue is to make sure ZK session expiration never happens. The first thing is to figure out what's causing the ZK session to expire. Two common causes are (1) long broker GC and (2) network glitches. For (1), one needs to tune the GC in the broker properly. For (2), one can look at the reported time that the ZK client can't hear from the ZK server and increase the ZK session expiration time accordingly. > Cached zkVersion not equal to that in zookeeper, broker not recovering. > --- > > Key: KAFKA-2729 > URL: https://issues.apache.org/jira/browse/KAFKA-2729 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2.1 >Reporter: Danil Serdyuchenko > > After a small network wobble where zookeeper nodes couldn't reach each other, > we started seeing a large number of undereplicated partitions. The zookeeper > cluster recovered, however we continued to see a large number of > undereplicated partitions. 
Two brokers in the kafka cluster were showing this > in the logs: > {code} > [2015-10-27 11:36:00,888] INFO Partition > [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for > partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 > (kafka.cluster.Partition) > [2015-10-27 11:36:00,891] INFO Partition > [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] > not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition) > {code} > For all of the topics on the effected brokers. Both brokers only recovered > after a restart. Our own investigation yielded nothing, I was hoping you > could shed some light on this issue. Possibly if it's related to: > https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using > 0.8.2.1. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
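Jun Rao's interim workaround amounts to making ZK session expiration unlikely: tune broker GC and raise the session timeout. A sketch of the relevant server.properties settings follows; the values are illustrative examples only, not recommendations, and should be sized to the GC pauses and network outages actually observed (the broker default for the session timeout in this era was 6000 ms).

```properties
# Illustrative only: raise the ZK session timeout so that transient GC
# pauses and network glitches do not expire the broker's ZooKeeper session.
zookeeper.session.timeout.ms=18000
zookeeper.connection.timeout.ms=18000
```

Raising the timeout trades slower failure detection for fewer spurious expirations, so it should track the longest pause the brokers realistically hit.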
[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.
[ https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967735#comment-15967735 ] Edoardo Comar commented on KAFKA-2729: -- FWIW - we saw the same message {{ Cached zkVersion [66] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition) }} when redeploying Kafka 0.10.0.1 in a cluster where we had previously run 0.10.2.0, after having wiped Kafka's storage but kept ZooKeeper (the version bundled with Kafka 0.10.2) and its storage. For us, the cluster eventually recovered. HTH. > Cached zkVersion not equal to that in zookeeper, broker not recovering. > --- > > Key: KAFKA-2729 > URL: https://issues.apache.org/jira/browse/KAFKA-2729 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2.1 >Reporter: Danil Serdyuchenko > > After a small network wobble where zookeeper nodes couldn't reach each other, > we started seeing a large number of undereplicated partitions. The zookeeper > cluster recovered, however we continued to see a large number of > undereplicated partitions. Two brokers in the kafka cluster were showing this > in the logs: > {code} > [2015-10-27 11:36:00,888] INFO Partition > [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for > partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 > (kafka.cluster.Partition) > [2015-10-27 11:36:00,891] INFO Partition > [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] > not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition) > {code} > For all of the topics on the effected brokers. Both brokers only recovered > after a restart. Our own investigation yielded nothing, I was hoping you > could shed some light on this issue. Possibly if it's related to: > https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using > 0.8.2.1. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-5067) java.sql.SQLDataException on TimeStamp column when using AWS Redshift as a JDBC source
Curtis Wilde created KAFKA-5067: --- Summary: java.sql.SQLDataException on TimeStamp column when using AWS Redshift as a JDBC source Key: KAFKA-5067 URL: https://issues.apache.org/jira/browse/KAFKA-5067 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 0.10.2.0 Reporter: Curtis Wilde Priority: Minor Kafka Connect throws java.sql.SQLDataException when attempting to use Redshift as a data source. When I run the query "select CURRENT_TIMESTAMP;" in a SQL editor it returns: 2017-04-13 16:11:25.204925+00 Full stack trace: [2017-04-13 09:44:09,910] ERROR Failed to get current time from DB using query select CURRENT_TIMESTAMP; on database PostgreSQL (io.confluent.connect.jdbc.util.JdbcUtils:205) java.sql.SQLDataException: [Amazon][JDBC](10140) Error converting value to Timestamp. at com.amazon.exceptions.ExceptionConverter.toSQLException(Unknown Source) at com.amazon.utilities.conversion.TypeConverter.convertToTimestamp(Unknown Source) at com.amazon.utilities.conversion.TypeConverter.toTimestamp(Unknown Source) at com.amazon.jdbc.common.SForwardResultSet.getTimestamp(Unknown Source) at io.confluent.connect.jdbc.util.JdbcUtils.getCurrentTimeOnDB(JdbcUtils.java:201) at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:169) at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84) at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55) at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:200) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) [2017-04-13 09:44:09,912] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='null', query='', topicPrefix='', timestampColumn='', incrementingColumn='null'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:221) java.sql.SQLDataException: [Amazon][JDBC](10140) Error converting value to Timestamp. at com.amazon.exceptions.ExceptionConverter.toSQLException(Unknown Source) at com.amazon.utilities.conversion.TypeConverter.convertToTimestamp(Unknown Source) at com.amazon.utilities.conversion.TypeConverter.toTimestamp(Unknown Source) at com.amazon.jdbc.common.SForwardResultSet.getTimestamp(Unknown Source) at io.confluent.connect.jdbc.util.JdbcUtils.getCurrentTimeOnDB(JdbcUtils.java:201) at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:169) at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84) at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55) at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:200) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ^C[2017-04-13 09:44:12,236] INFO Kafka Connect stopping 
(org.apache.kafka.connect.runtime.Connect:66) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
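The value Redshift returns for CURRENT_TIMESTAMP carries microsecond precision and a "+00" offset, which is the shape the Amazon driver fails to convert to a Timestamp. As an illustration of the format (not a fix inside the connector or driver), java.time parses it directly:

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Sketch: parse the Redshift CURRENT_TIMESTAMP text shown in the report.
// SSSSSS matches the six fractional digits; the single 'x' matches the
// hour-only "+00" offset.
public class RedshiftTimestampSketch {
    static final DateTimeFormatter REDSHIFT_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSx");

    static OffsetDateTime parse(String value) {
        return OffsetDateTime.parse(value, REDSHIFT_TS);
    }

    public static void main(String[] args) {
        OffsetDateTime ts = parse("2017-04-13 16:11:25.204925+00");
        if (ts.getYear() != 2017 || ts.getNano() != 204_925_000) {
            throw new AssertionError("unexpected parse result: " + ts);
        }
        System.out.println(ts);
    }
}
```

That the value is unambiguous to java.time suggests the conversion failure is specific to the Amazon JDBC driver's Timestamp handling rather than to the value itself.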
[jira] [Updated] (KAFKA-5067) java.sql.SQLDataException on TimeStamp column when using AWS Redshift as a JDBC source
[ https://issues.apache.org/jira/browse/KAFKA-5067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Curtis Wilde updated KAFKA-5067: Description: Kafka Connect throws java.sql.SQLDataException when attempting to use Redshift as a data source. When I run the query "select CURRENT_TIMESTAMP;" in a SQL editor it returns: 2017-04-13 16:11:25.204925+00 Full stack trace: [2017-04-13 09:44:09,910] ERROR Failed to get current time from DB using query select CURRENT_TIMESTAMP; on database PostgreSQL (io.confluent.connect.jdbc.util.JdbcUtils:205) java.sql.SQLDataException: [Amazon][JDBC](10140) Error converting value to Timestamp. at com.amazon.exceptions.ExceptionConverter.toSQLException(Unknown Source) at com.amazon.utilities.conversion.TypeConverter.convertToTimestamp(Unknown Source) at com.amazon.utilities.conversion.TypeConverter.toTimestamp(Unknown Source) at com.amazon.jdbc.common.SForwardResultSet.getTimestamp(Unknown Source) at io.confluent.connect.jdbc.util.JdbcUtils.getCurrentTimeOnDB(JdbcUtils.java:201) at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:169) at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84) at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55) at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:200) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) [2017-04-13 09:44:09,912] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='null', query='', topicPrefix='', timestampColumn='', incrementingColumn='null'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:221) java.sql.SQLDataException: [Amazon][JDBC](10140) Error converting value to Timestamp. at com.amazon.exceptions.ExceptionConverter.toSQLException(Unknown Source) at com.amazon.utilities.conversion.TypeConverter.convertToTimestamp(Unknown Source) at com.amazon.utilities.conversion.TypeConverter.toTimestamp(Unknown Source) at com.amazon.jdbc.common.SForwardResultSet.getTimestamp(Unknown Source) at io.confluent.connect.jdbc.util.JdbcUtils.getCurrentTimeOnDB(JdbcUtils.java:201) at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:169) at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84) at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55) at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:200) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) [2017-04-13 09:44:12,236] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:66) was: Kafka Connect throws java.sql.SQLDataException when attempting to use Redshift as a data source. 
When I run the query "select CURRENT_TIMESTAMP;" in a SQL editor it returns: 2017-04-13 16:11:25.204925+00 Full stack trace: [2017-04-13 09:44:09,910] ERROR Failed to get current time from DB using query select CURRENT_TIMESTAMP; on database PostgreSQL (io.confluent.connect.jdbc.util.JdbcUtils:205) java.sql.SQLDataException: [Amazon][JDBC](10140) Error converting value to Timestamp. at com.amazon.exceptions.ExceptionConverter.toSQLException(Unknown Source) at com.amazon.utilities.conversion.TypeConverter.convertToTimestamp(Unknown Source) at com.amazon.utilities.conversion.TypeConverter.toTimestamp(Unknown Source) at
[jira] [Commented] (KAFKA-5037) Infinite loop if all input topics are unknown at startup
[ https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967817#comment-15967817 ] Matthias J. Sax commented on KAFKA-5037: A similar issue got reported here: http://search-hadoop.com/m/Kafka/uyzND14MkzKYvKSh2?subj=Kafka+Streams+Application+does+not+start+after+10+1+to+10+2+update+if+topics+need+to+be+auto+created > Infinite loop if all input topics are unknown at startup > > > Key: KAFKA-5037 > URL: https://issues.apache.org/jira/browse/KAFKA-5037 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax > Fix For: 0.11.0.0 > > > See discusion: https://github.com/apache/kafka/pull/2815 > We will need some rewrite on {{StreamPartitionsAssignor}} and to add much > more test for all kind of corner cases, including pattern subscriptions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5057) "Big Message Log"
[ https://issues.apache.org/jira/browse/KAFKA-5057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967819#comment-15967819 ] Gwen Shapira commented on KAFKA-5057: - I thought of a new configuration (which made me realize this will require a KIP). Basically, sometimes users bump up max.request.size, but only expect very few large messages (since large messages have an impact on garbage collection, throughput, etc). Such a log will let them track the number of large messages, their size and their source, so they can see if their expectation is correct and adjust course if it isn't. So I will set max.request.size to 10MB, but the logging threshold to 1MB, because I expect very few messages between 1MB and 10MB. Does that make sense? > "Big Message Log" > - > > Key: KAFKA-5057 > URL: https://issues.apache.org/jira/browse/KAFKA-5057 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira > > Really large requests can cause significant GC pauses which can cause quite a > few other symptoms on a broker. Will be nice to be able to catch them. > Lets add the option to log details (client id, topic, partition) for every > produce request that is larger than a configurable threshold. > /cc [~apurva] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (KAFKA-5001) RocksDBSessionStoreSupplierTest#shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled
[ https://issues.apache.org/jira/browse/KAFKA-5001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-5001. Resolution: Duplicate > RocksDBSessionStoreSupplierTest#shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled > --- > > Key: KAFKA-5001 > URL: https://issues.apache.org/jira/browse/KAFKA-5001 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Matthias J. Sax > > Test fails with > {noformat} > org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplierTest > > shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled STARTED > pure virtual method called > terminate called without an active exception > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-5068) Optionally print out metrics after running the perf tests
Jun Rao created KAFKA-5068: -- Summary: Optionally print out metrics after running the perf tests Key: KAFKA-5068 URL: https://issues.apache.org/jira/browse/KAFKA-5068 Project: Kafka Issue Type: Improvement Components: tools Affects Versions: 0.10.2.0 Reporter: Jun Rao Often, we run ProducerPerformance/ConsumerPerformance tests to investigate performance issues. It's useful for the tool to print out the metrics in the producer/consumer at the end of the tests. We can make this optional to preserve the current behavior by default. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
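What such an optional end-of-run dump might look like can be sketched as follows; the metric names here are hypothetical examples, and the real tool would iterate the maps returned by producer.metrics() or consumer.metrics().

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Sketch of an optional metrics dump printed at the end of a perf test run.
public class PerfMetricsDump {
    // Formats metric name/value pairs, sorted by name for readability.
    static String formatMetrics(Map<String, Double> metrics) {
        StringBuilder sb = new StringBuilder();
        new TreeMap<>(metrics).forEach((name, value) ->
                sb.append(String.format(Locale.ROOT, "%-45s : %.3f%n", name, value)));
        return sb.toString();
    }

    public static void main(String[] args) {
        // Hypothetical values of the kind ProducerPerformance could report.
        Map<String, Double> metrics = Map.of(
                "producer-metrics.record-send-rate", 10234.5,
                "producer-metrics.request-latency-avg", 3.2);
        System.out.print(formatMetrics(metrics));
    }
}
```

Keeping the dump behind a flag preserves the tools' current output by default, as the issue proposes.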