Re: [VOTE] 0.10.2.1 RC1

2017-04-13 Thread Eno Thereska
+1 (non-binding) 

Built sources, ran all unit and integration tests, checked new documentation, 
esp with an eye on the streams library.

Thanks Gwen
Eno

> On 12 Apr 2017, at 17:25, Gwen Shapira  wrote:
> 
> Hello Kafka users, developers, client-developers, friends, romans,
> citizens, etc,
> 
> This is the second candidate for release of Apache Kafka 0.10.2.1.
> 
> This is a bug fix release and it includes fixes and improvements from 24 JIRAs
> (including a few critical bugs).
> 
> Release notes for the 0.10.2.1 release:
> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/RELEASE_NOTES.html
> 
> *** Please download, test and vote by Monday, April 17, 5:30 pm PT
> 
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
> 
> Your help in validating this bugfix release is super valuable, so
> please take the time to test and vote!
> 
> Suggested tests:
> * Grab the source archive and make sure it compiles
> * Grab one of the binary distros and run the quickstarts against them
> * Extract and verify one of the site docs jars
> * Build a sample against jars in the staging repo
> * Validate GPG signatures on at least one file
> * Validate the javadocs look ok
> * The 0.10.2 documentation was updated for this bugfix release
> (especially upgrade, streams and connect portions) - please make sure
> it looks ok: http://kafka.apache.org/documentation.html
> 
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/
> 
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/
> 
> * Javadoc:
> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/javadoc/
> 
> * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.1 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=e133f2ca57670e77f8114cc72dbc2f91a48e3a3b
> 
> * Documentation:
> http://kafka.apache.org/0102/documentation.html
> 
> * Protocol:
> http://kafka.apache.org/0102/protocol.html
> 
> /**
> 
> Thanks,
> 
> Gwen Shapira



[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory

2017-04-13 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968209#comment-15968209
 ] 

Jun Rao commented on KAFKA-5062:


One far fetched scenario is the following. A bad application sent the broker 
some data that happen to match a produce request. The data records in the 
produce request may appear to be compressed. The broker will try to decompress 
the data records, which could lead to the creation of an arbitrary byte array.

One improvement that we could make is to tighten up the parsing of a request on 
the server side. Currently, Schema.read() could succeed if a struct can be 
completely constructed without using all bytes in the input byte buffer. It's 
probably safer to throw an exception when the struct is constructed but there 
are remaining bytes in the buffer.

> Kafka brokers can accept malformed requests which allocate gigabytes of memory
> --
>
> Key: KAFKA-5062
> URL: https://issues.apache.org/jira/browse/KAFKA-5062
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>
> In some circumstances, it is possible to cause a Kafka broker to allocate 
> massive amounts of memory by writing malformed bytes to the brokers port. 
> In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 
> gigabytes, the first 360 bytes of which were non kafka requests -- an 
> application was writing the wrong data to kafka, causing the broker to 
> interpret the request size as 1.8GB and then allocate that amount. Apart from 
> the first 360 bytes, the rest of the 1.8GB byte array was null. 
> We have a socket.request.max.bytes set at 100MB to protect against this kind 
> of thing, but somehow that limit is not always respected. We need to 
> investigate why and fix it.
> cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[DISCUSS] ACL operations

2017-04-13 Thread Colin McCabe
Hi all,

KIP-4 described some RPCs for implementing centralized administrative
operations for Kafka.  Now that the adminclient work is going forward,
I'd like to re-open the discussion about the ACL-related RPCs.  This is
a continuation of the email thread Grant Henke started while back.  (See 
http://search-hadoop.com/m/Kafka/uyzND18EGG22cFMXg?subj=+DISCUSS+KIP+4+ACL+Admin+Schema
)

I think the idea of sending a batch of ACL-related operations all at
once is good for efficiency.  However, I wonder if it is simpler to
separate the add and remove ACLs operations, or if we really ought to
combine them into one RP  It seems that when both add and remove
operations are combined into one RPC, there are some thorny questions
about ordering (does a delete ACL operation on a topic happen first, or
an add ACL operation?)

best,
Colin


[jira] [Commented] (KAFKA-4346) Add foreachValue method to KStream

2017-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968177#comment-15968177
 ] 

ASF GitHub Bot commented on KAFKA-4346:
---

Github user xvrl closed the pull request at:

https://github.com/apache/kafka/pull/2063


> Add foreachValue method to KStream
> --
>
> Key: KAFKA-4346
> URL: https://issues.apache.org/jira/browse/KAFKA-4346
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>Priority: Minor
>  Labels: needs-kip, newbie
>
> This would be the value-only counterpart to foreach, similar to mapValues.
> Adding this method would enhance readability and allow for Java 8 syntactic 
> sugar using method references without having to wrap existing methods that 
> only operate on the value type.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2063: KAFKA-4346: Add foreachValue method to KStream

2017-04-13 Thread xvrl
Github user xvrl closed the pull request at:

https://github.com/apache/kafka/pull/2063


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2849: KAFKA-5059: Implement Transactional Coordinator

2017-04-13 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2849

KAFKA-5059: Implement Transactional Coordinator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/confluentinc/kafka exactly-once-tc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2849.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2849


commit 4d17b7c96293ca8f9735049070512be9707aba27
Author: Guozhang Wang 
Date:   2017-03-02T01:42:49Z

Transaction log message format (#134)

* add transaction log message format
* add transaction timeout to initPid request
* collapse to one message type

commit af926510d2fd455a0ea4e82da83e10cde65db4e9
Author: Apurva Mehta 
Date:   2017-03-15T20:47:25Z

Fix build and test errors due to reabse onto idempotent-producer branch

commit fc3544bf6b55c48d487ef2b7877280d3ac90debb
Author: Guozhang Wang 
Date:   2017-03-17T05:40:49Z

Transaction log partition Immigration and Emigration (#142)

* sub-package transaction and group classes within coordinator
* add loading and cleaning up logic
* add transaction configs

commit fc5fe9226dd4374018f6b5fe3c182158530af193
Author: Guozhang Wang 
Date:   2017-03-21T04:38:35Z

Add transactions broker configs (#146)

* add all broker-side configs
* check for transaction timeout value
* added one more exception type

commit ef390df0eacc8d1f32f96b2db792326a053a5db1
Author: Guozhang Wang 
Date:   2017-03-31T22:20:05Z

Handle addPartitions and addOffsets on TC (#147)

* handling add offsets to txn
* add a pending state with prepareTransition / completeTransaction / 
abortTransition of state
* refactor handling logic for multiple in-flight requests

commit 2a6526a861546eb4102b900d1da703fd2914bd43
Author: Apurva Mehta 
Date:   2017-04-07T19:49:19Z

Fix build errors after rebase onto trunk and dropping out the request stubs 
and client changes.

commit 4d18bb178cd48364bf610e615b176ad8f0d8385f
Author: Apurva Mehta 
Date:   2017-04-03T21:17:25Z

Fix test errors after rebase:

 1. Notable conflicts are with the small API changes to
DelayedOperation and the newly introduced purgeDataBefore PR.

 2. Jason's update to support streaming decompression required a bit of
an overhaul to the way we handle aborted transactions on the consumer.

commit f639b962e8ba618baaef47611e21e2b85b5e5725
Author: Guozhang Wang 
Date:   2017-03-24T22:42:53Z

fix unit tests

commit 853c5e8abffdb723c6f6b818fdeeab94da8667ed
Author: Guozhang Wang 
Date:   2017-03-24T22:52:37Z

add sender thread

commit 879c01c3b5b305485cfd26cb8ceedf453b984067
Author: Guozhang Wang 
Date:   2017-03-28T01:04:53Z

rename TC Send Thread to general inter-broker send thread

commit 239e7f733f8b814ca2d966a80359d8d0de5dee50
Author: Guozhang Wang 
Date:   2017-03-29T21:58:45Z

add tc channel manager

commit b1561da6e2893fad7bcfacba76db4e4df6414577
Author: Guozhang Wang 
Date:   2017-03-29T21:59:26Z

missing files

commit 62685c7269fc648a2401fc7a71f31b9536d7c08a
Author: Guozhang Wang 
Date:   2017-03-31T22:15:37Z

add the txn marker channel manager

commit 298790154c9bfe46f8e4a6b2e0372297fb19896a
Author: Damian Guy 
Date:   2017-04-05T16:09:27Z

fix compilation errors

commit 4f5c23d051453d27f3179a442fe3d822b77d4e12
Author: Damian Guy 
Date:   2017-04-10T10:58:43Z

integrate EndTxnRequest

commit e5f25f31e85fd8104c3df8f8195ccb60694610bc
Author: Damian Guy 
Date:   2017-04-10T13:43:40Z

add test fo InterBrokerSendThread. Refactor to use delegation rather than 
inheritance

commit 8bbd7a07be28585cd329a1fc769fcc340f866af2
Author: Damian Guy 
Date:   2017-04-10T16:24:24Z

refactor TransactionMarkerChannelManager. Add some test

commit 195bccf8c3945696e6e15cc093072ba83e706eec
Author: Damian Guy 
Date:   2017-04-10T18:25:57Z

more tests

commit c28eb5a0b339cce023e278d7eafcf3e8a98fa8e2
Author: Damian Guy 
Date:   2017-04-11T09:23:36Z

remove some answered TODOs

commit 4346c4d36f242e2480e4a808bed0ef19df6a2335
Author: Damian Guy 
Date:   2017-04-11T15:46:37Z

update to WriteTxnMarkersRequest/Response from Trunk

commit 46880d78eae7d2e7853c404bd1d9b19b8ec4e569
Author: Damian Guy 
Date:   2017-04-11T16:19:01Z

add missing @Test annotation

commit cbcd55e0d046d8c6d88ddfa5bbdfbc230b171e13
Author: Damian 

[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory

2017-04-13 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967368#comment-15967368
 ] 

Ismael Juma commented on KAFKA-5062:


[~rsivaram], yes, it was on PLAINTEXT. The weird thing is that `NetworkReceive` 
checks that the request size is within socket.request.max.bytes before 
allocating the buffer. I don't know if [~apurva] has the data, so I'll leave it 
to him to answer that.

> Kafka brokers can accept malformed requests which allocate gigabytes of memory
> --
>
> Key: KAFKA-5062
> URL: https://issues.apache.org/jira/browse/KAFKA-5062
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>
> In some circumstances, it is possible to cause a Kafka broker to allocate 
> massive amounts of memory by writing malformed bytes to the brokers port. 
> In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 
> gigabytes, the first 360 bytes of which were non kafka requests -- an 
> application was writing the wrong data to kafka, causing the broker to 
> interpret the request size as 1.8GB and then allocate that amount. Apart from 
> the first 360 bytes, the rest of the 1.8GB byte array was null. 
> We have a socket.request.max.bytes set at 100MB to protect against this kind 
> of thing, but somehow that limit is not always respected. We need to 
> investigate why and fix it.
> cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Invoking KafkaConsumer#seek for the same partition

2017-04-13 Thread Michal Borowiecki
Sounds to me the comment is imprecisely phrased but was meant to 
indicate the behaviour you are describing.


Perhaps instead of "the latest offset", it should say, "the offset used 
in the latest seek" to make it super-clear.


Cheers,

Michal


On 13/04/17 08:28, Hu Xi wrote:

Hi guys,


The comments for KafkaConsumer#seek says “If this API is invoked for the same 
partition more than once, the latest offset will be used on the next poll()”. 
However, I tried a couple of times, and it turned out that the next poll could 
always read records from the offset which was specified in the last call of 
KafkaConsumer#seek instead of the latest offset. Seems the comment is not 
correct.  What do you say? Any comments are welcomed.





答复: Invoking KafkaConsumer#seek for the same partition

2017-04-13 Thread Hu Xi
Oh My! yes, you are right. I would have been thinking it that way  Thank 
you.


发件人: Michal Borowiecki 
发送时间: 2017年4月13日 17:02
收件人: dev@kafka.apache.org
主题: Re: Invoking KafkaConsumer#seek for the same partition

Sounds to me the comment is imprecisely phrased but was meant to
indicate the behaviour you are describing.

Perhaps instead of "the latest offset", it should say, "the offset used
in the latest seek" to make it super-clear.

Cheers,

Michal


On 13/04/17 08:28, Hu Xi wrote:
> Hi guys,
>
>
> The comments for KafkaConsumer#seek says “If this API is invoked for the same 
> partition more than once, the latest offset will be used on the next poll()”. 
> However, I tried a couple of times, and it turned out that the next poll 
> could always read records from the offset which was specified in the last 
> call of KafkaConsumer#seek instead of the latest offset. Seems the comment is 
> not correct.  What do you say? Any comments are welcomed.




[jira] [Created] (KAFKA-5064) Kafka service crashes with SIGSEGV

2017-04-13 Thread Jothikanth (JIRA)
Jothikanth created KAFKA-5064:
-

 Summary: Kafka service crashes with SIGSEGV
 Key: KAFKA-5064
 URL: https://issues.apache.org/jira/browse/KAFKA-5064
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.9.0.1
 Environment: Production
Reporter: Jothikanth
 Attachments: hs_error.log

Hi,

I am seeing kafka crashes with SIGSEGV at times. This happens only on one node 
in a 6 node cluster. I have attached the hs_error.log which was generated 
during a SIGSEGV. Please let me know if you require more details.

Thanks,



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: 答复: Invoking KafkaConsumer#seek for the same partition

2017-04-13 Thread Michal Borowiecki

But I totally agree, the comment is ambiguous.

The way it's phrased now "latest offset" can easily be taken for "the 
highest(=latest) of the offsets" rather than "the offset last-used".


Cheers,

Michal


On 13/04/17 10:07, Hu Xi wrote:

Oh My! yes, you are right. I would have been thinking it that way  Thank 
you.


发件人: Michal Borowiecki 
发送时间: 2017年4月13日 17:02
收件人: dev@kafka.apache.org
主题: Re: Invoking KafkaConsumer#seek for the same partition

Sounds to me the comment is imprecisely phrased but was meant to
indicate the behaviour you are describing.

Perhaps instead of "the latest offset", it should say, "the offset used
in the latest seek" to make it super-clear.

Cheers,

Michal


On 13/04/17 08:28, Hu Xi wrote:

Hi guys,


The comments for KafkaConsumer#seek says “If this API is invoked for the same 
partition more than once, the latest offset will be used on the next poll()”. 
However, I tried a couple of times, and it turned out that the next poll could 
always read records from the offset which was specified in the last call of 
KafkaConsumer#seek instead of the latest offset. Seems the comment is not 
correct.  What do you say? Any comments are welcomed.







[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory

2017-04-13 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967357#comment-15967357
 ] 

Rajini Sivaram commented on KAFKA-5062:
---

[~apurva] Is this on a PLAINTEXT port? Also, do you have the data from the 
first 360 bytes?

> Kafka brokers can accept malformed requests which allocate gigabytes of memory
> --
>
> Key: KAFKA-5062
> URL: https://issues.apache.org/jira/browse/KAFKA-5062
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>
> In some circumstances, it is possible to cause a Kafka broker to allocate 
> massive amounts of memory by writing malformed bytes to the brokers port. 
> In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 
> gigabytes, the first 360 bytes of which were non kafka requests -- an 
> application was writing the wrong data to kafka, causing the broker to 
> interpret the request size as 1.8GB and then allocate that amount. Apart from 
> the first 360 bytes, the rest of the 1.8GB byte array was null. 
> We have a socket.request.max.bytes set at 100MB to protect against this kind 
> of thing, but somehow that limit is not always respected. We need to 
> investigate why and fix it.
> cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Invoking KafkaConsumer#seek for the same partition

2017-04-13 Thread Hu Xi
Hi guys,


The comments for KafkaConsumer#seek says “If this API is invoked for the same 
partition more than once, the latest offset will be used on the next poll()”. 
However, I tried a couple of times, and it turned out that the next poll could 
always read records from the offset which was specified in the last call of 
KafkaConsumer#seek instead of the latest offset. Seems the comment is not 
correct.  What do you say? Any comments are welcomed.


[jira] [Commented] (KAFKA-5059) Implement Transactional Coordinator

2017-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967233#comment-15967233
 ] 

ASF GitHub Bot commented on KAFKA-5059:
---

GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2849

KAFKA-5059: Implement Transactional Coordinator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/confluentinc/kafka exactly-once-tc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2849.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2849


commit 4d17b7c96293ca8f9735049070512be9707aba27
Author: Guozhang Wang 
Date:   2017-03-02T01:42:49Z

Transaction log message format (#134)

* add transaction log message format
* add transaction timeout to initPid request
* collapse to one message type

commit af926510d2fd455a0ea4e82da83e10cde65db4e9
Author: Apurva Mehta 
Date:   2017-03-15T20:47:25Z

Fix build and test errors due to reabse onto idempotent-producer branch

commit fc3544bf6b55c48d487ef2b7877280d3ac90debb
Author: Guozhang Wang 
Date:   2017-03-17T05:40:49Z

Transaction log partition Immigration and Emigration (#142)

* sub-package transaction and group classes within coordinator
* add loading and cleaning up logic
* add transaction configs

commit fc5fe9226dd4374018f6b5fe3c182158530af193
Author: Guozhang Wang 
Date:   2017-03-21T04:38:35Z

Add transactions broker configs (#146)

* add all broker-side configs
* check for transaction timeout value
* added one more exception type

commit ef390df0eacc8d1f32f96b2db792326a053a5db1
Author: Guozhang Wang 
Date:   2017-03-31T22:20:05Z

Handle addPartitions and addOffsets on TC (#147)

* handling add offsets to txn
* add a pending state with prepareTransition / completeTransaction / 
abortTransition of state
* refactor handling logic for multiple in-flight requests

commit 2a6526a861546eb4102b900d1da703fd2914bd43
Author: Apurva Mehta 
Date:   2017-04-07T19:49:19Z

Fix build errors after rebase onto trunk and dropping out the request stubs 
and client changes.

commit 4d18bb178cd48364bf610e615b176ad8f0d8385f
Author: Apurva Mehta 
Date:   2017-04-03T21:17:25Z

Fix test errors after rebase:

 1. Notable conflicts are with the small API changes to
DelayedOperation and the newly introduced purgeDataBefore PR.

 2. Jason's update to support streaming decompression required a bit of
an overhaul to the way we handle aborted transactions on the consumer.

commit f639b962e8ba618baaef47611e21e2b85b5e5725
Author: Guozhang Wang 
Date:   2017-03-24T22:42:53Z

fix unit tests

commit 853c5e8abffdb723c6f6b818fdeeab94da8667ed
Author: Guozhang Wang 
Date:   2017-03-24T22:52:37Z

add sender thread

commit 879c01c3b5b305485cfd26cb8ceedf453b984067
Author: Guozhang Wang 
Date:   2017-03-28T01:04:53Z

rename TC Send Thread to general inter-broker send thread

commit 239e7f733f8b814ca2d966a80359d8d0de5dee50
Author: Guozhang Wang 
Date:   2017-03-29T21:58:45Z

add tc channel manager

commit b1561da6e2893fad7bcfacba76db4e4df6414577
Author: Guozhang Wang 
Date:   2017-03-29T21:59:26Z

missing files

commit 62685c7269fc648a2401fc7a71f31b9536d7c08a
Author: Guozhang Wang 
Date:   2017-03-31T22:15:37Z

add the txn marker channel manager

commit 298790154c9bfe46f8e4a6b2e0372297fb19896a
Author: Damian Guy 
Date:   2017-04-05T16:09:27Z

fix compilation errors

commit 4f5c23d051453d27f3179a442fe3d822b77d4e12
Author: Damian Guy 
Date:   2017-04-10T10:58:43Z

integrate EndTxnRequest

commit e5f25f31e85fd8104c3df8f8195ccb60694610bc
Author: Damian Guy 
Date:   2017-04-10T13:43:40Z

add test fo InterBrokerSendThread. Refactor to use delegation rather than 
inheritance

commit 8bbd7a07be28585cd329a1fc769fcc340f866af2
Author: Damian Guy 
Date:   2017-04-10T16:24:24Z

refactor TransactionMarkerChannelManager. Add some test

commit 195bccf8c3945696e6e15cc093072ba83e706eec
Author: Damian Guy 
Date:   2017-04-10T18:25:57Z

more tests

commit c28eb5a0b339cce023e278d7eafcf3e8a98fa8e2
Author: Damian Guy 
Date:   2017-04-11T09:23:36Z

remove some answered TODOs

commit 4346c4d36f242e2480e4a808bed0ef19df6a2335
Author: Damian Guy 
Date:   2017-04-11T15:46:37Z

update to 

[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory

2017-04-13 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968257#comment-15968257
 ] 

James Cheng commented on KAFKA-5062:


I agree with Jun about how to try to reproduce it. According to 
http://kafka.apache.org/protocol.html#protocol_common, a RequestOrResponse is 
Size (int32) followed by the rest of the bytes of the Request|Response.

If you take a valid request, and just change the first 4 bytes to something 
huge, and send it in, what would happen in that case?

That's the scenario that I mentioned in the link in my first comment. Some 
system that story parsed the first 4 bytes of H T T P, which turns into decimal 
value of 1213486160, which caused some application to attempt to allocate 1.2GB 
of memory for a request.

> Kafka brokers can accept malformed requests which allocate gigabytes of memory
> --
>
> Key: KAFKA-5062
> URL: https://issues.apache.org/jira/browse/KAFKA-5062
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>
> In some circumstances, it is possible to cause a Kafka broker to allocate 
> massive amounts of memory by writing malformed bytes to the brokers port. 
> In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 
> gigabytes, the first 360 bytes of which were non kafka requests -- an 
> application was writing the wrong data to kafka, causing the broker to 
> interpret the request size as 1.8GB and then allocate that amount. Apart from 
> the first 360 bytes, the rest of the 1.8GB byte array was null. 
> We have a socket.request.max.bytes set at 100MB to protect against this kind 
> of thing, but somehow that limit is not always respected. We need to 
> investigate why and fix it.
> cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory

2017-04-13 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968268#comment-15968268
 ] 

James Cheng commented on KAFKA-5062:


Ah, gotcha. I missed that part.

> Kafka brokers can accept malformed requests which allocate gigabytes of memory
> --
>
> Key: KAFKA-5062
> URL: https://issues.apache.org/jira/browse/KAFKA-5062
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>
> In some circumstances, it is possible to cause a Kafka broker to allocate 
> massive amounts of memory by writing malformed bytes to the brokers port. 
> In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 
> gigabytes, the first 360 bytes of which were non kafka requests -- an 
> application was writing the wrong data to kafka, causing the broker to 
> interpret the request size as 1.8GB and then allocate that amount. Apart from 
> the first 360 bytes, the rest of the 1.8GB byte array was null. 
> We have a socket.request.max.bytes set at 100MB to protect against this kind 
> of thing, but somehow that limit is not always respected. We need to 
> investigate why and fix it.
> cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Kafka-Streams: Cogroup

2017-04-13 Thread Eno Thereska
Hi Kyle, (cc-ing user list as well)

This could be an interesting scenario. Two things to help us think through it 
some more: 1) it seems you attached a figure, but I cannot seem to open it. 2) 
what about using the low level processor API instead of the DSL as approach 3? 
Do you have any thoughts on that?

Thanks
Eno

> On 13 Apr 2017, at 11:26, Winkelman, Kyle G  wrote:
> 
> Hello,
>  
> I am wondering if there is any way to aggregate together many streams at once 
> to build a larger object. Example (Healthcare Domain):
> I have streams of Medical, Pharmacy, and Lab claims. Key is PatientId, Value 
> is a different Avro Record for each stream.
> I was hoping there was a way to supply a single Initializer, () -> new 
> Patient(), and 3 aggregators, (key, value, patient) -> 
> patient.add**Claim(value).
>  
> Currently the only way that I see to do the above use case is by aggregating 
> each individual stream then joining them. This doesn’t scale well with a 
> large number of input streams because for each stream I would be creating 
> another state store.
>  
> I was hoping to get thoughts on a KCogroupedStream api. I have spent a little 
> time conceptualizing it.
>  
> Approach 1:
> In KGroupedStream add a cogroup method that takes the single initializer, a 
> list of other kgroupedstreams, and a list of other aggregators.
> This would then all flow through a single processor and a have a single 
> backing state store.
> The aggregator that the object will get sent to is determined by the 
> context().topic() which we should be able to trace back to one of the 
> kgroupedstreams in the list.
>  
> The problem I am having with this approach is that because everything is 
> going through the single processors and java doesn’t do the best with generic 
> types. I have to either pass in a list of Type objects for casting the object 
> before sending it to the aggregator or I must create aggregators that accept 
> an object and cast them to the appropriate type.
>  
> Approach 2:
> Create one processor for each aggregator and have a single state store. Then 
> have a single KStreamPassThrough that just passes on the new aggregate value.
> The positive for this is you know which stream it will be coming from and 
> won’t need to do the context().topic() trick.
>  
> The problem I am having with this approach is understanding if there is a 
> race condition. Obviously the source topics would be copartitioned. But would 
> it be multithreaded and possibly cause one of the processors to grab patient 
> 1 at the same time a different processor has grabbed patient 1?
> My understanding is that for each partition there would be a single complete 
> set of processors and a new incoming record would go completely through the 
> processor topology from a source node to a sink node before the next one is 
> sent through. Is this correct?
>  
> 
>  
> If anyone has any additional ideas about this let me know. I don’t know if I 
> have the time to actually create this api so if someone likes the idea and 
> wants to develop it feel free.
> 
> This e-mail, including attachments, may include confidential and/or
> proprietary information, and may be used only by the person or entity
> to which it is addressed. If the reader of this e-mail is not the intended
> recipient or his or her authorized agent, the reader is hereby notified
> that any dissemination, distribution or copying of this e-mail is
> prohibited. If you have received this e-mail in error, please notify the
> sender by replying to this message and delete this e-mail immediately.
> 



Re: [DISCUSS] ACL operations

2017-04-13 Thread Colin McCabe
Based on the initial discussion here, and the draft KIP-133, it sounds
like the plan is to have AdminClient APIs like: addAcls, removeAcls,
listAcls, listConfig, changeConfig (roughly speaking).

However, just to play devil's advocate here a bit, wouldn't AdminClient
users find it more natural to view all of those things as topic
modifications or descriptions?

For example, why can't I find the configuration or ACLs applied to
topics when calling describeTopics?  Why can't I have an alterTopics API
that can alter both ACLs and configuration?  And if we decide to have
APIs like that, shouldn't we have AlterTopicsRequest and
DescribeTopicsRequest instead of ListAclsRequest,
ListConfigurationRequest, AlterAclsRequest, AlterConfigurationRequest?

I'm curious which approach seems better.

cheers,
Colin


On Thu, Apr 13, 2017, at 14:38, Ismael Juma wrote:
> Hi Colin,
> 
> Thanks for coordinating with Grant and reviving this. I agree that having
> a
> separate delete request makes sense. This also came up in the original
> discussion thread and I think people were in favour.
> 
> Ismael
> 
> On 13 Apr 2017 10:21 pm, "Colin McCabe"  wrote:
> 
> > Hi all,
> >
> > KIP-4 described some RPCs for implementing centralized administrative
> > operations for Kafka.  Now that the adminclient work is going forward,
> > I'd like to re-open the discussion about the ACL-related RPCs.  This is
> > a continuation of the email thread Grant Henke started while back.  (See
> > http://search-hadoop.com/m/Kafka/uyzND18EGG22cFMXg?subj=+
> > DISCUSS+KIP+4+ACL+Admin+Schema
> > )
> >
> > I think the idea of sending a batch of ACL-related operations all at
> > once is good for efficiency.  However, I wonder if it is simpler to
> > separate the add and remove ACLs operations, or if we really ought to
> > combine them into one RP  It seems that when both add and remove
> > operations are combined into one RPC, there are some thorny questions
> > about ordering (does a delete ACL operation on a topic happen first, or
> > an add ACL operation?)
> >
> > best,
> > Colin
> >


[jira] [Created] (KAFKA-5069) add controller integration tests

2017-04-13 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5069:
---

 Summary: add controller integration tests
 Key: KAFKA-5069
 URL: https://issues.apache.org/jira/browse/KAFKA-5069
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


Test the various controller protocols by observing zookeeper and broker state.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2851: MINOR: Clarify wording

2017-04-13 Thread jeffwidman
GitHub user jeffwidman opened a pull request:

https://github.com/apache/kafka/pull/2851

MINOR: Clarify wording



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jeffwidman/kafka patch-3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2851.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2851


commit a8629762e0eff7c7feab4ef7e9072a80d959033f
Author: Jeff Widman 
Date:   2017-04-13T21:36:37Z

MINOR: Clarify wording




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2852: MINOR: Fix some re-raising of exceptions in system...

2017-04-13 Thread ewencp
GitHub user ewencp opened a pull request:

https://github.com/apache/kafka/pull/2852

MINOR: Fix some re-raising of exceptions in system tests



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ewencp/kafka minor-re-raise-exceptions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2852.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2852


commit c56377ca83f36c6ffa0ab718833918ca82bd7211
Author: Ewen Cheslack-Postava 
Date:   2017-04-13T22:49:12Z

MINOR: Fix some re-raising of exceptions in system tests




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] ACL operations

2017-04-13 Thread Ismael Juma
Hi Colin,

Thanks for coordinating with Grant and reviving this. I agree that having a
separate delete request makes sense. This also came up in the original
discussion thread and I think people were in favour.

Ismael

On 13 Apr 2017 10:21 pm, "Colin McCabe"  wrote:

> Hi all,
>
> KIP-4 described some RPCs for implementing centralized administrative
> operations for Kafka.  Now that the adminclient work is going forward,
> I'd like to re-open the discussion about the ACL-related RPCs.  This is
> a continuation of the email thread Grant Henke started while back.  (See
> http://search-hadoop.com/m/Kafka/uyzND18EGG22cFMXg?subj=+
> DISCUSS+KIP+4+ACL+Admin+Schema
> )
>
> I think the idea of sending a batch of ACL-related operations all at
> once is good for efficiency.  However, I wonder if it is simpler to
> separate the add and remove ACLs operations, or if we really ought to
> combine them into one RP  It seems that when both add and remove
> operations are combined into one RPC, there are some thorny questions
> about ordering (does a delete ACL operation on a topic happen first, or
> an add ACL operation?)
>
> best,
> Colin
>


[jira] [Comment Edited] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory

2017-04-13 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968265#comment-15968265
 ] 

Ismael Juma edited comment on KAFKA-5062 at 4/13/17 9:48 PM:
-

James, as stated previously, we check if the size is higher than the config 
value (100 MB by default), and disconnect if it is.


was (Author: ijuma):
James, as stated previously, we check if the size is above the config value 
(100 MB by default), and disconnect if it is.

> Kafka brokers can accept malformed requests which allocate gigabytes of memory
> --
>
> Key: KAFKA-5062
> URL: https://issues.apache.org/jira/browse/KAFKA-5062
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>
> In some circumstances, it is possible to cause a Kafka broker to allocate 
> massive amounts of memory by writing malformed bytes to the brokers port. 
> In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 
> gigabytes, the first 360 bytes of which were non kafka requests -- an 
> application was writing the wrong data to kafka, causing the broker to 
> interpret the request size as 1.8GB and then allocate that amount. Apart from 
> the first 360 bytes, the rest of the 1.8GB byte array was null. 
> We have a socket.request.max.bytes set at 100MB to protect against this kind 
> of thing, but somehow that limit is not always respected. We need to 
> investigate why and fix it.
> cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory

2017-04-13 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968265#comment-15968265
 ] 

Ismael Juma commented on KAFKA-5062:


James, as stated previously, we check if the size is above the config value 
(100 MB by default), and disconnect if it is.

> Kafka brokers can accept malformed requests which allocate gigabytes of memory
> --
>
> Key: KAFKA-5062
> URL: https://issues.apache.org/jira/browse/KAFKA-5062
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>
> In some circumstances, it is possible to cause a Kafka broker to allocate 
> massive amounts of memory by writing malformed bytes to the brokers port. 
> In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 
> gigabytes, the first 360 bytes of which were non kafka requests -- an 
> application was writing the wrong data to kafka, causing the broker to 
> interpret the request size as 1.8GB and then allocate that amount. Apart from 
> the first 360 bytes, the rest of the 1.8GB byte array was null. 
> We have a socket.request.max.bytes set at 100MB to protect against this kind 
> of thing, but somehow that limit is not always respected. We need to 
> investigate why and fix it.
> cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2853: KAFKA-5069: add controller integration tests

2017-04-13 Thread onurkaraman
GitHub user onurkaraman opened a pull request:

https://github.com/apache/kafka/pull/2853

KAFKA-5069: add controller integration tests

Test the various controller protocols by observing zookeeper and broker 
state.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onurkaraman/kafka KAFKA-5069

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2853.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2853


commit 55544d2375fa267762bc5ecc233f7a296202922d
Author: Onur Karaman 
Date:   2017-04-14T01:54:43Z

KAFKA-5069: add controller integration tests

Test the various controller protocols by observing zookeeper and broker 
state.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Kafka-Streams: Cogroup

2017-04-13 Thread Winkelman, Kyle G
Hello,

I am wondering if there is any way to aggregate together many streams at once 
to build a larger object. Example (Healthcare Domain):
I have streams of Medical, Pharmacy, and Lab claims. Key is PatientId, Value is 
a different Avro Record for each stream.
I was hoping there was a way to supply a single Initializer, () -> new 
Patient(), and 3 aggregators, (key, value, patient) -> 
patient.add**Claim(value).

Currently the only way that I see to do the above use case is by aggregating 
each individual stream then joining them. This doesn't scale well with a large 
number of input streams because for each stream I would be creating another 
state store.

I was hoping to get thoughts on a KCogroupedStream api. I have spent a little 
time conceptualizing it.

Approach 1:
In KGroupedStream add a cogroup method that takes the single initializer, a 
list of other kgroupedstreams, and a list of other aggregators.
This would then all flow through a single processor and a have a single backing 
state store.
The aggregator that the object will get sent to is determined by the 
context().topic() which we should be able to trace back to one of the 
kgroupedstreams in the list.

The problem I am having with this approach is that because everything is going 
through the single processors and java doesn't do the best with generic types. 
I have to either pass in a list of Type objects for casting the object before 
sending it to the aggregator or I must create aggregators that accept an object 
and cast them to the appropriate type.

Approach 2:
Create one processor for each aggregator and have a single state store. Then 
have a single KStreamPassThrough that just passes on the new aggregate value.
The positive for this is you know which stream it will be coming from and won't 
need to do the context().topic() trick.

The problem I am having with this approach is understanding if there is a race 
condition. Obviously the source topics would be copartitioned. But would it be 
multithreaded and possibly cause one of the processors to grab patient 1 at the 
same time a different processor has grabbed patient 1?
My understanding is that for each partition there would be a single complete 
set of processors and a new incoming record would go completely through the 
processor topology from a source node to a sink node before the next one is 
sent through. Is this correct?

[cid:image002.png@01D2B45F.53169F50]

If anyone has any additional ideas about this let me know. I don't know if I 
have the time to actually create this api so if someone likes the idea and 
wants to develop it feel free.

This e-mail, including attachments, may include confidential and/or
proprietary information, and may be used only by the person or entity
to which it is addressed. If the reader of this e-mail is not the intended
recipient or his or her authorized agent, the reader is hereby notified
that any dissemination, distribution or copying of this e-mail is
prohibited. If you have received this e-mail in error, please notify the
sender by replying to this message and delete this e-mail immediately.


[jira] [Commented] (KAFKA-5028) convert kafka controller to a single-threaded event queue model

2017-04-13 Thread Balint Molnar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967653#comment-15967653
 ] 

Balint Molnar commented on KAFKA-5028:
--

[~onurkaraman] I am so exited about this. Is there anything I can help? :)

> convert kafka controller to a single-threaded event queue model
> ---
>
> Key: KAFKA-5028
> URL: https://issues.apache.org/jira/browse/KAFKA-5028
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> The goal of this ticket is to improve controller maintainability by 
> simplifying the controller's concurrency semantics. The controller code has a 
> lot of shared state between several threads using several concurrency 
> primitives. This makes the code hard to reason about.
> This ticket proposes we convert the controller to a single-threaded event 
> queue model. We add a new controller thread which processes events held in an 
> event queue. Note that this does not mean we get rid of all threads used by 
> the controller. We merely delegate all work that interacts with controller 
> local state to this single thread. With only a single thread accessing and 
> modifying the controller local state, we no longer need to worry about 
> concurrent access, which means we can get rid of the various concurrency 
> primitives used throughout the controller.
> Performance is expected to match existing behavior since the bulk of the 
> existing controller work today already happens sequentially in the ZkClient’s 
> single ZkEventThread.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5066) KafkaMetricsConfig properties and description notably missing from documentation

2017-04-13 Thread Ryan P (JIRA)
Ryan P created KAFKA-5066:
-

 Summary: KafkaMetricsConfig properties and description notably 
missing from documentation
 Key: KAFKA-5066
 URL: https://issues.apache.org/jira/browse/KAFKA-5066
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Reporter: Ryan P


`KafkaMetrics` implementations do not appear to be exposed to all the Yammer 
metrics exposed to implementations of the `KafkaMetricsReporter` 

Currently the docs only cover the `metric.reporters` which allows clients to 
configure a `MetricsReporter` plugin. Clients are then disappointed to learn 
that this affords them access to only a small subset of metrics. 

Proper monitoring of the broker requires access to the Yammer metrics which 
clients can gain access to with a `KafkaMetricsReporter` plugin. 





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-138: Change punctuate semantics

2017-04-13 Thread Matthias J. Sax
Hi,

I would like to push this discussion further. Thanks Michael for your
email (it was longer than expected, even after your "sorry for the long
email" warning... :P). But I'll go ahead an follow your example: it's a
long read. :)

I think it's really important to put all this in a larger scope and
think Michael's comments are super useful here.

Kafka Streams DSL does not have triggers (for a good reason, IMHO) and
there are no plans to add them at the moment. IMHO, the whole
Dataflow/Beam style processing approach, mingles together two different
things into one big and hard to use API. A clean DSL, should describe
what the result should be but not more -- everything else, "pollutes"
the code.

I understand the desire to have fine grained control of all kind of
things -- but I also believe that fine grained control belongs to
Processor API -- not to the DSL (at least not at this level). Therefore,
from my point of view, whatever punctuation mechanism we introduce, it
should not be part of the DSL (note, you can of course mix-and-match
high-level DSL and low-level PAPI).

If there is need, to give better control at the DSL level, we can of
course design something -- but it should not be punctuations or changes
to the DSL itself. To me, this is more at the "configuration level".

I want to pick up the example of KIP-63: it added a record cache to
reduce the downstream load. This is a pure "performance optimization".
Note, that the output of an aggregation is a changelog stream -- thus,
"deduplication" does not change the result. It only holds back certain
updates, and if new updates are coming in while holding back, you don't
see some "intermediate results" -- but you don't need to see those, as
they are not "correct" anyway. I point this out to  it contrast it to
triggers, what have a completely different purpose.

Furthermore, I personally believe, that __change__ is the next step in
the stream processing world -- and Kafka Stream embraces change ("Yes,
we can" -- couldn't resist...). Thus, (IMHO) application should be
designed with change in mind -- in the Stream processing world, there is
nothing like a final result -- that is the "lie" the Dataflow/Beam model
tells you. Of course, if you need to make a decision in real life based
in some data, you very often cannot undo a decision and also cannot wait
forever to decide. But you need to do this decision based on the
"current data" (I don't call is "final result" on purpose as there is no
such thing -- or to put it differently: it might exist, but you can
never now if it is final, as there might always be another late arriving
record). With this regard, wall-clock time punctuation are very useful
(and also IQ, that allows you to lock up the __current__ state).

Having said this, it's of course impossible to preserve the whole
history -- and also not required. As some point, late arriving data are
not interesting anymore -- maybe somebody made a decision and the world
move on (late data might only show you that the decision was wrong, but
it would be too late to correct it). For this reason, Streams has the
notion of "retention time". With this regard, you can argue that you get
a "final result" after retention time passed. But again, it not part of
the "logical description" of the result that is specified by the DSL --
it's an operation concern concern.

I strongly believe, that this overall design of the Streams DSL is
important to put into account. I know, this is more a "meta" email, and
does not give a detailed answer about the punctuation discussion (or not
much -- maybe: "not part of DSL"). But it should help to put the use
cases we collect into the right buckets, and maybe also help to identify
what we need to improve on the DSL to improve its usability, as a fall
back to PAPI is always cumbersome.


Looking forward to your feedback.


-Matthias






On 4/11/17 12:52 PM, Thomas Becker wrote:
> Here's an example that we currently have.  We have a streams processor
> that does a transform from one topic into another. One of the fields in
> the source topic record is an expiration time, and one of the functions
> of the processor is to ensure that expired records get deleted promptly
> after that time passes (typically days or weeks after the message was
> originally produced). To do that, the processor keeps a state store of
> keys and expiration times, iterates that store in punctuate(), and
> emits delete (null) records for expired items. This needs to happen at
> some minimum interval regardless of the incoming message rate of the
> source topic.
> 
> In this scenario, the expiration of records is the primary function of
> punctuate, and therefore the key requirement is that the wall-clock
> measured time between punctuate calls have some upper-bound. So a pure
> wall-clock based schedule would be fine for our needs. But the proposed
> "hybrid" system would also be acceptable if that satisfies a broader
> range of use-cases.
> 
> On Tue, 

[jira] [Commented] (KAFKA-5049) Chroot check should be done for each ZkUtils instance

2017-04-13 Thread Umesh Chaudhary (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968624#comment-15968624
 ] 

Umesh Chaudhary commented on KAFKA-5049:


Thanks [~junrao] for the pointers. While I was looking at the current 
implementation, it seems difficult to instantiate ZkPath trivially. Shouldn't 
we need another class definition (as a companion of existing ZkPath object) 
which enables the instantiation of ZkPath? Please correct me if I am wrong. 

> Chroot check should be done for each ZkUtils instance
> -
>
> Key: KAFKA-5049
> URL: https://issues.apache.org/jira/browse/KAFKA-5049
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>  Labels: newbie
> Fix For: 0.11.0.0
>
>
> In KAFKA-1994, the check for ZK chroot was moved to ZkPath. However, ZkPath 
> is a JVM singleton and we may use multiple ZkClient instances with multiple 
> ZooKeeper ensembles in the same JVM (for cluster info, authorizer and 
> pluggable code provided by users).
> The right way to do this is to make ZkPath an instance variable in ZkUtils so 
> that we do the check once per ZkUtils instance.
> cc [~gwenshap] [~junrao], who reviewed KAFKA-1994, in case I am missing 
> something.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP 130: Expose states of active tasks to KafkaStreams public API

2017-04-13 Thread Matthias J. Sax
Florian,

>>> What about KafkaStreams#toString() method?
>>>
>>> I think, we want to deprecate it as with KIP-120 and the changes of this
>>> KIP, is gets obsolete.

Any thoughts about this? For me, this is the last open point to discuss
(or what should be reflected in the KIP in case you agree) before I can
put my vote on the VOTE thread do did start already.

-Matthias


On 4/11/17 12:18 AM, Damian Guy wrote:
> Hi Florian,
> 
> Thanks for the updates. The KIP is looking good.
> 
> Cheers,
> Damian
> 
> On Fri, 7 Apr 2017 at 22:41 Matthias J. Sax  wrote:
> 
>> What about KafkaStreams#toString() method?
>>
>> I think, we want to deprecate it as with KIP-120 and the changes of this
>> KIP, is gets obsolete.
>>
>> If we do so, please update the KIP accordingly.
>>
>>
>> -Matthias
>>
>> On 3/28/17 7:00 PM, Matthias J. Sax wrote:
>>> Thanks for updating the KIP!
>>>
>>> I think it's good as is -- I would not add anything more to TaskMetadata.
>>>
>>> About subtopologies and tasks. We do have the concept of subtopologies
>>> already in KIP-120. It's only missing and ID that allow to link a
>>> subtopology to a task.
>>>
>>> IMHO, adding a simple variable to `Subtopoloy` that provide the id
>>> should be sufficient. We can simply document in the JavaDocs how
>>> Subtopology and TaskMetadata can be linked to each other.
>>>
>>> I did update KIP-120 accordingly.
>>>
>>>
>>> -Matthias
>>>
>>> On 3/28/17 3:45 PM, Florian Hussonnois wrote:
 Hi all,

 I've updated the KIP and the PR to reflect your suggestions.

>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
 https://github.com/apache/kafka/pull/2612

 Also, I've exposed property StreamThread#state as a string through the
 new class ThreadMetadata.

 Thanks,

 2017-03-27 23:40 GMT+02:00 Florian Hussonnois >:

 Hi Guozhang, Matthias,

 It's a great idea to add sub topologies descriptions. This would
 help developers to better understand topology concept.

 I agree that is not really user-friendly to check if
 `StreamsMetadata#streamThreads` is not returning null.

 The method name localThreadsMetadata looks good. In addition, it's
 more simple to build ThreadMetadata instances from the `StreamTask`
 class than from `StreamPartitionAssignor` class.

 I will work on modifications. As I understand, I have to add the
 property subTopologyId property to the TaskMetadata class - Am I
>> right ?

 Thanks,

 2017-03-26 0:25 GMT+01:00 Guozhang Wang >:

 Re 1): this is a good point. May be we can move
 `StreamsMetadata#streamThreads` as
 `KafkaStreams#localThreadsMetadata`?

 3): this is a minor suggestion about function name of
 `assignedPartitions`, to `topicPartitions` to be consistent with
 `StreamsMetadata`?


 Guozhang

 On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax
 > wrote:

 Thanks for the progress on this KIP. I think we are on the
 right path!

 Couple of comments/questions:

 (1) Why do we not consider the "rejected alternative" to add
 the method
 to KafkaStreams? The comment on #streamThreads() says:

 "Note this method will return null if called on
 {@link
 StreamsMetadata} which represent a remote application."

 Thus, if we cannot get any remote metadata, it seems not
 straight
 forward to not add it to KafkaStreams directly -- this would
 avoid
 invalid calls and `null` return value in the first place.

 I like the idea about exposing sub-topologies.:

 (2a) I would recommend to rename `topicsGroupId` to
 `subTopologyId` :)

 (2b) We could add this to KIP-120 already. However, I would
 not just
 link both via name, but leverage KIP-120 directly, and add a
 "Subtopology" member to the TaskMetadata class.


 Overall, I like the distinction of KIP-120 only exposing
 "static"
 information that can be determined before the topology get's
 started,
 while this KIP allow to access runtime information.



 -Matthias


 On 3/22/17 12:42 PM, Guozhang Wang wrote:
 > Thanks for the updated KIP, and 

[jira] [Commented] (KAFKA-5069) add controller integration tests

2017-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968497#comment-15968497
 ] 

ASF GitHub Bot commented on KAFKA-5069:
---

GitHub user onurkaraman opened a pull request:

https://github.com/apache/kafka/pull/2853

KAFKA-5069: add controller integration tests

Test the various controller protocols by observing zookeeper and broker 
state.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onurkaraman/kafka KAFKA-5069

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2853.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2853


commit 55544d2375fa267762bc5ecc233f7a296202922d
Author: Onur Karaman 
Date:   2017-04-14T01:54:43Z

KAFKA-5069: add controller integration tests

Test the various controller protocols by observing zookeeper and broker 
state.




> add controller integration tests
> 
>
> Key: KAFKA-5069
> URL: https://issues.apache.org/jira/browse/KAFKA-5069
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> Test the various controller protocols by observing zookeeper and broker state.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-5069) add controller integration tests

2017-04-13 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5069 started by Onur Karaman.
---
> add controller integration tests
> 
>
> Key: KAFKA-5069
> URL: https://issues.apache.org/jira/browse/KAFKA-5069
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> Test the various controller protocols by observing zookeeper and broker state.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5005) JoinIntegrationTest.testLeftKStreamKStream() fails occasionally

2017-04-13 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5005:
---
Description: 
testLeftKStreamKStream:
{noformat}
java.lang.AssertionError: Condition not met within timeout 3. Expecting 1 
records from topic outputTopic while only received 0: []
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:265)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:247)
at 
org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:170)
at 
org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:192)
at 
org.apache.kafka.streams.integration.JoinIntegrationTest.testLeftKStreamKStream(JoinIntegrationTest.java:250)
{noformat}

testInnerKStreamKTable:
{noformat}
java.lang.AssertionError: Condition not met within timeout 3. Expecting 1 
records from topic outputTopic while only received 0: []
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:265)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:248)
at 
org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:171)
at 
org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:193)
at 
org.apache.kafka.streams.integration.JoinIntegrationTest.testInnerKStreamKTable(JoinIntegrationTest.java:305)
{noformat}

  was:
{noformat}
java.lang.AssertionError: Condition not met within timeout 3. Expecting 1 
records from topic outputTopic while only received 0: []
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:265)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:247)
at 
org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:170)
at 
org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:192)
at 
org.apache.kafka.streams.integration.JoinIntegrationTest.testLeftKStreamKStream(JoinIntegrationTest.java:250)
{noformat}


> JoinIntegrationTest.testLeftKStreamKStream() fails occasionally
> ---
>
> Key: KAFKA-5005
> URL: https://issues.apache.org/jira/browse/KAFKA-5005
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Eno Thereska
>
> testLeftKStreamKStream:
> {noformat}
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 1 
> records from topic outputTopic while only received 0: []
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:265)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:247)
> at 
> org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:170)
> at 
> org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:192)
> at 
> org.apache.kafka.streams.integration.JoinIntegrationTest.testLeftKStreamKStream(JoinIntegrationTest.java:250)
> {noformat}
> testInnerKStreamKTable:
> {noformat}
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 1 
> records from topic outputTopic while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:265)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:248)
>   at 
> org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:171)
>   at 
> org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:193)
>   at 
> org.apache.kafka.streams.integration.JoinIntegrationTest.testInnerKStreamKTable(JoinIntegrationTest.java:305)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5005) JoinIntegrationTest fails occasionally

2017-04-13 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5005:
---
Summary: JoinIntegrationTest fails occasionally  (was: 
JoinIntegrationTest.testLeftKStreamKStream() fails occasionally)

> JoinIntegrationTest fails occasionally
> --
>
> Key: KAFKA-5005
> URL: https://issues.apache.org/jira/browse/KAFKA-5005
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Eno Thereska
>
> testLeftKStreamKStream:
> {noformat}
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 1 
> records from topic outputTopic while only received 0: []
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:265)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:247)
> at 
> org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:170)
> at 
> org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:192)
> at 
> org.apache.kafka.streams.integration.JoinIntegrationTest.testLeftKStreamKStream(JoinIntegrationTest.java:250)
> {noformat}
> testInnerKStreamKTable:
> {noformat}
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 1 
> records from topic outputTopic while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:265)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:248)
>   at 
> org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:171)
>   at 
> org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:193)
>   at 
> org.apache.kafka.streams.integration.JoinIntegrationTest.testInnerKStreamKTable(JoinIntegrationTest.java:305)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5057) "Big Message Log"

2017-04-13 Thread Umesh Chaudhary (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968580#comment-15968580
 ] 

Umesh Chaudhary commented on KAFKA-5057:


Understood and yes this is a good idea to capture the frequency of "Big 
Messages" on broker. That new broker config would set the threshold and the 
produced messages which exceed that threshold, broker would log their details. 
Also, I can start preparing KIP for this feature. 

> "Big Message Log"
> -
>
> Key: KAFKA-5057
> URL: https://issues.apache.org/jira/browse/KAFKA-5057
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>
> Really large requests can cause significant GC pauses which can cause quite a 
> few other symptoms on a broker. Will be nice to be able to catch them.
> Lets add the option to log details (client id, topic, partition) for every 
> produce request that is larger than a configurable threshold.
> /cc [~apurva]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5065) AbstractCoordinator.ensureCoordinatorReady() stuck in loop if absent any bootstrap servers

2017-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967548#comment-15967548
 ] 

ASF GitHub Bot commented on KAFKA-5065:
---

GitHub user porshkevich opened a pull request:

https://github.com/apache/kafka/pull/2850

KAFKA-5065; AbstractCoordinator.ensureCoordinatorReady() stuck in loop if 
absent any bootstrap servers

add a consumer config: "max.block.ms"
default to 6 ms;
when specified, the ensureCoordinatorReady check default call will be 
limited by "max.block.ms"

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/porshkevich/kafka KAFKA-5065

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2850.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2850


commit 99004de30a5400b2d8554b4a4469039498e033d4
Author: Vladimir Porshkevich 
Date:   2017-04-13T12:41:31Z

Add max.block.ms to allow timing out ensureCoordinatorReady check.




> AbstractCoordinator.ensureCoordinatorReady() stuck in loop if absent any 
> bootstrap servers 
> ---
>
> Key: KAFKA-5065
> URL: https://issues.apache.org/jira/browse/KAFKA-5065
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Vladimir Porshkevich
>  Labels: newbie
>   Original Estimate: 4m
>  Remaining Estimate: 4m
>
> If Consumer started with wrong bootstrap servers or absent any valid servers, 
> and Thread call Consumer.poll(timeout) with any timeout Thread stuck in loop 
> with debug logs like
> {noformat}
> org.apache.kafka.common.network.Selector - Connection with /172.31.1.100 
> disconnected
> java.net.ConnectException: Connection timed out: no further information
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>   at 
> org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
>   at 
> org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:81)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:335)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:275)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> com.example.SccSpringCloudDemoApplication.main(SccSpringCloudDemoApplication.java:46)
> {noformat}
> Problem with AbstractCoordinator.ensureCoordinatorReady() method
> It uses Long.MAX_VALUE as timeout.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory

2017-04-13 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967479#comment-15967479
 ] 

Ismael Juma commented on KAFKA-5062:


Kafka version was 0.10.1.1, so I don't think BlockingChannel was used. Yes, it 
was an application that had been misconfigured to connect to Kafka.

> Kafka brokers can accept malformed requests which allocate gigabytes of memory
> --
>
> Key: KAFKA-5062
> URL: https://issues.apache.org/jira/browse/KAFKA-5062
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>
> In some circumstances, it is possible to cause a Kafka broker to allocate 
> massive amounts of memory by writing malformed bytes to the brokers port. 
> In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 
> gigabytes, the first 360 bytes of which were non kafka requests -- an 
> application was writing the wrong data to kafka, causing the broker to 
> interpret the request size as 1.8GB and then allocate that amount. Apart from 
> the first 360 bytes, the rest of the 1.8GB byte array was null. 
> We have a socket.request.max.bytes set at 100MB to protect against this kind 
> of thing, but somehow that limit is not always respected. We need to 
> investigate why and fix it.
> cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5065) AbstractCoordinator.ensureCoordinatorReady() stuck in loop if absent any bootstrap servers

2017-04-13 Thread Vladimir Porshkevich (JIRA)
Vladimir Porshkevich created KAFKA-5065:
---

 Summary: AbstractCoordinator.ensureCoordinatorReady() stuck in 
loop if absent any bootstrap servers 
 Key: KAFKA-5065
 URL: https://issues.apache.org/jira/browse/KAFKA-5065
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.2.0, 0.10.1.1, 0.10.1.0, 0.10.0.1, 0.10.0.0
Reporter: Vladimir Porshkevich


If Consumer started with wrong bootstrap servers or absent any valid servers, 
and Thread call Consumer.poll(timeout) with any timeout Thread stuck in loop 
with debug logs like

{noformat}
org.apache.kafka.common.network.Selector - Connection with /172.31.1.100 
disconnected
java.net.ConnectException: Connection timed out: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at 
org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
at 
org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:81)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:335)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:275)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at 
com.example.SccSpringCloudDemoApplication.main(SccSpringCloudDemoApplication.java:46)
{noformat}

Problem with AbstractCoordinator.ensureCoordinatorReady() method
It uses Long.MAX_VALUE as timeout.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2850: KAFKA-5065; AbstractCoordinator.ensureCoordinatorR...

2017-04-13 Thread porshkevich
GitHub user porshkevich opened a pull request:

https://github.com/apache/kafka/pull/2850

KAFKA-5065; AbstractCoordinator.ensureCoordinatorReady() stuck in loop if 
absent any bootstrap servers

add a consumer config: "max.block.ms"
default to 6 ms;
when specified, the ensureCoordinatorReady check default call will be 
limited by "max.block.ms"

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/porshkevich/kafka KAFKA-5065

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2850.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2850


commit 99004de30a5400b2d8554b4a4469039498e033d4
Author: Vladimir Porshkevich 
Date:   2017-04-13T12:41:31Z

Add max.block.ms to allow timing out ensureCoordinatorReady check.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory

2017-04-13 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967450#comment-15967450
 ] 

Rajini Sivaram commented on KAFKA-5062:
---

[~ijuma] Was it an application writing bad data - not a test injecting bad data 
on the network? Controller connections use `BlockingChannel` with unlimited 
receive size. But as you say, can't see how a badly behaved application could 
trigger that size allocation.

> Kafka brokers can accept malformed requests which allocate gigabytes of memory
> --
>
> Key: KAFKA-5062
> URL: https://issues.apache.org/jira/browse/KAFKA-5062
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>
> In some circumstances, it is possible to cause a Kafka broker to allocate 
> massive amounts of memory by writing malformed bytes to the brokers port. 
> In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 
> gigabytes, the first 360 bytes of which were non kafka requests -- an 
> application was writing the wrong data to kafka, causing the broker to 
> interpret the request size as 1.8GB and then allocate that amount. Apart from 
> the first 360 bytes, the rest of the 1.8GB byte array was null. 
> We have a socket.request.max.bytes set at 100MB to protect against this kind 
> of thing, but somehow that limit is not always respected. We need to 
> investigate why and fix it.
> cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2237: support scala 2.12 build

2017-04-13 Thread pjfanning
Github user pjfanning closed the pull request at:

https://github.com/apache/kafka/pull/2237


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (KAFKA-5062) Kafka brokers can accept malformed requests which allocate gigabytes of memory

2017-04-13 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967450#comment-15967450
 ] 

Rajini Sivaram edited comment on KAFKA-5062 at 4/13/17 11:37 AM:
-

[~ijuma] Was it an application writing bad data - not a test injecting bad data 
on the network? Controller connections use `BlockingChannel` with unlimited 
receive size with older versions of the broker. But as you say, can't see how a 
badly behaved application could trigger that size allocation.


was (Author: rsivaram):
[~ijuma] Was it an application writing bad data - not a test injecting bad data 
on the network? Controller connections use `BlockingChannel` with unlimited 
receive size. But as you say, can't see how a badly behaved application could 
trigger that size allocation.

> Kafka brokers can accept malformed requests which allocate gigabytes of memory
> --
>
> Key: KAFKA-5062
> URL: https://issues.apache.org/jira/browse/KAFKA-5062
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>
> In some circumstances, it is possible to cause a Kafka broker to allocate 
> massive amounts of memory by writing malformed bytes to the brokers port. 
> In investigating an issue, we saw byte arrays on the kafka heap upto 1.8 
> gigabytes, the first 360 bytes of which were non kafka requests -- an 
> application was writing the wrong data to kafka, causing the broker to 
> interpret the request size as 1.8GB and then allocate that amount. Apart from 
> the first 360 bytes, the rest of the 1.8GB byte array was null. 
> We have a socket.request.max.bytes set at 100MB to protect against this kind 
> of thing, but somehow that limit is not always respected. We need to 
> investigate why and fix it.
> cc [~rnpridgeon], [~ijuma], [~gwenshap], [~cmccabe]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2017-04-13 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967705#comment-15967705
 ] 

Jun Rao commented on KAFKA-2729:


Thanks for the additional info. In both [~Ronghua Lin] and [~allenzhuyi]'s 
case, it seems ZK session expiration had happened. As I mentioned earlier in 
the jira, there is a known issue reported in KAFKA-3083 that when the 
controller's ZK session expires and loses its controller-ship, it's possible 
for this zombie controller to continue updating ZK and/or sending 
LeaderAndIsrRequests to the brokers for a short period of time. When this 
happens, the broker may not have the most up-to-date information about leader 
and isr, which can lead to subsequent ZK failure when isr needs to be updated.

It may take some time to have this issue fixed. In the interim, the workaround 
for this issue is to make sure ZK session expiration never happens. This first 
thing is to figure out what's causing the ZK session to expire. Two common 
causes are (1) long broker GC and (2) network glitches. For (1), one needs to 
tune the GC in the broker properly. For (2), one can look at the reported time 
that the ZK client can't hear from the ZK server and increase the ZK session 
expiration time according.

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Danil Serdyuchenko
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of undereplicated partitions. The zookeeper 
> cluster recovered, however we continued to see a large number of 
> undereplicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> For all of the topics on the effected brokers. Both brokers only recovered 
> after a restart. Our own investigation yielded nothing, I was hoping you 
> could shed some light on this issue. Possibly if it's related to: 
> https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using 
> 0.8.2.1.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2017-04-13 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967735#comment-15967735
 ] 

Edoardo Comar commented on KAFKA-2729:
--

FWIW - we saw the same message 
{{  Cached zkVersion [66] not equal to that in zookeeper, skip updating ISR 
(kafka.cluster.Partition) }}

when redeploying kafka 0.10.0.1 in a cluster after we had run 0.10.2.0
after having wiped kafka's storage, but having kept zookeeper's version (the 
one bundled with kafka 0.10.2) and its storage

For us eventually the cluster recovered.
HTH.

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Danil Serdyuchenko
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of undereplicated partitions. The zookeeper 
> cluster recovered, however we continued to see a large number of 
> undereplicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> For all of the topics on the effected brokers. Both brokers only recovered 
> after a restart. Our own investigation yielded nothing, I was hoping you 
> could shed some light on this issue. Possibly if it's related to: 
> https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using 
> 0.8.2.1.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5067) java.sql.SQLDataException on TimeStamp column when using AWS Redshift as a JDBC source

2017-04-13 Thread Curtis Wilde (JIRA)
Curtis Wilde created KAFKA-5067:
---

 Summary: java.sql.SQLDataException on TimeStamp column when using 
AWS Redshift as a JDBC source
 Key: KAFKA-5067
 URL: https://issues.apache.org/jira/browse/KAFKA-5067
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 0.10.2.0
Reporter: Curtis Wilde
Priority: Minor


Kafka Connect throws java.sql.SQLDataException when attempting to use Redshift 
as a data source.

When I run the query "select CURRENT_TIMESTAMP;" in a SQL editor it returns:
2017-04-13 16:11:25.204925+00

Full stack trace:

[2017-04-13 09:44:09,910] ERROR Failed to get current time from DB using query 
select CURRENT_TIMESTAMP; on database PostgreSQL 
(io.confluent.connect.jdbc.util.JdbcUtils:205)
java.sql.SQLDataException: [Amazon][JDBC](10140) Error converting value to 
Timestamp.
at com.amazon.exceptions.ExceptionConverter.toSQLException(Unknown 
Source)
at 
com.amazon.utilities.conversion.TypeConverter.convertToTimestamp(Unknown Source)
at com.amazon.utilities.conversion.TypeConverter.toTimestamp(Unknown 
Source)
at com.amazon.jdbc.common.SForwardResultSet.getTimestamp(Unknown Source)
at 
io.confluent.connect.jdbc.util.JdbcUtils.getCurrentTimeOnDB(JdbcUtils.java:201)
at 
io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:169)
at 
io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
at 
io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
at 
io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:200)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-04-13 09:44:09,912] ERROR Failed to run query for table 
TimestampIncrementingTableQuerier{name='null', query='', 
topicPrefix='', timestampColumn='', 
incrementingColumn='null'}: {} 
(io.confluent.connect.jdbc.source.JdbcSourceTask:221)
java.sql.SQLDataException: [Amazon][JDBC](10140) Error converting value to 
Timestamp.
at com.amazon.exceptions.ExceptionConverter.toSQLException(Unknown 
Source)
at 
com.amazon.utilities.conversion.TypeConverter.convertToTimestamp(Unknown Source)
at com.amazon.utilities.conversion.TypeConverter.toTimestamp(Unknown 
Source)
at com.amazon.jdbc.common.SForwardResultSet.getTimestamp(Unknown Source)
at 
io.confluent.connect.jdbc.util.JdbcUtils.getCurrentTimeOnDB(JdbcUtils.java:201)
at 
io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:169)
at 
io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
at 
io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
at 
io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:200)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
^C[2017-04-13 09:44:12,236] INFO Kafka Connect stopping 
(org.apache.kafka.connect.runtime.Connect:66)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5067) java.sql.SQLDataException on TimeStamp column when using AWS Redshift as a JDBC source

2017-04-13 Thread Curtis Wilde (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Curtis Wilde updated KAFKA-5067:

Description: 
Kafka Connect throws java.sql.SQLDataException when attempting to use Redshift 
as a data source.

When I run the query "select CURRENT_TIMESTAMP;" in a SQL editor it returns:
2017-04-13 16:11:25.204925+00

Full stack trace:

[2017-04-13 09:44:09,910] ERROR Failed to get current time from DB using query 
select CURRENT_TIMESTAMP; on database PostgreSQL 
(io.confluent.connect.jdbc.util.JdbcUtils:205)
java.sql.SQLDataException: [Amazon][JDBC](10140) Error converting value to 
Timestamp.
at com.amazon.exceptions.ExceptionConverter.toSQLException(Unknown 
Source)
at 
com.amazon.utilities.conversion.TypeConverter.convertToTimestamp(Unknown Source)
at com.amazon.utilities.conversion.TypeConverter.toTimestamp(Unknown 
Source)
at com.amazon.jdbc.common.SForwardResultSet.getTimestamp(Unknown Source)
at 
io.confluent.connect.jdbc.util.JdbcUtils.getCurrentTimeOnDB(JdbcUtils.java:201)
at 
io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:169)
at 
io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
at 
io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
at 
io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:200)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-04-13 09:44:09,912] ERROR Failed to run query for table 
TimestampIncrementingTableQuerier{name='null', query='', 
topicPrefix='', timestampColumn='', 
incrementingColumn='null'}: {} 
(io.confluent.connect.jdbc.source.JdbcSourceTask:221)
java.sql.SQLDataException: [Amazon][JDBC](10140) Error converting value to 
Timestamp.
at com.amazon.exceptions.ExceptionConverter.toSQLException(Unknown 
Source)
at 
com.amazon.utilities.conversion.TypeConverter.convertToTimestamp(Unknown Source)
at com.amazon.utilities.conversion.TypeConverter.toTimestamp(Unknown 
Source)
at com.amazon.jdbc.common.SForwardResultSet.getTimestamp(Unknown Source)
at 
io.confluent.connect.jdbc.util.JdbcUtils.getCurrentTimeOnDB(JdbcUtils.java:201)
at 
io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:169)
at 
io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
at 
io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
at 
io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:200)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-04-13 09:44:12,236] INFO Kafka Connect stopping 
(org.apache.kafka.connect.runtime.Connect:66)

  was:
Kafka Connect throws java.sql.SQLDataException when attempting to use Redshift 
as a data source.

When I run the query "select CURRENT_TIMESTAMP;" in a SQL editor it returns:
2017-04-13 16:11:25.204925+00

Full stack trace:

[2017-04-13 09:44:09,910] ERROR Failed to get current time from DB using query 
select CURRENT_TIMESTAMP; on database PostgreSQL 
(io.confluent.connect.jdbc.util.JdbcUtils:205)
java.sql.SQLDataException: [Amazon][JDBC](10140) Error converting value to 
Timestamp.
at com.amazon.exceptions.ExceptionConverter.toSQLException(Unknown 
Source)
at 
com.amazon.utilities.conversion.TypeConverter.convertToTimestamp(Unknown Source)
at com.amazon.utilities.conversion.TypeConverter.toTimestamp(Unknown 
Source)
at 

[jira] [Commented] (KAFKA-5037) Infinite loop if all input topics are unknown at startup

2017-04-13 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967817#comment-15967817
 ] 

Matthias J. Sax commented on KAFKA-5037:


A similar issue got reported here: 
http://search-hadoop.com/m/Kafka/uyzND14MkzKYvKSh2?subj=Kafka+Streams+Application+does+not+start+after+10+1+to+10+2+update+if+topics+need+to+be+auto+created

> Infinite loop if all input topics are unknown at startup
> 
>
> Key: KAFKA-5037
> URL: https://issues.apache.org/jira/browse/KAFKA-5037
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 0.11.0.0
>
>
> See discusion: https://github.com/apache/kafka/pull/2815
> We will need some rewrite on {{StreamPartitionsAssignor}} and to add much 
> more test for all kind of corner cases, including pattern subscriptions.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5057) "Big Message Log"

2017-04-13 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967819#comment-15967819
 ] 

Gwen Shapira commented on KAFKA-5057:
-

I thought of a new configuration (which made me realize this will require a 
KIP). 

Basically, sometimes users bump up max.request.size, but only expect very few 
large messages (since large messages have impact on garbage collection, 
throughput, etc). Such log will let them track the number of large messages, 
their size and their source so they can see if their expectation is correct and 
adjust course if it isn't. So I will set max.request.size to 10MB, but the 
logging threshold to 1MB, because I expect very few messages between 1MB and 
10MB.

Does that make sense?

> "Big Message Log"
> -
>
> Key: KAFKA-5057
> URL: https://issues.apache.org/jira/browse/KAFKA-5057
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>
> Really large requests can cause significant GC pauses which can cause quite a 
> few other symptoms on a broker. Will be nice to be able to catch them.
> Lets add the option to log details (client id, topic, partition) for every 
> produce request that is larger than a configurable threshold.
> /cc [~apurva]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-5001) RocksDBSessionStoreSupplierTest#shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled

2017-04-13 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5001.

Resolution: Duplicate

> RocksDBSessionStoreSupplierTest#shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled
> ---
>
> Key: KAFKA-5001
> URL: https://issues.apache.org/jira/browse/KAFKA-5001
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>
> Test fails with
> {noformat}
> org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplierTest > 
> shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled STARTED
> pure virtual method called
> terminate called without an active exception
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5068) Optionally print out metrics after running the perf tests

2017-04-13 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-5068:
--

 Summary: Optionally print out metrics after running the perf tests
 Key: KAFKA-5068
 URL: https://issues.apache.org/jira/browse/KAFKA-5068
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Affects Versions: 0.10.2.0
Reporter: Jun Rao


Often, we run ProducerPerformance/ConsumerPerformance tests to investigate 
performance issues. It's useful for the tool to print out the metrics in the 
producer/consumer at the end of the tests. We can make this optional to 
preserve the current behavior by default.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)